Skip to main content

batty_cli/team/
comms.rs

//! External communication channels for user roles.
//!
//! The `user` role type communicates via channels (Telegram, Slack, etc.)
//! instead of tmux panes. Outbound messages are delivered either by invoking
//! a CLI provider tool or, for Telegram, by calling the Bot API directly.
6
7use std::collections::VecDeque;
8use std::hash::{Hash, Hasher};
9use std::sync::Mutex;
10use std::time::{Duration, Instant};
11
12use tracing::{debug, warn};
13
14use super::config::ChannelConfig;
15use super::errors::DeliveryError;
16use super::telegram::TelegramBot;
17
/// Default time window during which an identical Telegram send is suppressed.
const TELEGRAM_DEDUP_TTL: Duration = Duration::from_secs(5 * 60);
/// Default maximum number of recent Telegram sends remembered for dedup.
const TELEGRAM_DEDUP_CAPACITY: usize = 512;
20
/// Bounded, time-limited memory of recently delivered Telegram message ids.
///
/// Entries are stored oldest-first so that expiry and capacity eviction only
/// ever need to look at the front of the queue.
#[derive(Debug)]
struct RecentTelegramSends {
    /// How long a recorded id keeps counting as "recent".
    ttl: Duration,
    /// Maximum number of ids retained; the oldest are evicted first.
    capacity: usize,
    /// `(message_id, sent_at)` pairs, oldest at the front.
    entries: Mutex<VecDeque<(u64, Instant)>>,
}

impl RecentTelegramSends {
    /// Creates an empty cache with the given time-to-live and capacity.
    fn new(ttl: Duration, capacity: usize) -> Self {
        Self {
            ttl,
            capacity,
            entries: Mutex::new(VecDeque::new()),
        }
    }

