1use super::{
7 Channel, ChannelCapabilities, ChannelMessage, ChannelStatus, ChannelType, ChannelUser,
8 MessageId, StreamingMode,
9};
10use crate::error::{ChannelError, RustantError};
11use async_trait::async_trait;
12use serde::{Deserialize, Serialize};
13
14#[derive(Debug, Clone, Serialize, Deserialize)]
16pub struct TelegramConfig {
17 pub bot_token: String,
18 pub allowed_chat_ids: Vec<i64>,
19 pub polling_timeout_secs: u64,
20}
21
22impl Default for TelegramConfig {
23 fn default() -> Self {
24 Self {
25 bot_token: String::new(),
26 allowed_chat_ids: Vec::new(),
27 polling_timeout_secs: 30,
28 }
29 }
30}
31
32#[async_trait]
34pub trait TelegramHttpClient: Send + Sync {
35 async fn send_message(&self, chat_id: i64, text: &str) -> Result<String, String>;
36 async fn get_updates(&self, offset: i64) -> Result<Vec<TelegramUpdate>, String>;
37}
38
/// A single incoming update, flattened to the fields this channel uses.
#[derive(Debug, Clone)]
pub struct TelegramUpdate {
    /// Identifier of the update itself.
    pub update_id: i64,
    /// Chat the message was posted in.
    pub chat_id: i64,
    /// Identifier of the sending user.
    pub from_id: i64,
    /// Display name of the sending user.
    pub from_name: String,
    /// Message text.
    pub text: String,
}
48
49pub struct TelegramChannel {
51 config: TelegramConfig,
52 status: ChannelStatus,
53 http_client: Box<dyn TelegramHttpClient>,
54 last_update_id: i64,
55 name: String,
56}
57
58impl TelegramChannel {
59 pub fn new(config: TelegramConfig, http_client: Box<dyn TelegramHttpClient>) -> Self {
60 Self {
61 config,
62 status: ChannelStatus::Disconnected,
63 http_client,
64 last_update_id: 0,
65 name: "telegram".to_string(),
66 }
67 }
68
69 pub fn with_name(mut self, name: impl Into<String>) -> Self {
70 self.name = name.into();
71 self
72 }
73}
74
75#[async_trait]
76impl Channel for TelegramChannel {
77 fn name(&self) -> &str {
78 &self.name
79 }
80
81 fn channel_type(&self) -> ChannelType {
82 ChannelType::Telegram
83 }
84
85 async fn connect(&mut self) -> Result<(), RustantError> {
86 if self.config.bot_token.is_empty() {
87 return Err(RustantError::Channel(ChannelError::AuthFailed {
88 name: self.name.clone(),
89 }));
90 }
91 self.status = ChannelStatus::Connected;
92 Ok(())
93 }
94
95 async fn disconnect(&mut self) -> Result<(), RustantError> {
96 self.status = ChannelStatus::Disconnected;
97 Ok(())
98 }
99
100 async fn send_message(&self, msg: ChannelMessage) -> Result<MessageId, RustantError> {
101 let text = msg.content.as_text().unwrap_or("");
102 let chat_id: i64 = msg.channel_id.parse().unwrap_or(0);
103
104 self.http_client
105 .send_message(chat_id, text)
106 .await
107 .map(MessageId::new)
108 .map_err(|e| {
109 RustantError::Channel(ChannelError::SendFailed {
110 name: self.name.clone(),
111 message: e,
112 })
113 })
114 }
115
116 async fn receive_messages(&self) -> Result<Vec<ChannelMessage>, RustantError> {
117 let updates = self
118 .http_client
119 .get_updates(self.last_update_id + 1)
120 .await
121 .map_err(|e| {
122 RustantError::Channel(ChannelError::ConnectionFailed {
123 name: self.name.clone(),
124 message: e,
125 })
126 })?;
127
128 let messages: Vec<ChannelMessage> = updates
129 .into_iter()
130 .filter(|u| {
131 self.config.allowed_chat_ids.is_empty()
132 || self.config.allowed_chat_ids.contains(&u.chat_id)
133 })
134 .map(|u| {
135 let sender = ChannelUser::new(u.from_id.to_string(), ChannelType::Telegram)
136 .with_name(u.from_name);
137 ChannelMessage::text(ChannelType::Telegram, u.chat_id.to_string(), sender, u.text)
138 })
139 .collect();
140
141 Ok(messages)
142 }
143
144 fn status(&self) -> ChannelStatus {
145 self.status
146 }
147
148 fn capabilities(&self) -> ChannelCapabilities {
149 ChannelCapabilities {
150 supports_threads: true,
151 supports_reactions: true,
152 supports_files: true,
153 supports_voice: true,
154 supports_video: false,
155 max_message_length: Some(4096),
156 supports_editing: false,
157 supports_deletion: false,
158 }
159 }
160
161 fn streaming_mode(&self) -> StreamingMode {
162 StreamingMode::Polling {
163 interval_ms: self.config.polling_timeout_secs * 1000,
164 }
165 }
166}
167
168pub struct RealTelegramHttp {
170 client: reqwest::Client,
171 base_url: String,
172}
173
174impl RealTelegramHttp {
175 pub fn new(bot_token: &str) -> Self {
176 Self {
177 client: reqwest::Client::new(),
178 base_url: format!("https://api.telegram.org/bot{}", bot_token),
179 }
180 }
181}
182
183#[async_trait]
184impl TelegramHttpClient for RealTelegramHttp {
185 async fn send_message(&self, chat_id: i64, text: &str) -> Result<String, String> {
186 let url = format!("{}/sendMessage", self.base_url);
187 let resp = self
188 .client
189 .post(&url)
190 .json(&serde_json::json!({
191 "chat_id": chat_id,
192 "text": text,
193 }))
194 .send()
195 .await
196 .map_err(|e| format!("HTTP error: {e}"))?;
197
198 let status = resp.status();
199 let body: serde_json::Value = resp
200 .json()
201 .await
202 .map_err(|e| format!("JSON parse error: {e}"))?;
203
204 if !body["ok"].as_bool().unwrap_or(false) {
205 let desc = body["description"].as_str().unwrap_or("unknown error");
206 return Err(format!("Telegram API error ({}): {}", status, desc));
207 }
208
209 let message_id = body["result"]["message_id"]
210 .as_i64()
211 .unwrap_or(0)
212 .to_string();
213 Ok(message_id)
214 }
215
216 async fn get_updates(&self, offset: i64) -> Result<Vec<TelegramUpdate>, String> {
217 let url = format!("{}/getUpdates?offset={}&timeout=30", self.base_url, offset);
218 let resp = self
219 .client
220 .get(&url)
221 .send()
222 .await
223 .map_err(|e| format!("HTTP error: {e}"))?;
224
225 let body: serde_json::Value = resp
226 .json()
227 .await
228 .map_err(|e| format!("JSON parse error: {e}"))?;
229
230 if !body["ok"].as_bool().unwrap_or(false) {
231 let desc = body["description"].as_str().unwrap_or("unknown error");
232 return Err(format!("Telegram API error: {}", desc));
233 }
234
235 let updates = body["result"]
236 .as_array()
237 .unwrap_or(&Vec::new())
238 .iter()
239 .filter_map(|u| {
240 let msg = &u["message"];
241 Some(TelegramUpdate {
242 update_id: u["update_id"].as_i64()?,
243 chat_id: msg["chat"]["id"].as_i64()?,
244 from_id: msg["from"]["id"].as_i64().unwrap_or(0),
245 from_name: msg["from"]["first_name"]
246 .as_str()
247 .unwrap_or("Unknown")
248 .to_string(),
249 text: msg["text"].as_str().unwrap_or("").to_string(),
250 })
251 })
252 .collect();
253
254 Ok(updates)
255 }
256}
257
258pub fn create_telegram_channel(config: TelegramConfig) -> TelegramChannel {
260 let http = RealTelegramHttp::new(&config.bot_token);
261 TelegramChannel::new(config, Box::new(http))
262}
263
#[cfg(test)]
mod tests {
    use super::*;
    use crate::channels::ChannelUser;
    use std::sync::{Arc, Mutex};

