1use std::collections::VecDeque;
8use std::hash::{Hash, Hasher};
9use std::sync::Mutex;
10use std::time::{Duration, Instant};
11
12use tracing::{debug, warn};
13
14use super::config::ChannelConfig;
15use super::errors::DeliveryError;
16use super::telegram::TelegramBot;
17
18const TELEGRAM_DEDUP_TTL: Duration = Duration::from_secs(300);
19const TELEGRAM_DEDUP_CAPACITY: usize = 512;
20
/// Bounded, time-limited record of message ids that were sent recently.
///
/// Entries are kept oldest-first in a deque behind a mutex; an entry stops
/// counting once it is older than `ttl` or once more than `capacity` newer
/// entries have been recorded.
#[derive(Debug)]
struct RecentTelegramSends {
    /// Maximum age at which a recorded id is still reported as recent.
    ttl: Duration,
    /// Maximum number of ids retained; the oldest are evicted first.
    capacity: usize,
    /// `(message id, time recorded)` pairs, oldest at the front.
    entries: Mutex<VecDeque<(u64, Instant)>>,
}

impl RecentTelegramSends {
    /// Creates an empty cache with the given time-to-live and capacity.
    fn new(ttl: Duration, capacity: usize) -> Self {
        Self {
            ttl,
            capacity,
            entries: Mutex::new(VecDeque::new()),
        }
    }

    /// Drops entries older than `ttl` (relative to `now`) from the front, then
    /// trims the oldest entries until at most `capacity` remain.
    fn prune_expired(&self, now: Instant, entries: &mut VecDeque<(u64, Instant)>) {
        while let Some(&(_, sent_at)) = entries.front() {
            // Saturate explicitly: an entry stamped after `now` has age zero
            // and must never cause a panic.
            if now.saturating_duration_since(sent_at) <= self.ttl {
                break;
            }
            entries.pop_front();
        }
        while entries.len() > self.capacity {
            entries.pop_front();
        }
    }

    /// Returns `true` when `message_id` was recorded and has not yet expired
    /// or been evicted. Stale entries are pruned as a side effect.
    ///
    /// # Panics
    ///
    /// Panics if the internal mutex is poisoned.
    fn contains_recent(&self, message_id: u64) -> bool {
        let mut entries = self
            .entries
            .lock()
            .expect("recent telegram sends mutex poisoned");
        // Read the clock only after the lock is held so `now` is never earlier
        // than a timestamp already stored by another thread.
        let now = Instant::now();
        self.prune_expired(now, &mut entries);
        entries.iter().any(|&(id, _)| id == message_id)
    }

