1use std::collections::VecDeque;
8use std::hash::{Hash, Hasher};
9use std::sync::Mutex;
10use std::time::{Duration, Instant};
11
12use tracing::{debug, warn};
13
14use super::config::ChannelConfig;
15use super::discord::DiscordBot;
16use super::errors::DeliveryError;
17use super::telegram::TelegramBot;
18
19const TELEGRAM_DEDUP_TTL: Duration = Duration::from_secs(300);
20const TELEGRAM_DEDUP_CAPACITY: usize = 512;
21
/// Bounded, time-limited memory of recently sent message fingerprints.
///
/// Entries are stored oldest-first. An entry stops counting as "recent" once
/// it is older than `ttl`, and the oldest entries are evicted whenever more
/// than `capacity` fingerprints are held.
#[derive(Debug)]
struct RecentTelegramSends {
    /// How long a recorded fingerprint keeps matching in `contains_recent`.
    ttl: Duration,
    /// Maximum number of fingerprints retained at once.
    capacity: usize,
    /// `(fingerprint, recorded_at)` pairs, oldest at the front.
    entries: Mutex<VecDeque<(u64, Instant)>>,
}

impl RecentTelegramSends {
    /// Creates an empty cache with the given time-to-live and size limit.
    fn new(ttl: Duration, capacity: usize) -> Self {
        Self {
            ttl,
            capacity,
            entries: Mutex::new(VecDeque::new()),
        }
    }

    /// Locks the entry list, recovering the data if the mutex was poisoned.
    ///
    /// The deque is only ever modified through single `push_back`/`pop_front`
    /// calls, so it is still usable after another thread panicked while
    /// holding the lock; panicking here would turn one failed send into a
    /// failure of every later send.
    fn lock_entries(&self) -> std::sync::MutexGuard<'_, VecDeque<(u64, Instant)>> {
        self.entries
            .lock()
            .unwrap_or_else(std::sync::PoisonError::into_inner)
    }

    /// Drops entries older than `ttl` (relative to `now`) from the front, then
    /// trims the front until at most `capacity` entries remain.
    fn prune_expired(&self, now: Instant, entries: &mut VecDeque<(u64, Instant)>) {
        while entries
            .front()
            .is_some_and(|(_, sent_at)| now.duration_since(*sent_at) > self.ttl)
        {
            entries.pop_front();
        }
        while entries.len() > self.capacity {
            entries.pop_front();
        }
    }

    /// Returns `true` if `message_id` was recorded and has not yet expired or
    /// been evicted.
    fn contains_recent(&self, message_id: u64) -> bool {
        let mut entries = self.lock_entries();
        // Read the clock only while holding the lock so timestamps in the
        // deque stay in insertion order, which front-only pruning relies on.
        let now = Instant::now();
        self.prune_expired(now, &mut entries);
        entries.iter().any(|(id, _)| *id == message_id)
    }

    /// Remembers `message_id` as sent now, evicting expired or excess entries.
    fn record(&self, message_id: u64) {
        let mut entries = self.lock_entries();
        // See `contains_recent`: the timestamp is taken under the lock.
        let now = Instant::now();
        entries.push_back((message_id, now));
        // A single pass after the push removes both expired entries and any
        // overflow caused by the new one.
        self.prune_expired(now, &mut entries);
    }
}
65
/// Computes the dedup fingerprint for sending `message` to `target`.
///
/// Both strings are fed, target first, into one default-constructed
/// `DefaultHasher`; the resulting 64-bit digest is the fingerprint.
fn telegram_message_id(target: &str, message: &str) -> u64 {
    let mut state = std::collections::hash_map::DefaultHasher::new();
    for part in [target, message] {
        part.hash(&mut state);
    }
    state.finish()
}
72
/// A destination that text messages can be delivered to.
///
/// Implementors must be `Send + Sync`; `channel_from_config` hands them out
/// as `Box<dyn Channel>`.
pub trait Channel: Send + Sync {
    /// Delivers `message` through this channel.
    ///
    /// # Errors
    ///
    /// Returns a [`DeliveryError`] when the message could not be delivered.
    fn send(&self, message: &str) -> std::result::Result<(), DeliveryError>;