    /// In-memory transport: records outgoing messages and replays a fixed
    /// list of updates on every poll.
    struct MockTelegramHttp {
        sent: Arc<Mutex<Vec<(i64, String)>>>,
        updates: Vec<TelegramUpdate>,
    }

    impl MockTelegramHttp {
        fn new() -> Self {
            MockTelegramHttp {
                sent: Arc::default(),
                updates: vec![],
            }
        }

        fn with_updates(self, updates: Vec<TelegramUpdate>) -> Self {
            Self { updates, ..self }
        }
    }

    #[async_trait]
    impl TelegramHttpClient for MockTelegramHttp {
        async fn send_message(&self, chat_id: i64, text: &str) -> Result<String, String> {
            let mut outbox = self.sent.lock().unwrap();
            outbox.push((chat_id, text.to_string()));
            Ok("msg-123".to_string())
        }

        async fn get_updates(&self, _offset: i64) -> Result<Vec<TelegramUpdate>, String> {
            Ok(self.updates.clone())
        }
    }

    /// Default configuration with a non-empty bot token.
    fn config_with_token() -> TelegramConfig {
        TelegramConfig {
            bot_token: "123:ABC".into(),
            ..Default::default()
        }
    }

    #[tokio::test]
    async fn test_telegram_connect_no_token() {
        let transport = Box::new(MockTelegramHttp::new());
        let mut channel = TelegramChannel::new(TelegramConfig::default(), transport);
        assert!(channel.connect().await.is_err());
    }

