Skip to main content

batty_cli/team/
comms.rs

1//! External communication channels for user roles.
2//!
3//! The `user` role type communicates via channels (Telegram, Slack, etc.)
4//! instead of tmux panes. Each channel provider is a CLI tool that the
5//! daemon invokes for outbound messages.
6
7use std::collections::VecDeque;
8use std::hash::{Hash, Hasher};
9use std::sync::Mutex;
10use std::time::{Duration, Instant};
11
12use tracing::{debug, warn};
13
14use super::config::ChannelConfig;
15use super::discord::DiscordBot;
16use super::errors::DeliveryError;
17use super::telegram::TelegramBot;
18
19const TELEGRAM_DEDUP_TTL: Duration = Duration::from_secs(300);
20const TELEGRAM_DEDUP_CAPACITY: usize = 512;
21
22#[derive(Debug)]
23struct RecentTelegramSends {
24    ttl: Duration,
25    capacity: usize,
26    entries: Mutex<VecDeque<(u64, Instant)>>,
27}
28
29impl RecentTelegramSends {
30    fn new(ttl: Duration, capacity: usize) -> Self {
31        Self {
32            ttl,
33            capacity,
34            entries: Mutex::new(VecDeque::new()),
35        }
36    }
37
38    fn prune_expired(&self, now: Instant, entries: &mut VecDeque<(u64, Instant)>) {
39        while entries
40            .front()
41            .is_some_and(|(_, sent_at)| now.duration_since(*sent_at) > self.ttl)
42        {
43            entries.pop_front();
44        }
45        while entries.len() > self.capacity {
46            entries.pop_front();
47        }
48    }
49
50    fn contains_recent(&self, message_id: u64) -> bool {
51        let now = Instant::now();
52        let mut entries = self.entries.lock().unwrap();
53        self.prune_expired(now, &mut entries);
54        entries.iter().any(|(id, _)| *id == message_id)
55    }
56
57    fn record(&self, message_id: u64) {
58        let now = Instant::now();
59        let mut entries = self.entries.lock().unwrap();
60        self.prune_expired(now, &mut entries);
61        entries.push_back((message_id, now));
62        self.prune_expired(now, &mut entries);
63    }
64}
65
66fn telegram_message_id(target: &str, message: &str) -> u64 {
67    let mut hasher = std::collections::hash_map::DefaultHasher::new();
68    target.hash(&mut hasher);
69    message.hash(&mut hasher);
70    hasher.finish()
71}
72
73/// Trait for outbound message delivery to external channels.
74pub trait Channel: Send + Sync {
75    /// Send a text message to the channel destination.
76    fn send(&self, message: &str) -> std::result::Result<(), DeliveryError>;
77    /// Channel type identifier (e.g., "telegram").
78    fn channel_type(&self) -> &str;
79}
80
81/// Telegram channel via openclaw (or any CLI provider).
82pub struct TelegramChannel {
83    target: String,
84    provider: String,
85    recent_sends: RecentTelegramSends,
86}
87
88impl TelegramChannel {
89    pub fn new(target: String, provider: String) -> Self {
90        Self::with_dedup_settings(
91            target,
92            provider,
93            TELEGRAM_DEDUP_TTL,
94            TELEGRAM_DEDUP_CAPACITY,
95        )
96    }
97
98    pub fn from_config(config: &ChannelConfig) -> Self {
99        Self::new(config.target.clone(), config.provider.clone())
100    }
101
102    fn with_dedup_settings(
103        target: String,
104        provider: String,
105        ttl: Duration,
106        capacity: usize,
107    ) -> Self {
108        Self {
109            target,
110            provider,
111            recent_sends: RecentTelegramSends::new(ttl, capacity),
112        }
113    }
114}
115
116impl Channel for TelegramChannel {
117    fn send(&self, message: &str) -> std::result::Result<(), DeliveryError> {
118        let message_id = telegram_message_id(&self.target, message);
119        if self.recent_sends.contains_recent(message_id) {
120            debug!(target = %self.target, message_id, "suppressing duplicate telegram message");
121            return Ok(());
122        }
123
124        debug!(target = %self.target, provider = %self.provider, len = message.len(), "sending via telegram channel");
125
126        let output = std::process::Command::new(&self.provider)
127            .args([
128                "message",
129                "send",
130                "--to",
131                &self.target,
132                "--message",
133                message,
134            ])
135            .output();
136
137        match output {
138            Ok(out) if out.status.success() => {
139                self.recent_sends.record(message_id);
140                debug!("telegram message sent successfully");
141                Ok(())
142            }
143            Ok(out) => {
144                let stderr = String::from_utf8_lossy(&out.stderr);
145                warn!(status = ?out.status, stderr = %stderr, "telegram send failed");
146                Err(DeliveryError::ChannelSend {
147                    recipient: self.target.clone(),
148                    detail: stderr.to_string(),
149                })
150            }
151            Err(e) => {
152                warn!(error = %e, provider = %self.provider, "failed to execute channel provider");
153                Err(DeliveryError::ProviderExec {
154                    provider: self.provider.clone(),
155                    source: e,
156                })
157            }
158        }
159    }
160
161    fn channel_type(&self) -> &str {
162        "telegram"
163    }
164}
165
166/// Native Telegram channel using the Bot API directly (no CLI provider).
167pub struct NativeTelegramChannel {
168    bot: TelegramBot,
169    target: String,
170    recent_sends: RecentTelegramSends,
171}
172
173impl NativeTelegramChannel {
174    pub fn new(bot: TelegramBot, target: String) -> Self {
175        Self::with_dedup_settings(target, bot, TELEGRAM_DEDUP_TTL, TELEGRAM_DEDUP_CAPACITY)
176    }
177
178    /// Build from a `ChannelConfig`, returning `None` if no bot token is available.
179    pub fn from_config(config: &ChannelConfig) -> Option<Self> {
180        TelegramBot::from_config(config).map(|bot| Self::new(bot, config.target.clone()))
181    }
182
183    fn with_dedup_settings(
184        target: String,
185        bot: TelegramBot,
186        ttl: Duration,
187        capacity: usize,
188    ) -> Self {
189        Self {
190            bot,
191            target,
192            recent_sends: RecentTelegramSends::new(ttl, capacity),
193        }
194    }
195}
196
197impl Channel for NativeTelegramChannel {
198    fn send(&self, message: &str) -> std::result::Result<(), DeliveryError> {
199        let message_id = telegram_message_id(&self.target, message);
200        if self.recent_sends.contains_recent(message_id) {
201            debug!(
202                target = %self.target,
203                message_id,
204                "suppressing duplicate native telegram message"
205            );
206            return Ok(());
207        }
208
209        debug!(target = %self.target, len = message.len(), "sending via native telegram channel");
210        self.bot
211            .send_message(&self.target, message)
212            .map(|_| {
213                self.recent_sends.record(message_id);
214            })
215            .map_err(|error| DeliveryError::ChannelSend {
216                recipient: self.target.clone(),
217                detail: error.to_string(),
218            })
219    }
220
221    fn channel_type(&self) -> &str {
222        "telegram-native"
223    }
224}
225
226/// Native Discord channel using the Bot API directly.
227pub struct DiscordChannel {
228    bot: DiscordBot,
229    channel_id: String,
230}
231
232impl DiscordChannel {
233    pub fn new(bot: DiscordBot, channel_id: String) -> Self {
234        Self { bot, channel_id }
235    }
236
237    pub fn from_config(config: &ChannelConfig) -> Option<Self> {
238        let channel_id = config
239            .commands_channel_id
240            .clone()
241            .or_else(|| config.events_channel_id.clone())?;
242        DiscordBot::from_config(config).map(|bot| Self::new(bot, channel_id))
243    }
244}
245
246impl Channel for DiscordChannel {
247    fn send(&self, message: &str) -> std::result::Result<(), DeliveryError> {
248        self.bot
249            .send_formatted_message(&self.channel_id, message)
250            .map_err(|error| DeliveryError::ChannelSend {
251                recipient: self.channel_id.clone(),
252                detail: error.to_string(),
253            })
254    }
255
256    fn channel_type(&self) -> &str {
257        "discord"
258    }
259}
260
261/// Create a channel from config fields.
262pub fn channel_from_config(
263    channel_type: &str,
264    config: &ChannelConfig,
265) -> std::result::Result<Box<dyn Channel>, DeliveryError> {
266    match channel_type {
267        "telegram" => {
268            if let Some(native) = NativeTelegramChannel::from_config(config) {
269                Ok(Box::new(native))
270            } else {
271                Ok(Box::new(TelegramChannel::from_config(config)))
272            }
273        }
274        "discord" => DiscordChannel::from_config(config)
275            .map(|channel| Box::new(channel) as Box<dyn Channel>)
276            .ok_or_else(|| DeliveryError::UnsupportedChannel {
277                channel_type: "discord".to_string(),
278            }),
279        other => Err(DeliveryError::UnsupportedChannel {
280            channel_type: other.to_string(),
281        }),
282    }
283}
284
285#[cfg(test)]
286mod tests {
287    use super::*;
288    use std::fs;
289
290    #[test]
291    fn telegram_channel_type() {
292        let ch = TelegramChannel::new("12345".into(), "openclaw".into());
293        assert_eq!(ch.channel_type(), "telegram");
294    }
295
296    #[test]
297    fn native_telegram_channel_type() {
298        let bot = TelegramBot::new("test-token".into(), vec![]);
299        let ch = NativeTelegramChannel::new(bot, "12345".into());
300        assert_eq!(ch.channel_type(), "telegram-native");
301    }
302
303    #[test]
304    fn discord_channel_type() {
305        let bot = DiscordBot::new("test-token".into(), vec![42], "67890".into());
306        let ch = DiscordChannel::new(bot, "67890".into());
307        assert_eq!(ch.channel_type(), "discord");
308    }
309
310    #[test]
311    fn channel_from_config_telegram() {
312        let config = ChannelConfig {
313            target: "12345".into(),
314            provider: "openclaw".into(),
315            bot_token: None,
316            allowed_user_ids: vec![],
317            events_channel_id: None,
318            agents_channel_id: None,
319            commands_channel_id: None,
320            board_channel_id: None,
321        };
322        // Without bot_token (and assuming env var is not set), falls back to CLI channel.
323        if std::env::var("BATTY_TELEGRAM_BOT_TOKEN").is_err() {
324            let ch = channel_from_config("telegram", &config).unwrap();
325            assert_eq!(ch.channel_type(), "telegram");
326        }
327    }
328
329    #[test]
330    fn channel_from_config_telegram_with_bot_token() {
331        let config = ChannelConfig {
332            target: "12345".into(),
333            provider: "openclaw".into(),
334            bot_token: Some("test-bot-token".into()),
335            allowed_user_ids: vec![],
336            events_channel_id: None,
337            agents_channel_id: None,
338            commands_channel_id: None,
339            board_channel_id: None,
340        };
341        let ch = channel_from_config("telegram", &config).unwrap();
342        assert_eq!(ch.channel_type(), "telegram-native");
343    }
344
345    #[test]
346    fn channel_from_config_telegram_without_bot_token() {
347        let config = ChannelConfig {
348            target: "12345".into(),
349            provider: "openclaw".into(),
350            bot_token: None,
351            allowed_user_ids: vec![],
352            events_channel_id: None,
353            agents_channel_id: None,
354            commands_channel_id: None,
355            board_channel_id: None,
356        };
357        // Only assert CLI fallback when the env var is also absent.
358        if std::env::var("BATTY_TELEGRAM_BOT_TOKEN").is_err() {
359            let ch = channel_from_config("telegram", &config).unwrap();
360            assert_eq!(ch.channel_type(), "telegram");
361        }
362    }
363
364    #[test]
365    fn channel_from_config_unknown_type() {
366        let config = ChannelConfig {
367            target: "x".into(),
368            provider: "x".into(),
369            bot_token: None,
370            allowed_user_ids: vec![],
371            events_channel_id: None,
372            agents_channel_id: None,
373            commands_channel_id: None,
374            board_channel_id: None,
375        };
376        match channel_from_config("slack", &config) {
377            Err(e) => assert!(e.to_string().contains("unsupported")),
378            Ok(_) => panic!("expected error for unsupported channel"),
379        }
380    }
381
382    #[test]
383    fn channel_from_config_discord() {
384        let config = ChannelConfig {
385            target: String::new(),
386            provider: String::new(),
387            bot_token: Some("discord-token".into()),
388            allowed_user_ids: vec![42],
389            events_channel_id: Some("100".into()),
390            agents_channel_id: Some("200".into()),
391            commands_channel_id: Some("300".into()),
392            board_channel_id: None,
393        };
394        let ch = channel_from_config("discord", &config).unwrap();
395        assert_eq!(ch.channel_type(), "discord");
396    }
397
398    #[test]
399    fn telegram_send_fails_gracefully_with_missing_provider() {
400        let ch = TelegramChannel::new("12345".into(), "/nonexistent/binary".into());
401        let result = ch.send("hello");
402        assert!(result.is_err());
403        assert!(
404            result
405                .unwrap_err()
406                .to_string()
407                .contains("failed to execute")
408        );
409    }
410
411    #[test]
412    fn telegram_message_id_changes_with_target_and_body() {
413        let first = telegram_message_id("12345", "hello");
414        let second = telegram_message_id("12345", "hello again");
415        let third = telegram_message_id("67890", "hello");
416        assert_ne!(first, second);
417        assert_ne!(first, third);
418    }
419
420    #[test]
421    fn telegram_recent_sends_respects_ttl() {
422        let cache = RecentTelegramSends::new(Duration::from_millis(50), 16);
423        let id = telegram_message_id("12345", "hello");
424        assert!(!cache.contains_recent(id));
425        cache.record(id);
426        assert!(cache.contains_recent(id));
427        std::thread::sleep(Duration::from_millis(100));
428        assert!(!cache.contains_recent(id));
429    }
430
431    #[test]
432    fn telegram_channel_suppresses_duplicate_messages() {
433        let tmp = tempfile::tempdir().unwrap();
434        let log_path = tmp.path().join("provider.log");
435        let script_path = tmp.path().join("fake-provider.sh");
436        fs::write(
437            &script_path,
438            format!(
439                "#!/bin/sh\nprintf '%s\\n' \"$*\" >> \"{}\"\n",
440                log_path.display()
441            ),
442        )
443        .unwrap();
444        let mut perms = fs::metadata(&script_path).unwrap().permissions();
445        #[cfg(unix)]
446        {
447            use std::os::unix::fs::PermissionsExt;
448            perms.set_mode(0o755);
449        }
450        fs::set_permissions(&script_path, perms).unwrap();
451
452        let ch = TelegramChannel::with_dedup_settings(
453            "12345".into(),
454            script_path.display().to_string(),
455            Duration::from_secs(60),
456            16,
457        );
458        ch.send("hello").unwrap();
459        ch.send("hello").unwrap();
460
461        let lines = fs::read_to_string(&log_path).unwrap();
462        assert_eq!(lines.lines().count(), 1);
463    }
464
465    #[test]
466    fn telegram_channel_allows_unique_messages_and_retries_after_ttl() {
467        let tmp = tempfile::tempdir().unwrap();
468        let log_path = tmp.path().join("provider.log");
469        let script_path = tmp.path().join("fake-provider.sh");
470        fs::write(
471            &script_path,
472            format!(
473                "#!/bin/sh\nprintf '%s\\n' \"$*\" >> \"{}\"\n",
474                log_path.display()
475            ),
476        )
477        .unwrap();
478        let mut perms = fs::metadata(&script_path).unwrap().permissions();
479        #[cfg(unix)]
480        {
481            use std::os::unix::fs::PermissionsExt;
482            perms.set_mode(0o755);
483        }
484        fs::set_permissions(&script_path, perms).unwrap();
485
486        let ch = TelegramChannel::with_dedup_settings(
487            "12345".into(),
488            script_path.display().to_string(),
489            Duration::from_millis(5),
490            16,
491        );
492        ch.send("first").unwrap();
493        ch.send("second").unwrap();
494        std::thread::sleep(Duration::from_millis(10));
495        ch.send("first").unwrap();
496
497        let lines = fs::read_to_string(&log_path).unwrap();
498        assert_eq!(lines.lines().count(), 3);
499    }
500}