    /// Locks the entry queue.
    ///
    /// A poisoned mutex is recovered instead of propagated as a panic: the
    /// queue holds plain values that stay usable even if a previous holder
    /// panicked, and a dedup cache must not take message delivery down.
    fn lock_entries(&self) -> std::sync::MutexGuard<'_, VecDeque<(u64, Instant)>> {
        self.entries
            .lock()
            .unwrap_or_else(std::sync::PoisonError::into_inner)
    }

    /// Drops entries older than `ttl` (relative to `now`), then trims the
    /// queue from the front until it fits within `capacity`.
    fn prune_expired(&self, now: Instant, entries: &mut VecDeque<(u64, Instant)>) {
        while entries
            .front()
            .is_some_and(|(_, sent_at)| now.duration_since(*sent_at) > self.ttl)
        {
            entries.pop_front();
        }
        while entries.len() > self.capacity {
            entries.pop_front();
        }
    }

    /// Returns `true` if `message_id` was recorded within the last `ttl`
    /// and has not been evicted by the capacity limit.
    fn contains_recent(&self, message_id: u64) -> bool {
        let mut entries = self.lock_entries();
        // Read the clock while holding the lock so timestamps in the queue
        // are non-decreasing from front to back.
        let now = Instant::now();
        self.prune_expired(now, &mut entries);
        entries.iter().any(|(id, _)| *id == message_id)
    }

    /// Records `message_id` as sent just now.
    ///
    /// An id that is already present is refreshed rather than duplicated, so
    /// repeated records of one id cannot push other live ids out of the
    /// bounded queue.
    fn record(&self, message_id: u64) {
        let mut entries = self.lock_entries();
        // Same reasoning as in `contains_recent`: timestamp under the lock.
        let now = Instant::now();
        entries.retain(|(id, _)| *id != message_id);
        entries.push_back((message_id, now));
        self.prune_expired(now, &mut entries);
    }
}
64
/// Computes the dedup key for a message: a 64-bit hash of the destination
/// followed by the message body, using the standard library's default hasher.
fn telegram_message_id(target: &str, message: &str) -> u64 {
    use std::collections::hash_map::DefaultHasher;

    let mut state = DefaultHasher::new();
    // Order matters: the target is fed to the hasher before the body.
    for part in [target, message] {
        part.hash(&mut state);
    }
    state.finish()
}
71
72/// Trait for outbound message delivery to external channels.
73pub trait Channel: Send + Sync {
74    /// Send a text message to the channel destination.
75    fn send(&self, message: &str) -> std::result::Result<(), DeliveryError>;
76    /// Channel type identifier (e.g., "telegram").
77    #[allow(dead_code)] // Reserved for diagnostics and provider-specific routing.
78    fn channel_type(&self) -> &str;
79}
80
81/// Telegram channel via openclaw (or any CLI provider).
82pub struct TelegramChannel {
83    target: String,
84    provider: String,
85    recent_sends: RecentTelegramSends,
86}
87
88impl TelegramChannel {
89    pub fn new(target: String, provider: String) -> Self {
90        Self::with_dedup_settings(
91            target,
92            provider,
93            TELEGRAM_DEDUP_TTL,
94            TELEGRAM_DEDUP_CAPACITY,
95        )
96    }
97
98    pub fn from_config(config: &ChannelConfig) -> Self {
99        Self::new(config.target.clone(), config.provider.clone())
100    }
101
102    fn with_dedup_settings(
103        target: String,
104        provider: String,
105        ttl: Duration,
106        capacity: usize,
107    ) -> Self {
108        Self {
109            target,
110            provider,
111            recent_sends: RecentTelegramSends::new(ttl, capacity),
112        }
113    }
114}
115
116impl Channel for TelegramChannel {
117    fn send(&self, message: &str) -> std::result::Result<(), DeliveryError> {
118        let message_id = telegram_message_id(&self.target, message);
119        if self.recent_sends.contains_recent(message_id) {
120            debug!(target = %self.target, message_id, "suppressing duplicate telegram message");
121            return Ok(());
122        }
123
124        debug!(target = %self.target, provider = %self.provider, len = message.len(), "sending via telegram channel");
125
126        let output = std::process::Command::new(&self.provider)
127            .args([
128                "message",
129                "send",
130                "--to",
131                &self.target,
132                "--message",
133                message,
134            ])
135            .output();
136
137        match output {
138            Ok(out) if out.status.success() => {
139                self.recent_sends.record(message_id);
140                debug!("telegram message sent successfully");
141                Ok(())
142            }
143            Ok(out) => {
144                let stderr = String::from_utf8_lossy(&out.stderr);
145                warn!(status = ?out.status, stderr = %stderr, "telegram send failed");
146                Err(DeliveryError::ChannelSend {
147                    recipient: self.target.clone(),
148                    detail: stderr.to_string(),
149                })
150            }
151            Err(e) => {
152                warn!(error = %e, provider = %self.provider, "failed to execute channel provider");
153                Err(DeliveryError::ProviderExec {
154                    provider: self.provider.clone(),
155                    source: e,
156                })
157            }
158        }
159    }
160
161    fn channel_type(&self) -> &str {
162        "telegram"
163    }
164}
165
166/// Native Telegram channel using the Bot API directly (no CLI provider).
167pub struct NativeTelegramChannel {
168    bot: TelegramBot,
169    target: String,
170    recent_sends: RecentTelegramSends,
171}
172
173impl NativeTelegramChannel {
174    pub fn new(bot: TelegramBot, target: String) -> Self {
175        Self::with_dedup_settings(target, bot, TELEGRAM_DEDUP_TTL, TELEGRAM_DEDUP_CAPACITY)
176    }
177
178    /// Build from a `ChannelConfig`, returning `None` if no bot token is available.
179    pub fn from_config(config: &ChannelConfig) -> Option<Self> {
180        TelegramBot::from_config(config).map(|bot| Self::new(bot, config.target.clone()))
181    }
182
183    fn with_dedup_settings(
184        target: String,
185        bot: TelegramBot,
186        ttl: Duration,
187        capacity: usize,
188    ) -> Self {
189        Self {
190            bot,
191            target,
192            recent_sends: RecentTelegramSends::new(ttl, capacity),
193        }
194    }
195}
196
197impl Channel for NativeTelegramChannel {
198    fn send(&self, message: &str) -> std::result::Result<(), DeliveryError> {
199        let message_id = telegram_message_id(&self.target, message);
200        if self.recent_sends.contains_recent(message_id) {
201            debug!(
202                target = %self.target,
203                message_id,
204                "suppressing duplicate native telegram message"
205            );
206            return Ok(());
207        }
208
209        debug!(target = %self.target, len = message.len(), "sending via native telegram channel");
210        self.bot
211            .send_message(&self.target, message)
212            .map(|_| {
213                self.recent_sends.record(message_id);
214            })
215            .map_err(|error| DeliveryError::ChannelSend {
216                recipient: self.target.clone(),
217                detail: error.to_string(),
218            })
219    }
220
221    fn channel_type(&self) -> &str {
222        "telegram-native"
223    }
224}
225
226/// Create a channel from config fields.
227pub fn channel_from_config(
228    channel_type: &str,
229    config: &ChannelConfig,
230) -> std::result::Result<Box<dyn Channel>, DeliveryError> {
231    match channel_type {
232        "telegram" => {
233            if let Some(native) = NativeTelegramChannel::from_config(config) {
234                Ok(Box::new(native))
235            } else {
236                Ok(Box::new(TelegramChannel::from_config(config)))
237            }
238        }
239        other => Err(DeliveryError::UnsupportedChannel {
240            channel_type: other.to_string(),
241        }),
242    }
243}
244
#[cfg(test)]
mod tests {
    use super::*;
    use std::fs;
    use std::path::{Path, PathBuf};

    /// Builds a `ChannelConfig` with the given fields and no allowed user ids.
    fn channel_config(target: &str, provider: &str, bot_token: Option<&str>) -> ChannelConfig {
        ChannelConfig {
            target: target.into(),
            provider: provider.into(),
            bot_token: bot_token.map(Into::into),
            allowed_user_ids: vec![],
        }
    }