    /// Returns a short identifier for the kind of channel. The implementations
    /// in this file return `"telegram"`, `"telegram-native"` and `"discord"`.
    fn channel_type(&self) -> &str;
}
80
81pub struct TelegramChannel {
83 target: String,
84 provider: String,
85 recent_sends: RecentTelegramSends,
86}
87
88impl TelegramChannel {
89 pub fn new(target: String, provider: String) -> Self {
90 Self::with_dedup_settings(
91 target,
92 provider,
93 TELEGRAM_DEDUP_TTL,
94 TELEGRAM_DEDUP_CAPACITY,
95 )
96 }
97
98 pub fn from_config(config: &ChannelConfig) -> Self {
99 Self::new(config.target.clone(), config.provider.clone())
100 }
101
102 fn with_dedup_settings(
103 target: String,
104 provider: String,
105 ttl: Duration,
106 capacity: usize,
107 ) -> Self {
108 Self {
109 target,
110 provider,
111 recent_sends: RecentTelegramSends::new(ttl, capacity),
112 }
113 }
114}
115
116impl Channel for TelegramChannel {
117 fn send(&self, message: &str) -> std::result::Result<(), DeliveryError> {
118 let message_id = telegram_message_id(&self.target, message);
119 if self.recent_sends.contains_recent(message_id) {
120 debug!(target = %self.target, message_id, "suppressing duplicate telegram message");
121 return Ok(());
122 }
123
124 debug!(target = %self.target, provider = %self.provider, len = message.len(), "sending via telegram channel");
125
126 let output = std::process::Command::new(&self.provider)
127 .args([
128 "message",
129 "send",
130 "--to",
131 &self.target,
132 "--message",
133 message,
134 ])
135 .output();
136
137 match output {
138 Ok(out) if out.status.success() => {
139 self.recent_sends.record(message_id);
140 debug!("telegram message sent successfully");
141 Ok(())
142 }
143 Ok(out) => {
144 let stderr = String::from_utf8_lossy(&out.stderr);
145 warn!(status = ?out.status, stderr = %stderr, "telegram send failed");
146 Err(DeliveryError::ChannelSend {
147 recipient: self.target.clone(),
148 detail: stderr.to_string(),
149 })
150 }
151 Err(e) => {
152 warn!(error = %e, provider = %self.provider, "failed to execute channel provider");
153 Err(DeliveryError::ProviderExec {
154 provider: self.provider.clone(),
155 source: e,
156 })
157 }
158 }
159 }
160
161 fn channel_type(&self) -> &str {
162 "telegram"
163 }
164}
165
166pub struct NativeTelegramChannel {
168 bot: TelegramBot,
169 target: String,
170 recent_sends: RecentTelegramSends,
171}
172
173impl NativeTelegramChannel {
174 pub fn new(bot: TelegramBot, target: String) -> Self {
175 Self::with_dedup_settings(target, bot, TELEGRAM_DEDUP_TTL, TELEGRAM_DEDUP_CAPACITY)
176 }
177
178 pub fn from_config(config: &ChannelConfig) -> Option<Self> {
180 TelegramBot::from_config(config).map(|bot| Self::new(bot, config.target.clone()))
181 }
182
183 fn with_dedup_settings(
184 target: String,
185 bot: TelegramBot,
186 ttl: Duration,
187 capacity: usize,
188 ) -> Self {
189 Self {
190 bot,
191 target,
192 recent_sends: RecentTelegramSends::new(ttl, capacity),
193 }
194 }
195}
196
197impl Channel for NativeTelegramChannel {
198 fn send(&self, message: &str) -> std::result::Result<(), DeliveryError> {
199 let message_id = telegram_message_id(&self.target, message);
200 if self.recent_sends.contains_recent(message_id) {
201 debug!(
202 target = %self.target,
203 message_id,
204 "suppressing duplicate native telegram message"
205 );
206 return Ok(());
207 }
208
209 debug!(target = %self.target, len = message.len(), "sending via native telegram channel");
210 self.bot
211 .send_message(&self.target, message)
212 .map(|_| {
213 self.recent_sends.record(message_id);
214 })
215 .map_err(|error| DeliveryError::ChannelSend {
216 recipient: self.target.clone(),
217 detail: error.to_string(),
218 })
219 }
220
221 fn channel_type(&self) -> &str {
222 "telegram-native"
223 }
224}
225
226pub struct DiscordChannel {
228 bot: DiscordBot,
229 channel_id: String,
230}
231
232impl DiscordChannel {
233 pub fn new(bot: DiscordBot, channel_id: String) -> Self {
234 Self { bot, channel_id }
235 }
236
237 pub fn from_config(config: &ChannelConfig) -> Option<Self> {
238 let channel_id = config
239 .commands_channel_id
240 .clone()
241 .or_else(|| config.events_channel_id.clone())?;
242 DiscordBot::from_config(config).map(|bot| Self::new(bot, channel_id))
243 }
244}
245
246impl Channel for DiscordChannel {
247 fn send(&self, message: &str) -> std::result::Result<(), DeliveryError> {
248 self.bot
249 .send_formatted_message(&self.channel_id, message)
250 .map_err(|error| DeliveryError::ChannelSend {
251 recipient: self.channel_id.clone(),
252 detail: error.to_string(),
253 })
254 }
255
256 fn channel_type(&self) -> &str {
257 "discord"
258 }
259}
260
261pub fn channel_from_config(
263 channel_type: &str,
264 config: &ChannelConfig,
265) -> std::result::Result<Box<dyn Channel>, DeliveryError> {
266 match channel_type {
267 "telegram" => {
268 if let Some(native) = NativeTelegramChannel::from_config(config) {
269 Ok(Box::new(native))
270 } else {
271 Ok(Box::new(TelegramChannel::from_config(config)))
272 }
273 }
274 "discord" => DiscordChannel::from_config(config)
275 .map(|channel| Box::new(channel) as Box<dyn Channel>)
276 .ok_or_else(|| DeliveryError::UnsupportedChannel {
277 channel_type: "discord".to_string(),
278 }),
279 other => Err(DeliveryError::UnsupportedChannel {
280 channel_type: other.to_string(),
281 }),
282 }
283}
284
#[cfg(test)]
mod tests {
    use super::*;
    #[cfg(unix)]
    use std::fs;

