1use std::collections::VecDeque;
8use std::hash::{Hash, Hasher};
9use std::sync::Mutex;
10use std::time::{Duration, Instant};
11
12use tracing::{debug, warn};
13
14use super::config::ChannelConfig;
15use super::errors::DeliveryError;
16use super::telegram::TelegramBot;
17
/// Default time a delivered message is remembered for duplicate suppression (five minutes).
const TELEGRAM_DEDUP_TTL: Duration = Duration::from_secs(5 * 60);
/// Default upper bound on how many delivered messages a channel remembers at once.
const TELEGRAM_DEDUP_CAPACITY: usize = 512;
20
/// Bounded, time-limited record of message fingerprints that were recorded recently.
///
/// Entries are kept oldest-first. An entry is dropped once it is older than `ttl`, or
/// when the queue holds more than `capacity` entries (oldest dropped first). A
/// `capacity` of zero therefore retains nothing, so no fingerprint is ever reported
/// as recent.
#[derive(Debug)]
struct RecentTelegramSends {
    /// Maximum age an entry may reach before it is pruned.
    ttl: Duration,
    /// Maximum number of entries kept after pruning.
    capacity: usize,
    /// `(fingerprint, recorded_at)` pairs, appended at the back as they are recorded.
    entries: Mutex<VecDeque<(u64, Instant)>>,
}

impl RecentTelegramSends {
    /// Creates an empty record with the given entry lifetime and size bound.
    fn new(ttl: Duration, capacity: usize) -> Self {
        Self {
            ttl,
            capacity,
            entries: Mutex::new(VecDeque::new()),
        }
    }

