Skip to main content

server_watchdog/infrastructure/client/
telegram.rs

1pub mod dto;
2
3use std::sync::Arc;
4use std::time::Duration;
5use anyhow::{anyhow, Result};
6use async_trait::async_trait;
7use dto::SendMessageDto;
8use log::{debug, error, trace, warn};
9use tokio::sync::mpsc::{Sender};
10use crate::application::worker::Worker;
11use crate::domain;
12use crate::infrastructure::client::common::Client;
13use crate::infrastructure::client::telegram::dto::{GetUpdateDto, Message, TelegramResponse, Update};
14use crate::infrastructure::common::api_client::ApiClient;
15
16#[derive(Clone)]
17pub struct TelegramClient {
18    name: String,
19    api_client: Arc<ApiClient>,
20    offset: i64,
21    tx: Option<Sender<domain::client::Message>>
22}
23
24impl TelegramClient {
25    pub fn new(name: String, token: String) -> Self {
26        trace!("TelegramClient::new(name: {}, token: ...)", &name);
27        Self {
28            name,
29            api_client: Arc::new(ApiClient::new(format!("https://api.telegram.org/bot{token}"))),
30            offset: 0,
31            tx: None
32        }
33    }
34
35    async fn get_update(&mut self) -> Result<Vec<Update>> {
36        trace!("TelegramClient::get_update");
37        let offset = self.offset;
38        let dto = GetUpdateDto::new(offset);
39        debug!("get_update request: {:?}", &dto);
40        match self.api_client.post_json::<GetUpdateDto, TelegramResponse<Vec<Update>>>("getUpdates", &dto, None, None).await {
41            Ok(updates) => {
42                debug!("[TelegramClient] Ok: Successfully get update");
43                if !updates.ok {
44                    return Err(anyhow!("[TelegramClient] status: {} {}", updates.error_code.unwrap(), updates.description.unwrap()));
45                }
46                debug!("get_update response: {:?}", &updates.result);
47
48                if let Some(new_offset) = updates.result.iter().map(|update: &Update|{update.update_id}).max() {
49                    self.offset = new_offset + 1;
50                    debug!("new offset: {}", &self.offset);
51                }
52
53                Ok(updates.result)
54            },
55            Err(e) => {
56                Err(anyhow!("[TelegramClient] Err: {}", e))
57            }
58        }
59    }
60
61    async fn send_message_direct(&self, send_message_dto: SendMessageDto) -> bool {
62        trace!("TelegramClient::send_message_direct");
63        debug!("send_message_direct request: {:?}", &send_message_dto);
64        let response = self.api_client
65            .post_json::<SendMessageDto, TelegramResponse<Message>> (
66                "sendMessage",
67                &send_message_dto, None, None).await;
68
69        if response.is_err() {
70            error!("[Err]: {}", response.err().unwrap().to_string());
71            return false
72        }
73        debug!("send_message_direct response: {:?}", &response);
74
75        true
76    }
77}
78
79#[async_trait]
80impl Client for TelegramClient {
81
82    async fn send_message(&self, chat_id: &str, data: &str) -> bool {
83        trace!("Client::send_message(chat_id: {}, data: ...)", chat_id);
84        self.send_message_direct(SendMessageDto::new(chat_id, data, None)).await
85    }
86
87    fn subscribe(&mut self, tx: Sender<domain::client::Message>) {
88        trace!("Client::subscribe");
89        self.tx = Some(tx);
90    }
91}
92
93#[async_trait]
94impl Worker for TelegramClient {
95    async fn on_tick(&mut self) -> bool {
96        trace!("Worker::on_tick for {}", &self.name);
97        let updates = match self.get_update().await {
98            Ok(updates) => {
99                updates
100            },
101            Err(e) => {
102                error!("[TelegramClient] Err: {e}");
103                tokio::time::sleep(Duration::from_secs(5)).await;
104                return true;
105            }
106        };
107        debug!("{} updates received", updates.len());
108
109        for update in updates {
110            let (chat_id, data) = if let Some(msg) = update.message {
111                (msg.chat.id.to_string(), msg.text.unwrap_or("".to_string()))
112            } else if let Some(cb) = update.callback_query {
113                let chat_id = match cb.message {
114                    Some(message) => message.chat.id.to_string(),
115                    None => continue
116                };
117                let text = cb.data.clone().unwrap_or_default();
118                (chat_id, text)
119            } else {
120                continue;
121            };
122            let message = domain::client::Message::new(
123                self.get_name().to_string(), chat_id, data
124            );
125            debug!("created message: {:?}", &message);
126
127            if let Some(tx) = &self.tx {
128                if let Err(e) = tx.send(message).await {
129                    warn!("[TelegramClient] Err: {}", e);
130                }
131            }
132        }
133        true
134    }
135
136    fn get_name(&self) -> &str {
137        trace!("Worker::get_name for {}", &self.name);
138        self.name.as_str()
139    }
140
141    fn interval(&self) -> i32 {
142        trace!("Worker::interval for {}", &self.name);
143        5
144    }
145}
146
147#[cfg(test)]
148mod tests {
149    use std::env;
150    use super::*;
151    use dotenv::dotenv;
152
153    #[tokio::test]
154    async fn get_update() {
155        dotenv().ok();
156        let token = env::var("TELEGRAM_TOKEN").unwrap();
157        let mut telegram_client = TelegramClient::new("test_client".to_string(), token);
158        let response = telegram_client.get_update().await;
159
160        assert!(response.is_ok());
161        println!("{:?}", response.unwrap());
162
163        let response = telegram_client.get_update().await;
164
165        assert!(response.is_ok());
166        assert_eq!(response.unwrap().len(), 0);
167    }
168
169    #[tokio::test]
170    async fn send_message() {
171        dotenv().ok();
172        let token = env::var("TELEGRAM_TOKEN").unwrap();
173        let telegram_client = TelegramClient::new("test_client".to_string(), token);
174         telegram_client.send_message(env::var("CHAT_ID").unwrap().as_str(), "test message").await;
175    }
176}