    /// Records `message_id` as sent now, evicting expired and overflowing
    /// entries.
    ///
    /// # Panics
    ///
    /// Panics if the internal mutex is poisoned.
    fn record(&self, message_id: u64) {
        let mut entries = self
            .entries
            .lock()
            .expect("recent telegram sends mutex poisoned");
        // Timestamp under the lock: pushes then happen in clock order, which
        // keeps the deque sorted oldest-first as the front-only expiry scan
        // in `prune_expired` requires.
        let now = Instant::now();
        entries.push_back((message_id, now));
        // A single prune after the push handles both expiry and capacity; the
        // fresh entry has age zero and is never treated as expired.
        self.prune_expired(now, &mut entries);
    }
}
64
/// Derives the 64-bit dedup id for a message by feeding `target` and then
/// `message` into a fresh `DefaultHasher`.
///
/// The same `(target, message)` pair always yields the same id within one
/// build of the program.
fn telegram_message_id(target: &str, message: &str) -> u64 {
    let mut state = std::collections::hash_map::DefaultHasher::new();
    // Order matters: the target is hashed before the body.
    for part in [target, message] {
        part.hash(&mut state);
    }
    state.finish()
}
71
/// A destination that text messages can be delivered to.
///
/// Implementors must be `Send + Sync`; both methods take `&self`.
pub trait Channel: Send + Sync {
    /// Delivers `message` through this channel.
    ///
    /// Returns `Ok(())` on success and a [`DeliveryError`] describing the
    /// failure otherwise.
    fn send(&self, message: &str) -> std::result::Result<(), DeliveryError>;
    /// Returns a short identifier for the kind of channel; the implementations
    /// in this file return `"telegram"` and `"telegram-native"`.
    fn channel_type(&self) -> &str;
}
79
80pub struct TelegramChannel {
82 target: String,
83 provider: String,
84 recent_sends: RecentTelegramSends,
85}
86
87impl TelegramChannel {
88 pub fn new(target: String, provider: String) -> Self {
89 Self::with_dedup_settings(
90 target,
91 provider,
92 TELEGRAM_DEDUP_TTL,
93 TELEGRAM_DEDUP_CAPACITY,
94 )
95 }
96
97 pub fn from_config(config: &ChannelConfig) -> Self {
98 Self::new(config.target.clone(), config.provider.clone())
99 }
100
101 fn with_dedup_settings(
102 target: String,
103 provider: String,
104 ttl: Duration,
105 capacity: usize,
106 ) -> Self {
107 Self {
108 target,
109 provider,
110 recent_sends: RecentTelegramSends::new(ttl, capacity),
111 }
112 }
113}
114
115impl Channel for TelegramChannel {
116 fn send(&self, message: &str) -> std::result::Result<(), DeliveryError> {
117 let message_id = telegram_message_id(&self.target, message);
118 if self.recent_sends.contains_recent(message_id) {
119 debug!(target = %self.target, message_id, "suppressing duplicate telegram message");
120 return Ok(());
121 }
122
123 debug!(target = %self.target, provider = %self.provider, len = message.len(), "sending via telegram channel");
124
125 let output = std::process::Command::new(&self.provider)
126 .args([
127 "message",
128 "send",
129 "--to",
130 &self.target,
131 "--message",
132 message,
133 ])
134 .output();
135
136 match output {
137 Ok(out) if out.status.success() => {
138 self.recent_sends.record(message_id);
139 debug!("telegram message sent successfully");
140 Ok(())
141 }
142 Ok(out) => {
143 let stderr = String::from_utf8_lossy(&out.stderr);
144 warn!(status = ?out.status, stderr = %stderr, "telegram send failed");
145 Err(DeliveryError::ChannelSend {
146 recipient: self.target.clone(),
147 detail: stderr.to_string(),
148 })
149 }
150 Err(e) => {
151 warn!(error = %e, provider = %self.provider, "failed to execute channel provider");
152 Err(DeliveryError::ProviderExec {
153 provider: self.provider.clone(),
154 source: e,
155 })
156 }
157 }
158 }
159
160 fn channel_type(&self) -> &str {
161 "telegram"
162 }
163}
164
165pub struct NativeTelegramChannel {
167 bot: TelegramBot,
168 target: String,
169 recent_sends: RecentTelegramSends,
170}
171
172impl NativeTelegramChannel {
173 pub fn new(bot: TelegramBot, target: String) -> Self {
174 Self::with_dedup_settings(target, bot, TELEGRAM_DEDUP_TTL, TELEGRAM_DEDUP_CAPACITY)
175 }
176
177 pub fn from_config(config: &ChannelConfig) -> Option<Self> {
179 TelegramBot::from_config(config).map(|bot| Self::new(bot, config.target.clone()))
180 }
181
182 fn with_dedup_settings(
183 target: String,
184 bot: TelegramBot,
185 ttl: Duration,
186 capacity: usize,
187 ) -> Self {
188 Self {
189 bot,
190 target,
191 recent_sends: RecentTelegramSends::new(ttl, capacity),
192 }
193 }
194}
195
196impl Channel for NativeTelegramChannel {
197 fn send(&self, message: &str) -> std::result::Result<(), DeliveryError> {
198 let message_id = telegram_message_id(&self.target, message);
199 if self.recent_sends.contains_recent(message_id) {
200 debug!(
201 target = %self.target,
202 message_id,
203 "suppressing duplicate native telegram message"
204 );
205 return Ok(());
206 }
207
208 debug!(target = %self.target, len = message.len(), "sending via native telegram channel");
209 self.bot
210 .send_message(&self.target, message)
211 .map(|_| {
212 self.recent_sends.record(message_id);
213 })
214 .map_err(|error| DeliveryError::ChannelSend {
215 recipient: self.target.clone(),
216 detail: error.to_string(),
217 })
218 }
219
220 fn channel_type(&self) -> &str {
221 "telegram-native"
222 }
223}
224
225pub fn channel_from_config(
227 channel_type: &str,
228 config: &ChannelConfig,
229) -> std::result::Result<Box<dyn Channel>, DeliveryError> {
230 match channel_type {
231 "telegram" => {
232 if let Some(native) = NativeTelegramChannel::from_config(config) {
233 Ok(Box::new(native))
234 } else {
235 Ok(Box::new(TelegramChannel::from_config(config)))
236 }
237 }
238 other => Err(DeliveryError::UnsupportedChannel {
239 channel_type: other.to_string(),
240 }),
241 }
242}
243
#[cfg(test)]
mod tests {
    use super::*;
    #[cfg(unix)]
    use std::fs;
    #[cfg(unix)]
    use std::path::{Path, PathBuf};

    /// Writes an executable `/bin/sh` stand-in for the provider into `dir`.
    ///
    /// Every invocation of the script appends its arguments, followed by a
    /// newline, to a log file. Returns `(script_path, log_path)`.
    ///
    /// Unix-only: it relies on a shebang script and Unix permission bits.
    #[cfg(unix)]
    fn write_fake_provider(dir: &Path) -> (PathBuf, PathBuf) {
        use std::os::unix::fs::PermissionsExt;

        let log_path = dir.join("provider.log");
        let script_path = dir.join("fake-provider.sh");
        fs::write(
            &script_path,
            format!(
                "#!/bin/sh\nprintf '%s\\n' \"$*\" >> \"{}\"\n",
                log_path.display()
            ),
        )
        .unwrap();
        let mut perms = fs::metadata(&script_path).unwrap().permissions();
        perms.set_mode(0o755);
        fs::set_permissions(&script_path, perms).unwrap();
        (script_path, log_path)
    }