    /// Locks the entry queue, recovering the guard if an earlier holder panicked.
    ///
    /// The queue only stores plain `(u64, Instant)` pairs, so it is still usable after
    /// a panic elsewhere; refusing the lock would turn one panic into a panic on every
    /// later send.
    fn lock_entries(&self) -> std::sync::MutexGuard<'_, VecDeque<(u64, Instant)>> {
        self.entries
            .lock()
            .unwrap_or_else(std::sync::PoisonError::into_inner)
    }

    /// Drops entries older than `ttl` from the front, then trims the queue to `capacity`.
    ///
    /// Only the front is inspected, which relies on entries being stored oldest-first.
    fn prune_expired(&self, now: Instant, entries: &mut VecDeque<(u64, Instant)>) {
        while entries
            .front()
            // Saturate instead of panicking if `sent_at` is somehow later than `now`.
            .is_some_and(|(_, sent_at)| now.saturating_duration_since(*sent_at) > self.ttl)
        {
            entries.pop_front();
        }
        while entries.len() > self.capacity {
            entries.pop_front();
        }
    }

    /// Returns `true` if `message_id` was recorded and has not yet been pruned.
    fn contains_recent(&self, message_id: u64) -> bool {
        let mut entries = self.lock_entries();
        // Read the clock only while holding the lock so timestamps are taken in the
        // same order in which callers touch the queue.
        let now = Instant::now();
        self.prune_expired(now, &mut entries);
        entries.iter().any(|(id, _)| *id == message_id)
    }

    /// Remembers `message_id` as recorded now, evicting expired and excess entries.
    fn record(&self, message_id: u64) {
        let mut entries = self.lock_entries();
        // Timestamp under the lock so the queue stays ordered oldest-first, which is
        // what front-only pruning depends on.
        let now = Instant::now();
        entries.push_back((message_id, now));
        // A single prune after the push both expires old entries and enforces capacity.
        self.prune_expired(now, &mut entries);
    }
}
64
/// Derives a 64-bit fingerprint from a recipient and a message body.
///
/// `target` is fed to the hasher first, then `message`, using the standard
/// library's `DefaultHasher`. The value is only used for in-process comparison.
fn telegram_message_id(target: &str, message: &str) -> u64 {
    use std::collections::hash_map::DefaultHasher;

    let mut state = DefaultHasher::new();
    for part in [target, message] {
        part.hash(&mut state);
    }
    state.finish()
}
71
/// A delivery channel that can send a text message to its configured recipient.
///
/// Implementors must be `Send + Sync`.
pub trait Channel: Send + Sync {
    /// Sends `message` through this channel, returning a [`DeliveryError`] on failure.
    fn send(&self, message: &str) -> std::result::Result<(), DeliveryError>;
    /// Returns a short static-style identifier for the kind of channel
    /// (the implementations in this file return `"telegram"` and `"telegram-native"`).
    // NOTE(review): the dead-code allowance suggests non-test code may not call this — confirm.
    #[allow(dead_code)] fn channel_type(&self) -> &str;
}
80
/// Channel that delivers messages by running an external provider program.
pub struct TelegramChannel {
    /// Recipient handed to the provider after the `--to` argument.
    target: String,
    /// Program that is executed to perform the send.
    provider: String,
    /// Fingerprints of recently delivered messages, used to suppress duplicates.
    recent_sends: RecentTelegramSends,
}
87
88impl TelegramChannel {
89 pub fn new(target: String, provider: String) -> Self {
90 Self::with_dedup_settings(
91 target,
92 provider,
93 TELEGRAM_DEDUP_TTL,
94 TELEGRAM_DEDUP_CAPACITY,
95 )
96 }
97
98 pub fn from_config(config: &ChannelConfig) -> Self {
99 Self::new(config.target.clone(), config.provider.clone())
100 }
101
102 fn with_dedup_settings(
103 target: String,
104 provider: String,
105 ttl: Duration,
106 capacity: usize,
107 ) -> Self {
108 Self {
109 target,
110 provider,
111 recent_sends: RecentTelegramSends::new(ttl, capacity),
112 }
113 }
114}
115
116impl Channel for TelegramChannel {
117 fn send(&self, message: &str) -> std::result::Result<(), DeliveryError> {
118 let message_id = telegram_message_id(&self.target, message);
119 if self.recent_sends.contains_recent(message_id) {
120 debug!(target = %self.target, message_id, "suppressing duplicate telegram message");
121 return Ok(());
122 }
123
124 debug!(target = %self.target, provider = %self.provider, len = message.len(), "sending via telegram channel");
125
126 let output = std::process::Command::new(&self.provider)
127 .args([
128 "message",
129 "send",
130 "--to",
131 &self.target,
132 "--message",
133 message,
134 ])
135 .output();
136
137 match output {
138 Ok(out) if out.status.success() => {
139 self.recent_sends.record(message_id);
140 debug!("telegram message sent successfully");
141 Ok(())
142 }
143 Ok(out) => {
144 let stderr = String::from_utf8_lossy(&out.stderr);
145 warn!(status = ?out.status, stderr = %stderr, "telegram send failed");
146 Err(DeliveryError::ChannelSend {
147 recipient: self.target.clone(),
148 detail: stderr.to_string(),
149 })
150 }
151 Err(e) => {
152 warn!(error = %e, provider = %self.provider, "failed to execute channel provider");
153 Err(DeliveryError::ProviderExec {
154 provider: self.provider.clone(),
155 source: e,
156 })
157 }
158 }
159 }
160
161 fn channel_type(&self) -> &str {
162 "telegram"
163 }
164}
165
/// Channel that delivers messages through a [`TelegramBot`] rather than an external program.
pub struct NativeTelegramChannel {
    /// Bot whose `send_message` performs the delivery.
    bot: TelegramBot,
    /// Recipient passed to `send_message`.
    target: String,
    /// Fingerprints of recently delivered messages, used to suppress duplicates.
    recent_sends: RecentTelegramSends,
}
172
173impl NativeTelegramChannel {
174 pub fn new(bot: TelegramBot, target: String) -> Self {
175 Self::with_dedup_settings(target, bot, TELEGRAM_DEDUP_TTL, TELEGRAM_DEDUP_CAPACITY)
176 }
177
178 pub fn from_config(config: &ChannelConfig) -> Option<Self> {
180 TelegramBot::from_config(config).map(|bot| Self::new(bot, config.target.clone()))
181 }
182
183 fn with_dedup_settings(
184 target: String,
185 bot: TelegramBot,
186 ttl: Duration,
187 capacity: usize,
188 ) -> Self {
189 Self {
190 bot,
191 target,
192 recent_sends: RecentTelegramSends::new(ttl, capacity),
193 }
194 }
195}
196
197impl Channel for NativeTelegramChannel {
198 fn send(&self, message: &str) -> std::result::Result<(), DeliveryError> {
199 let message_id = telegram_message_id(&self.target, message);
200 if self.recent_sends.contains_recent(message_id) {
201 debug!(
202 target = %self.target,
203 message_id,
204 "suppressing duplicate native telegram message"
205 );
206 return Ok(());
207 }
208
209 debug!(target = %self.target, len = message.len(), "sending via native telegram channel");
210 self.bot
211 .send_message(&self.target, message)
212 .map(|_| {
213 self.recent_sends.record(message_id);
214 })
215 .map_err(|error| DeliveryError::ChannelSend {
216 recipient: self.target.clone(),
217 detail: error.to_string(),
218 })
219 }
220
221 fn channel_type(&self) -> &str {
222 "telegram-native"
223 }
224}
225
226pub fn channel_from_config(
228 channel_type: &str,
229 config: &ChannelConfig,
230) -> std::result::Result<Box<dyn Channel>, DeliveryError> {
231 match channel_type {
232 "telegram" => {
233 if let Some(native) = NativeTelegramChannel::from_config(config) {
234 Ok(Box::new(native))
235 } else {
236 Ok(Box::new(TelegramChannel::from_config(config)))
237 }
238 }
239 other => Err(DeliveryError::UnsupportedChannel {
240 channel_type: other.to_string(),
241 }),
242 }
243}
244
#[cfg(test)]
mod tests {
    use super::*;
    // Only the shell-script based tests touch the filesystem, and those are Unix-only.
    #[cfg(unix)]
    use std::fs;