    #[tokio::test]
    async fn test_telegram_connect_with_token() {
        let transport = Box::new(MockTelegramHttp::new());
        let mut channel = TelegramChannel::new(config_with_token(), transport);
        channel.connect().await.unwrap();
        assert_eq!(channel.status(), ChannelStatus::Connected);
    }

    #[tokio::test]
    async fn test_telegram_send_message() {
        let transport = MockTelegramHttp::new();
        let outbox = Arc::clone(&transport.sent);
        let mut channel = TelegramChannel::new(config_with_token(), Box::new(transport));
        channel.connect().await.unwrap();

        let author = ChannelUser::new("bot", ChannelType::Telegram);
        let outgoing =
            ChannelMessage::text(ChannelType::Telegram, "12345", author, "Hello Telegram!");
        let message_id = channel.send_message(outgoing).await.unwrap();
        assert_eq!(message_id.0, "msg-123");

        let recorded = outbox.lock().unwrap();
        assert_eq!(recorded.len(), 1);
        let (chat_id, text) = &recorded[0];
        assert_eq!(*chat_id, 12345);
        assert_eq!(text, "Hello Telegram!");
    }

    #[tokio::test]
    async fn test_telegram_receive_messages() {
        let config = TelegramConfig {
            allowed_chat_ids: vec![100],
            ..config_with_token()
        };
        let allowed = TelegramUpdate {
            update_id: 1,
            chat_id: 100,
            from_id: 42,
            from_name: "Alice".into(),
            text: "hello".into(),
        };
        let rejected = TelegramUpdate {
            update_id: 2,
            chat_id: 999,
            from_id: 99,
            from_name: "Eve".into(),
            text: "spam".into(),
        };
        let transport = MockTelegramHttp::new().with_updates(vec![allowed, rejected]);
        let mut channel = TelegramChannel::new(config, Box::new(transport));
        channel.connect().await.unwrap();

        let received = channel.receive_messages().await.unwrap();
        assert_eq!(received.len(), 1);
        assert_eq!(received[0].content.as_text(), Some("hello"));
    }

    #[test]
    fn test_telegram_capabilities() {
        let channel =
            TelegramChannel::new(TelegramConfig::default(), Box::new(MockTelegramHttp::new()));
        let caps = channel.capabilities();
        assert!(caps.supports_threads);
        assert!(caps.supports_reactions);
        assert!(caps.supports_files);
        assert!(caps.supports_voice);
        assert!(!caps.supports_video);
        assert_eq!(caps.max_message_length, Some(4096));
    }

    #[test]
    fn test_telegram_streaming_mode() {
        let channel =
            TelegramChannel::new(TelegramConfig::default(), Box::new(MockTelegramHttp::new()));
        let expected = StreamingMode::Polling { interval_ms: 30000 };
        assert_eq!(channel.streaming_mode(), expected);
    }
}