    #[test]
    fn telegram_channel_type() {
        let ch = TelegramChannel::new("12345".into(), "openclaw".into());
        assert_eq!(ch.channel_type(), "telegram");
    }

    #[test]
    fn native_telegram_channel_type() {
        let bot = TelegramBot::new("test-token".into(), vec![]);
        let ch = NativeTelegramChannel::new(bot, "12345".into());
        assert_eq!(ch.channel_type(), "telegram-native");
    }

    #[test]
    fn channel_from_config_telegram() {
        let config = ChannelConfig {
            target: "12345".into(),
            provider: "openclaw".into(),
            bot_token: None,
            allowed_user_ids: vec![],
        };
        // Only checked when the environment does not set a bot token.
        if std::env::var("BATTY_TELEGRAM_BOT_TOKEN").is_err() {
            let ch = channel_from_config("telegram", &config).unwrap();
            assert_eq!(ch.channel_type(), "telegram");
        }
    }

    #[test]
    fn channel_from_config_telegram_with_bot_token() {
        let config = ChannelConfig {
            target: "12345".into(),
            provider: "openclaw".into(),
            bot_token: Some("test-bot-token".into()),
            allowed_user_ids: vec![],
        };
        let ch = channel_from_config("telegram", &config).unwrap();
        assert_eq!(ch.channel_type(), "telegram-native");
    }

    #[test]
    fn channel_from_config_telegram_without_bot_token() {
        let config = ChannelConfig {
            target: "12345".into(),
            provider: "openclaw".into(),
            bot_token: None,
            allowed_user_ids: vec![],
        };
        // Only checked when the environment does not set a bot token.
        if std::env::var("BATTY_TELEGRAM_BOT_TOKEN").is_err() {
            let ch = channel_from_config("telegram", &config).unwrap();
            assert_eq!(ch.channel_type(), "telegram");
        }
    }

    #[test]
    fn channel_from_config_unknown_type() {
        let config = ChannelConfig {
            target: "x".into(),
            provider: "x".into(),
            bot_token: None,
            allowed_user_ids: vec![],
        };
        match channel_from_config("slack", &config) {
            Err(e) => assert!(e.to_string().contains("unsupported")),
            Ok(_) => panic!("expected error for unsupported channel"),
        }
    }

    #[test]
    fn telegram_send_fails_gracefully_with_missing_provider() {
        let ch = TelegramChannel::new("12345".into(), "/nonexistent/binary".into());
        let result = ch.send("hello");
        assert!(result.is_err());
        assert!(
            result
                .unwrap_err()
                .to_string()
                .contains("failed to execute")
        );
    }

    #[test]
    fn telegram_message_id_changes_with_target_and_body() {
        let first = telegram_message_id("12345", "hello");
        let second = telegram_message_id("12345", "hello again");
        let third = telegram_message_id("67890", "hello");
        assert_ne!(first, second);
        assert_ne!(first, third);
    }

    #[test]
    fn telegram_recent_sends_respects_ttl() {
        let cache = RecentTelegramSends::new(Duration::from_millis(50), 16);
        let id = telegram_message_id("12345", "hello");
        assert!(!cache.contains_recent(id));
        cache.record(id);
        assert!(cache.contains_recent(id));
        std::thread::sleep(Duration::from_millis(100));
        assert!(!cache.contains_recent(id));
    }

    // The fake provider is a `#!/bin/sh` script, so the two tests below can
    // only run on Unix targets.
    #[cfg(unix)]
    #[test]
    fn telegram_channel_suppresses_duplicate_messages() {
        let tmp = tempfile::tempdir().unwrap();
        let (script_path, log_path) = write_fake_provider(tmp.path());

        let ch = TelegramChannel::with_dedup_settings(
            "12345".into(),
            script_path.display().to_string(),
            Duration::from_secs(60),
            16,
        );
        ch.send("hello").unwrap();
        ch.send("hello").unwrap();

        // The second send is suppressed, so the provider ran exactly once.
        let lines = fs::read_to_string(&log_path).unwrap();
        assert_eq!(lines.lines().count(), 1);
    }

    #[cfg(unix)]
    #[test]
    fn telegram_channel_allows_unique_messages_and_retries_after_ttl() {
        let tmp = tempfile::tempdir().unwrap();
        let (script_path, log_path) = write_fake_provider(tmp.path());

        let ch = TelegramChannel::with_dedup_settings(
            "12345".into(),
            script_path.display().to_string(),
            Duration::from_millis(5),
            16,
        );
        ch.send("first").unwrap();
        ch.send("second").unwrap();
        // Sleep past the 5 ms TTL so "first" is no longer considered recent.
        std::thread::sleep(Duration::from_millis(10));
        ch.send("first").unwrap();

        let lines = fs::read_to_string(&log_path).unwrap();
        assert_eq!(lines.lines().count(), 3);
    }
}