Skip to main content

batty_cli/team/
comms.rs

//! External communication channels for user roles.
//!
//! The `user` role type communicates via channels (Telegram, Slack, etc.)
//! instead of tmux panes. Outbound messages are delivered either by a CLI
//! provider tool that the daemon invokes or, for Telegram, natively through
//! the Bot API.
6
7use std::collections::VecDeque;
8use std::hash::{Hash, Hasher};
9use std::sync::Mutex;
10use std::time::{Duration, Instant};
11
12use tracing::{debug, warn};
13
14use super::config::ChannelConfig;
15use super::errors::DeliveryError;
16use super::telegram::TelegramBot;
17
/// Default time window during which an identical message to the same target
/// is treated as a duplicate.
const TELEGRAM_DEDUP_TTL: Duration = Duration::from_secs(5 * 60);
/// Default upper bound on the number of remembered message fingerprints.
const TELEGRAM_DEDUP_CAPACITY: usize = 512;
20
/// Bounded, time-limited memory of message fingerprints that were recently sent.
///
/// Entries are kept in insertion order (oldest at the front). An entry stops
/// counting as recent once it is older than `ttl`, and the oldest entries are
/// evicted whenever more than `capacity` of them are held.
#[derive(Debug)]
struct RecentTelegramSends {
    /// Age beyond which a recorded fingerprint no longer counts as recent.
    ttl: Duration,
    /// Maximum number of fingerprints retained at once.
    capacity: usize,
    /// `(fingerprint, recorded_at)` pairs, oldest first.
    entries: Mutex<VecDeque<(u64, Instant)>>,
}

impl RecentTelegramSends {
    /// Creates an empty cache with the given expiry window and size bound.
    fn new(ttl: Duration, capacity: usize) -> Self {
        Self {
            ttl,
            capacity,
            entries: Mutex::new(VecDeque::new()),
        }
    }