    /// Writes an executable shell script into `dir` that appends its
    /// arguments as one line to a log file, and returns
    /// `(script_path, log_path)`.
    fn install_fake_provider(dir: &Path) -> (PathBuf, PathBuf) {
        let log_path = dir.join("provider.log");
        let script_path = dir.join("fake-provider.sh");
        let script = format!(
            "#!/bin/sh\nprintf '%s\\n' \"$*\" >> \"{}\"\n",
            log_path.display()
        );
        fs::write(&script_path, script).unwrap();

        let mut perms = fs::metadata(&script_path).unwrap().permissions();
        #[cfg(unix)]
        {
            use std::os::unix::fs::PermissionsExt;
            perms.set_mode(0o755);
        }
        fs::set_permissions(&script_path, perms).unwrap();

        (script_path, log_path)
    }

    /// Number of lines the fake provider has logged, i.e. how many times it ran.
    fn logged_invocations(log_path: &Path) -> usize {
        fs::read_to_string(log_path).unwrap().lines().count()
    }

    #[test]
    fn telegram_channel_type() {
        let channel = TelegramChannel::new("12345".into(), "openclaw".into());
        assert_eq!(channel.channel_type(), "telegram");
    }

    #[test]
    fn native_telegram_channel_type() {
        let bot = TelegramBot::new("test-token".into(), vec![]);
        let channel = NativeTelegramChannel::new(bot, "12345".into());
        assert_eq!(channel.channel_type(), "telegram-native");
    }

    #[test]
    fn channel_from_config_telegram() {
        let config = channel_config("12345", "openclaw", None);
        // Without bot_token (and assuming env var is not set), falls back to CLI channel.
        if std::env::var("BATTY_TELEGRAM_BOT_TOKEN").is_err() {
            let channel = channel_from_config("telegram", &config).unwrap();
            assert_eq!(channel.channel_type(), "telegram");
        }
    }

    #[test]
    fn channel_from_config_telegram_with_bot_token() {
        let config = channel_config("12345", "openclaw", Some("test-bot-token"));
        let channel = channel_from_config("telegram", &config).unwrap();
        assert_eq!(channel.channel_type(), "telegram-native");
    }

    #[test]
    fn channel_from_config_telegram_without_bot_token() {
        let config = channel_config("12345", "openclaw", None);
        // Only assert CLI fallback when the env var is also absent.
        if std::env::var("BATTY_TELEGRAM_BOT_TOKEN").is_err() {
            let channel = channel_from_config("telegram", &config).unwrap();
            assert_eq!(channel.channel_type(), "telegram");
        }
    }

    #[test]
    fn channel_from_config_unknown_type() {
        let config = channel_config("x", "x", None);
        let Err(e) = channel_from_config("slack", &config) else {
            panic!("expected error for unsupported channel");
        };
        assert!(e.to_string().contains("unsupported"));
    }

    #[test]
    fn telegram_send_fails_gracefully_with_missing_provider() {
        let channel = TelegramChannel::new("12345".into(), "/nonexistent/binary".into());
        let result = channel.send("hello");
        assert!(result.is_err());
        let rendered = result.unwrap_err().to_string();
        assert!(rendered.contains("failed to execute"));
    }

    #[test]
    fn telegram_message_id_changes_with_target_and_body() {
        let baseline = telegram_message_id("12345", "hello");
        let other_body = telegram_message_id("12345", "hello again");
        let other_target = telegram_message_id("67890", "hello");
        assert_ne!(baseline, other_body);
        assert_ne!(baseline, other_target);
    }

    #[test]
    fn telegram_recent_sends_respects_ttl() {
        let cache = RecentTelegramSends::new(Duration::from_millis(50), 16);
        let id = telegram_message_id("12345", "hello");
        assert!(!cache.contains_recent(id));
        cache.record(id);
        assert!(cache.contains_recent(id));
        // Wait well past the TTL so the entry must have expired.
        std::thread::sleep(Duration::from_millis(100));
        assert!(!cache.contains_recent(id));
    }

    #[test]
    fn telegram_channel_suppresses_duplicate_messages() {
        let tmp = tempfile::tempdir().unwrap();
        let (script_path, log_path) = install_fake_provider(tmp.path());

        let channel = TelegramChannel::with_dedup_settings(
            "12345".into(),
            script_path.display().to_string(),
            Duration::from_secs(60),
            16,
        );
        for _ in 0..2 {
            channel.send("hello").unwrap();
        }

        // The second identical send must not reach the provider.
        assert_eq!(logged_invocations(&log_path), 1);
    }

    #[test]
    fn telegram_channel_allows_unique_messages_and_retries_after_ttl() {
        let tmp = tempfile::tempdir().unwrap();
        let (script_path, log_path) = install_fake_provider(tmp.path());

        let channel = TelegramChannel::with_dedup_settings(
            "12345".into(),
            script_path.display().to_string(),
            Duration::from_millis(5),
            16,
        );
        channel.send("first").unwrap();
        channel.send("second").unwrap();
        // Let the dedup entry for "first" expire before resending it.
        std::thread::sleep(Duration::from_millis(10));
        channel.send("first").unwrap();

        assert_eq!(logged_invocations(&log_path), 3);
    }
}