    /// Writes an executable shell script into `dir` that appends its arguments as one
    /// line to `provider.log`, and returns `(script_path, log_path)`.
    #[cfg(unix)]
    fn write_fake_provider(dir: &std::path::Path) -> (std::path::PathBuf, std::path::PathBuf) {
        use std::os::unix::fs::PermissionsExt;

        let log_path = dir.join("provider.log");
        let script_path = dir.join("fake-provider.sh");
        fs::write(
            &script_path,
            format!(
                "#!/bin/sh\nprintf '%s\\n' \"$*\" >> \"{}\"\n",
                log_path.display()
            ),
        )
        .unwrap();
        let mut perms = fs::metadata(&script_path).unwrap().permissions();
        perms.set_mode(0o755);
        fs::set_permissions(&script_path, perms).unwrap();
        (script_path, log_path)
    }

    #[test]
    fn telegram_channel_type() {
        let ch = TelegramChannel::new("12345".into(), "openclaw".into());
        assert_eq!(ch.channel_type(), "telegram");
    }

    #[test]
    fn native_telegram_channel_type() {
        let bot = TelegramBot::new("test-token".into(), vec![]);
        let ch = NativeTelegramChannel::new(bot, "12345".into());
        assert_eq!(ch.channel_type(), "telegram-native");
    }

    #[test]
    fn channel_from_config_telegram() {
        let config = ChannelConfig {
            target: "12345".into(),
            provider: "openclaw".into(),
            bot_token: None,
            allowed_user_ids: vec![],
        };
        // Skipped when the environment variable is set; the guard implies it can
        // supply a bot token that changes which channel is built.
        if std::env::var("BATTY_TELEGRAM_BOT_TOKEN").is_err() {
            let ch = channel_from_config("telegram", &config).unwrap();
            assert_eq!(ch.channel_type(), "telegram");
        }
    }