    /// Locks the entry queue, recovering the guard if the mutex is poisoned.
    ///
    /// The queue only holds plain `(u64, Instant)` pairs, so it is still
    /// structurally valid after another thread panicked while holding the
    /// lock. Recovering keeps one panic from turning every later call into a
    /// panic as well.
    fn lock_entries(&self) -> std::sync::MutexGuard<'_, VecDeque<(u64, Instant)>> {
        self.entries
            .lock()
            .unwrap_or_else(|poisoned| poisoned.into_inner())
    }

    /// Drops entries older than `ttl` (relative to `now`) from the front, then
    /// drops the oldest entries until at most `capacity` remain.
    fn prune_expired(&self, now: Instant, entries: &mut VecDeque<(u64, Instant)>) {
        while entries
            .front()
            .is_some_and(|(_, sent_at)| now.duration_since(*sent_at) > self.ttl)
        {
            entries.pop_front();
        }
        while entries.len() > self.capacity {
            entries.pop_front();
        }
    }

    /// Returns `true` if `message_id` was recorded and has not yet expired or
    /// been evicted. Expired entries are pruned as a side effect.
    fn contains_recent(&self, message_id: u64) -> bool {
        let now = Instant::now();
        let mut entries = self.lock_entries();
        self.prune_expired(now, &mut entries);
        entries.iter().any(|(id, _)| *id == message_id)
    }

    /// Remembers `message_id` as sent at the current instant.
    fn record(&self, message_id: u64) {
        let now = Instant::now();
        let mut entries = self.lock_entries();
        entries.push_back((message_id, now));
        // A single prune after the push is enough: the new entry sits at the
        // back with zero age, so it can never be removed as expired, and the
        // capacity bound is enforced in the same pass.
        self.prune_expired(now, &mut entries);
    }
}
64
/// Derives a 64-bit fingerprint for a `(target, message)` pair.
///
/// Both strings are fed, target first, into one `DefaultHasher`; the result
/// is used as the key for duplicate detection.
fn telegram_message_id(target: &str, message: &str) -> u64 {
    use std::collections::hash_map::DefaultHasher;

    let mut state = DefaultHasher::new();
    for part in [target, message] {
        part.hash(&mut state);
    }
    state.finish()
}
71
72/// Trait for outbound message delivery to external channels.
73pub trait Channel: Send + Sync {
74    /// Send a text message to the channel destination.
75    fn send(&self, message: &str) -> std::result::Result<(), DeliveryError>;
76    /// Channel type identifier (e.g., "telegram").
77    fn channel_type(&self) -> &str;
78}
79
80/// Telegram channel via openclaw (or any CLI provider).
81pub struct TelegramChannel {
82    target: String,
83    provider: String,
84    recent_sends: RecentTelegramSends,
85}
86
87impl TelegramChannel {
88    pub fn new(target: String, provider: String) -> Self {
89        Self::with_dedup_settings(
90            target,
91            provider,
92            TELEGRAM_DEDUP_TTL,
93            TELEGRAM_DEDUP_CAPACITY,
94        )
95    }
96
97    pub fn from_config(config: &ChannelConfig) -> Self {
98        Self::new(config.target.clone(), config.provider.clone())
99    }
100
101    fn with_dedup_settings(
102        target: String,
103        provider: String,
104        ttl: Duration,
105        capacity: usize,
106    ) -> Self {
107        Self {
108            target,
109            provider,
110            recent_sends: RecentTelegramSends::new(ttl, capacity),
111        }
112    }
113}
114
115impl Channel for TelegramChannel {
116    fn send(&self, message: &str) -> std::result::Result<(), DeliveryError> {
117        let message_id = telegram_message_id(&self.target, message);
118        if self.recent_sends.contains_recent(message_id) {
119            debug!(target = %self.target, message_id, "suppressing duplicate telegram message");
120            return Ok(());
121        }
122
123        debug!(target = %self.target, provider = %self.provider, len = message.len(), "sending via telegram channel");
124
125        let output = std::process::Command::new(&self.provider)
126            .args([
127                "message",
128                "send",
129                "--to",
130                &self.target,
131                "--message",
132                message,
133            ])
134            .output();
135
136        match output {
137            Ok(out) if out.status.success() => {
138                self.recent_sends.record(message_id);
139                debug!("telegram message sent successfully");
140                Ok(())
141            }
142            Ok(out) => {
143                let stderr = String::from_utf8_lossy(&out.stderr);
144                warn!(status = ?out.status, stderr = %stderr, "telegram send failed");
145                Err(DeliveryError::ChannelSend {
146                    recipient: self.target.clone(),
147                    detail: stderr.to_string(),
148                })
149            }
150            Err(e) => {
151                warn!(error = %e, provider = %self.provider, "failed to execute channel provider");
152                Err(DeliveryError::ProviderExec {
153                    provider: self.provider.clone(),
154                    source: e,
155                })
156            }
157        }
158    }
159
160    fn channel_type(&self) -> &str {
161        "telegram"
162    }
163}
164
165/// Native Telegram channel using the Bot API directly (no CLI provider).
166pub struct NativeTelegramChannel {
167    bot: TelegramBot,
168    target: String,
169    recent_sends: RecentTelegramSends,
170}
171
172impl NativeTelegramChannel {
173    pub fn new(bot: TelegramBot, target: String) -> Self {
174        Self::with_dedup_settings(target, bot, TELEGRAM_DEDUP_TTL, TELEGRAM_DEDUP_CAPACITY)
175    }
176
177    /// Build from a `ChannelConfig`, returning `None` if no bot token is available.
178    pub fn from_config(config: &ChannelConfig) -> Option<Self> {
179        TelegramBot::from_config(config).map(|bot| Self::new(bot, config.target.clone()))
180    }
181
182    fn with_dedup_settings(
183        target: String,
184        bot: TelegramBot,
185        ttl: Duration,
186        capacity: usize,
187    ) -> Self {
188        Self {
189            bot,
190            target,
191            recent_sends: RecentTelegramSends::new(ttl, capacity),
192        }
193    }
194}
195
196impl Channel for NativeTelegramChannel {
197    fn send(&self, message: &str) -> std::result::Result<(), DeliveryError> {
198        let message_id = telegram_message_id(&self.target, message);
199        if self.recent_sends.contains_recent(message_id) {
200            debug!(
201                target = %self.target,
202                message_id,
203                "suppressing duplicate native telegram message"
204            );
205            return Ok(());
206        }
207
208        debug!(target = %self.target, len = message.len(), "sending via native telegram channel");
209        self.bot
210            .send_message(&self.target, message)
211            .map(|_| {
212                self.recent_sends.record(message_id);
213            })
214            .map_err(|error| DeliveryError::ChannelSend {
215                recipient: self.target.clone(),
216                detail: error.to_string(),
217            })
218    }
219
220    fn channel_type(&self) -> &str {
221        "telegram-native"
222    }
223}
224
225/// Create a channel from config fields.
226pub fn channel_from_config(
227    channel_type: &str,
228    config: &ChannelConfig,
229) -> std::result::Result<Box<dyn Channel>, DeliveryError> {
230    match channel_type {
231        "telegram" => {
232            if let Some(native) = NativeTelegramChannel::from_config(config) {
233                Ok(Box::new(native))
234            } else {
235                Ok(Box::new(TelegramChannel::from_config(config)))
236            }
237        }
238        other => Err(DeliveryError::UnsupportedChannel {
239            channel_type: other.to_string(),
240        }),
241    }
242}
243
#[cfg(test)]
mod tests {
    use super::*;
    #[cfg(unix)]
    use std::fs;
    #[cfg(unix)]
    use std::path::{Path, PathBuf};

    /// Builds a config for target `12345` / provider `openclaw` with the given
    /// optional bot token.
    fn telegram_config(bot_token: Option<&str>) -> ChannelConfig {
        ChannelConfig {
            target: "12345".into(),
            provider: "openclaw".into(),
            bot_token: bot_token.map(Into::into),
            allowed_user_ids: vec![],
        }
    }