    /// Writes an executable shell script into `dir` that appends its arguments
    /// as one line to a log file, and returns `(script_path, log_path)`.
    ///
    /// Unix-only: the script relies on `/bin/sh` and the executable bit.
    #[cfg(unix)]
    fn write_fake_provider(dir: &std::path::Path) -> (std::path::PathBuf, std::path::PathBuf) {
        use std::os::unix::fs::PermissionsExt;

        let log_path = dir.join("provider.log");
        let script_path = dir.join("fake-provider.sh");
        fs::write(
            &script_path,
            format!(
                "#!/bin/sh\nprintf '%s\\n' \"$*\" >> \"{}\"\n",
                log_path.display()
            ),
        )
        .unwrap();
        let mut perms = fs::metadata(&script_path).unwrap().permissions();
        perms.set_mode(0o755);
        fs::set_permissions(&script_path, perms).unwrap();
        (script_path, log_path)
    }

    #[test]
    fn telegram_channel_type() {
        let ch = TelegramChannel::new("12345".into(), "openclaw".into());
        assert_eq!(ch.channel_type(), "telegram");
    }

    #[test]
    fn native_telegram_channel_type() {
        let bot = TelegramBot::new("test-token".into(), vec![]);
        let ch = NativeTelegramChannel::new(bot, "12345".into());
        assert_eq!(ch.channel_type(), "telegram-native");
    }

    #[test]
    fn discord_channel_type() {
        let bot = DiscordBot::new("test-token".into(), vec![42], "67890".into());
        let ch = DiscordChannel::new(bot, "67890".into());
        assert_eq!(ch.channel_type(), "discord");
    }

    #[test]
    fn channel_from_config_telegram() {
        let config = ChannelConfig {
            target: "12345".into(),
            provider: "openclaw".into(),
            bot_token: None,
            allowed_user_ids: vec![],
            events_channel_id: None,
            agents_channel_id: None,
            commands_channel_id: None,
            board_channel_id: None,
        };
        // Only asserted when the environment supplies no bot token.
        // NOTE(review): presumably `TelegramBot::from_config` falls back to
        // this variable, which would select the native channel — confirm.
        if std::env::var("BATTY_TELEGRAM_BOT_TOKEN").is_err() {
            let ch = channel_from_config("telegram", &config).unwrap();
            assert_eq!(ch.channel_type(), "telegram");
        }
    }

    #[test]
    fn channel_from_config_telegram_with_bot_token() {
        let config = ChannelConfig {
            target: "12345".into(),
            provider: "openclaw".into(),
            bot_token: Some("test-bot-token".into()),
            allowed_user_ids: vec![],
            events_channel_id: None,
            agents_channel_id: None,
            commands_channel_id: None,
            board_channel_id: None,
        };
        let ch = channel_from_config("telegram", &config).unwrap();
        assert_eq!(ch.channel_type(), "telegram-native");
    }