    #[test]
    fn channel_from_config_telegram_with_bot_token() {
        let config = ChannelConfig {
            target: "12345".into(),
            provider: "openclaw".into(),
            bot_token: Some("test-bot-token".into()),
            allowed_user_ids: vec![],
        };
        let ch = channel_from_config("telegram", &config).unwrap();
        assert_eq!(ch.channel_type(), "telegram-native");
    }

    #[test]
    fn channel_from_config_telegram_without_bot_token() {
        let config = ChannelConfig {
            target: "12345".into(),
            provider: "openclaw".into(),
            bot_token: None,
            allowed_user_ids: vec![],
        };
        // Same environment guard as `channel_from_config_telegram`.
        if std::env::var("BATTY_TELEGRAM_BOT_TOKEN").is_err() {
            let ch = channel_from_config("telegram", &config).unwrap();
            assert_eq!(ch.channel_type(), "telegram");
        }
    }

    #[test]
    fn channel_from_config_unknown_type() {
        let config = ChannelConfig {
            target: "x".into(),
            provider: "x".into(),
            bot_token: None,
            allowed_user_ids: vec![],
        };
        match channel_from_config("slack", &config) {
            Err(e) => assert!(e.to_string().contains("unsupported")),
            Ok(_) => panic!("expected error for unsupported channel"),
        }
    }

    #[test]
    fn telegram_send_fails_gracefully_with_missing_provider() {
        let ch = TelegramChannel::new("12345".into(), "/nonexistent/binary".into());
        let result = ch.send("hello");
        assert!(result.is_err());
        assert!(
            result
                .unwrap_err()
                .to_string()
                .contains("failed to execute")
        );
    }

    #[test]
    fn telegram_message_id_changes_with_target_and_body() {
        let first = telegram_message_id("12345", "hello");
        let second = telegram_message_id("12345", "hello again");
        let third = telegram_message_id("67890", "hello");
        assert_ne!(first, second);
        assert_ne!(first, third);
    }

    #[test]
    fn telegram_recent_sends_respects_ttl() {
        let cache = RecentTelegramSends::new(Duration::from_millis(50), 16);
        let id = telegram_message_id("12345", "hello");
        assert!(!cache.contains_recent(id));
        cache.record(id);
        assert!(cache.contains_recent(id));
        std::thread::sleep(Duration::from_millis(100));
        assert!(!cache.contains_recent(id));
    }

    // Relies on a `#!/bin/sh` script and Unix permission bits, so it only runs on Unix.
    #[cfg(unix)]
    #[test]
    fn telegram_channel_suppresses_duplicate_messages() {
        let tmp = tempfile::tempdir().unwrap();
        let (script_path, log_path) = write_fake_provider(tmp.path());

        let ch = TelegramChannel::with_dedup_settings(
            "12345".into(),
            script_path.display().to_string(),
            Duration::from_secs(60),
            16,
        );
        ch.send("hello").unwrap();
        ch.send("hello").unwrap();

        // The second, identical send must not reach the provider.
        let lines = fs::read_to_string(&log_path).unwrap();
        assert_eq!(lines.lines().count(), 1);
    }

    // Relies on a `#!/bin/sh` script and Unix permission bits, so it only runs on Unix.
    #[cfg(unix)]
    #[test]
    fn telegram_channel_allows_unique_messages_and_retries_after_ttl() {
        let tmp = tempfile::tempdir().unwrap();
        let (script_path, log_path) = write_fake_provider(tmp.path());

        let ch = TelegramChannel::with_dedup_settings(
            "12345".into(),
            script_path.display().to_string(),
            Duration::from_millis(5),
            16,
        );
        ch.send("first").unwrap();
        ch.send("second").unwrap();
        // Wait past the 5 ms lifetime so the repeated "first" is no longer a duplicate.
        std::thread::sleep(Duration::from_millis(10));
        ch.send("first").unwrap();

        let lines = fs::read_to_string(&log_path).unwrap();
        assert_eq!(lines.lines().count(), 3);
    }
}