    /// Asserts that a config without a bot token resolves to the CLI channel.
    ///
    /// The check is skipped when `BATTY_TELEGRAM_BOT_TOKEN` is set, because the
    /// fallback is only expected when that variable is absent.
    fn assert_cli_fallback_without_token() {
        let config = telegram_config(None);
        if std::env::var("BATTY_TELEGRAM_BOT_TOKEN").is_err() {
            let ch = channel_from_config("telegram", &config).unwrap();
            assert_eq!(ch.channel_type(), "telegram");
        }
    }

    /// Writes an executable shell script into `dir` that appends its arguments
    /// as one line to a log file, and returns `(script_path, log_path)`.
    #[cfg(unix)]
    fn install_fake_provider(dir: &Path) -> (PathBuf, PathBuf) {
        use std::os::unix::fs::PermissionsExt;

        let log_path = dir.join("provider.log");
        let script_path = dir.join("fake-provider.sh");
        fs::write(
            &script_path,
            format!(
                "#!/bin/sh\nprintf '%s\\n' \"$*\" >> \"{}\"\n",
                log_path.display()
            ),
        )
        .unwrap();
        fs::set_permissions(&script_path, fs::Permissions::from_mode(0o755)).unwrap();
        (script_path, log_path)
    }

    #[test]
    fn telegram_channel_type() {
        let ch = TelegramChannel::new("12345".into(), "openclaw".into());
        assert_eq!(ch.channel_type(), "telegram");
    }

    #[test]
    fn native_telegram_channel_type() {
        let bot = TelegramBot::new("test-token".into(), vec![]);
        let ch = NativeTelegramChannel::new(bot, "12345".into());
        assert_eq!(ch.channel_type(), "telegram-native");
    }

    #[test]
    fn channel_from_config_telegram() {
        assert_cli_fallback_without_token();
    }

    #[test]
    fn channel_from_config_telegram_with_bot_token() {
        let config = telegram_config(Some("test-bot-token"));
        let ch = channel_from_config("telegram", &config).unwrap();
        assert_eq!(ch.channel_type(), "telegram-native");
    }

    #[test]
    fn channel_from_config_telegram_without_bot_token() {
        assert_cli_fallback_without_token();
    }

    #[test]
    fn channel_from_config_unknown_type() {
        let config = ChannelConfig {
            target: "x".into(),
            provider: "x".into(),
            bot_token: None,
            allowed_user_ids: vec![],
        };
        match channel_from_config("slack", &config) {
            Err(e) => assert!(e.to_string().contains("unsupported")),
            Ok(_) => panic!("expected error for unsupported channel"),
        }
    }

    #[test]
    fn telegram_send_fails_gracefully_with_missing_provider() {
        let ch = TelegramChannel::new("12345".into(), "/nonexistent/binary".into());
        let result = ch.send("hello");
        assert!(result.is_err());
        assert!(
            result
                .unwrap_err()
                .to_string()
                .contains("failed to execute")
        );
    }

    #[test]
    fn telegram_message_id_changes_with_target_and_body() {
        let first = telegram_message_id("12345", "hello");
        let second = telegram_message_id("12345", "hello again");
        let third = telegram_message_id("67890", "hello");
        assert_ne!(first, second);
        assert_ne!(first, third);
    }

    #[test]
    fn telegram_recent_sends_respects_ttl() {
        let cache = RecentTelegramSends::new(Duration::from_millis(50), 16);
        let id = telegram_message_id("12345", "hello");
        assert!(!cache.contains_recent(id));
        cache.record(id);
        assert!(cache.contains_recent(id));
        std::thread::sleep(Duration::from_millis(100));
        assert!(!cache.contains_recent(id));
    }

    // The fake provider is a `#!/bin/sh` script, so the tests that execute it
    // are only built on Unix.
    #[cfg(unix)]
    #[test]
    fn telegram_channel_suppresses_duplicate_messages() {
        let tmp = tempfile::tempdir().unwrap();
        let (script_path, log_path) = install_fake_provider(tmp.path());

        let ch = TelegramChannel::with_dedup_settings(
            "12345".into(),
            script_path.display().to_string(),
            Duration::from_secs(60),
            16,
        );
        ch.send("hello").unwrap();
        ch.send("hello").unwrap();

        // The second send is suppressed, so the provider ran exactly once.
        let lines = fs::read_to_string(&log_path).unwrap();
        assert_eq!(lines.lines().count(), 1);
    }

    #[cfg(unix)]
    #[test]
    fn telegram_channel_allows_unique_messages_and_retries_after_ttl() {
        let tmp = tempfile::tempdir().unwrap();
        let (script_path, log_path) = install_fake_provider(tmp.path());

        let ch = TelegramChannel::with_dedup_settings(
            "12345".into(),
            script_path.display().to_string(),
            Duration::from_millis(5),
            16,
        );
        ch.send("first").unwrap();
        ch.send("second").unwrap();
        // Wait past the 5 ms TTL so the repeated "first" is sent again.
        std::thread::sleep(Duration::from_millis(10));
        ch.send("first").unwrap();

        let lines = fs::read_to_string(&log_path).unwrap();
        assert_eq!(lines.lines().count(), 3);
    }
}