    #[test]
    fn channel_from_config_telegram_without_bot_token() {
        let config = ChannelConfig {
            target: "12345".into(),
            provider: "openclaw".into(),
            bot_token: None,
            allowed_user_ids: vec![],
            events_channel_id: None,
            agents_channel_id: None,
            commands_channel_id: None,
            board_channel_id: None,
        };
        // Only asserted when the environment supplies no bot token.
        // NOTE(review): presumably `TelegramBot::from_config` falls back to
        // this variable, which would select the native channel — confirm.
        if std::env::var("BATTY_TELEGRAM_BOT_TOKEN").is_err() {
            let ch = channel_from_config("telegram", &config).unwrap();
            assert_eq!(ch.channel_type(), "telegram");
        }
    }

    #[test]
    fn channel_from_config_unknown_type() {
        let config = ChannelConfig {
            target: "x".into(),
            provider: "x".into(),
            bot_token: None,
            allowed_user_ids: vec![],
            events_channel_id: None,
            agents_channel_id: None,
            commands_channel_id: None,
            board_channel_id: None,
        };
        match channel_from_config("slack", &config) {
            Err(e) => assert!(e.to_string().contains("unsupported")),
            Ok(_) => panic!("expected error for unsupported channel"),
        }
    }

    #[test]
    fn channel_from_config_discord() {
        let config = ChannelConfig {
            target: String::new(),
            provider: String::new(),
            bot_token: Some("discord-token".into()),
            allowed_user_ids: vec![42],
            events_channel_id: Some("100".into()),
            agents_channel_id: Some("200".into()),
            commands_channel_id: Some("300".into()),
            board_channel_id: None,
        };
        let ch = channel_from_config("discord", &config).unwrap();
        assert_eq!(ch.channel_type(), "discord");
    }

    #[test]
    fn telegram_send_fails_gracefully_with_missing_provider() {
        let ch = TelegramChannel::new("12345".into(), "/nonexistent/binary".into());
        let result = ch.send("hello");
        assert!(result.is_err());
        assert!(
            result
                .unwrap_err()
                .to_string()
                .contains("failed to execute")
        );
    }

    #[test]
    fn telegram_message_id_changes_with_target_and_body() {
        let first = telegram_message_id("12345", "hello");
        let second = telegram_message_id("12345", "hello again");
        let third = telegram_message_id("67890", "hello");
        assert_ne!(first, second);
        assert_ne!(first, third);
    }

    #[test]
    fn telegram_recent_sends_respects_ttl() {
        let cache = RecentTelegramSends::new(Duration::from_millis(50), 16);
        let id = telegram_message_id("12345", "hello");
        assert!(!cache.contains_recent(id));
        cache.record(id);
        assert!(cache.contains_recent(id));
        std::thread::sleep(Duration::from_millis(100));
        assert!(!cache.contains_recent(id));
    }

    // The fake provider is a `/bin/sh` script, so this test can only run on
    // Unix-like targets.
    #[cfg(unix)]
    #[test]
    fn telegram_channel_suppresses_duplicate_messages() {
        let tmp = tempfile::tempdir().unwrap();
        let (script_path, log_path) = write_fake_provider(tmp.path());

        let ch = TelegramChannel::with_dedup_settings(
            "12345".into(),
            script_path.display().to_string(),
            Duration::from_secs(60),
            16,
        );
        ch.send("hello").unwrap();
        ch.send("hello").unwrap();

        // The second, identical send must not reach the provider.
        let lines = fs::read_to_string(&log_path).unwrap();
        assert_eq!(lines.lines().count(), 1);
    }

    // The fake provider is a `/bin/sh` script, so this test can only run on
    // Unix-like targets.
    #[cfg(unix)]
    #[test]
    fn telegram_channel_allows_unique_messages_and_retries_after_ttl() {
        let tmp = tempfile::tempdir().unwrap();
        let (script_path, log_path) = write_fake_provider(tmp.path());

        let ch = TelegramChannel::with_dedup_settings(
            "12345".into(),
            script_path.display().to_string(),
            Duration::from_millis(5),
            16,
        );
        ch.send("first").unwrap();
        ch.send("second").unwrap();
        // Wait past the 5 ms TTL so the repeated "first" is no longer a
        // duplicate.
        std::thread::sleep(Duration::from_millis(10));
        ch.send("first").unwrap();

        let lines = fs::read_to_string(&log_path).unwrap();
        assert_eq!(lines.lines().count(), 3);
    }
}