server_watchdog/infrastructure/client/
telegram.rs1pub mod dto;
2
3use std::sync::Arc;
4use std::time::Duration;
5use anyhow::{anyhow, Result};
6use async_trait::async_trait;
7use dto::SendMessageDto;
8use log::{debug, error, trace, warn};
9use tokio::sync::mpsc::{Sender};
10use crate::application::worker::Worker;
11use crate::domain;
12use crate::infrastructure::client::common::Client;
13use crate::infrastructure::client::telegram::dto::{GetUpdateDto, Message, TelegramResponse, Update};
14use crate::infrastructure::common::api_client::ApiClient;
15
16#[derive(Clone)]
17pub struct TelegramClient {
18 name: String,
19 api_client: Arc<ApiClient>,
20 offset: i64,
21 tx: Option<Sender<domain::client::Message>>
22}
23
24impl TelegramClient {
25 pub fn new(name: String, token: String) -> Self {
26 trace!("TelegramClient::new(name: {}, token: ...)", &name);
27 Self {
28 name,
29 api_client: Arc::new(ApiClient::new(format!("https://api.telegram.org/bot{token}"))),
30 offset: 0,
31 tx: None
32 }
33 }
34
35 async fn get_update(&mut self) -> Result<Vec<Update>> {
36 trace!("TelegramClient::get_update");
37 let offset = self.offset;
38 let dto = GetUpdateDto::new(offset);
39 debug!("get_update request: {:?}", &dto);
40 match self.api_client.post_json::<GetUpdateDto, TelegramResponse<Vec<Update>>>("getUpdates", &dto, None, None).await {
41 Ok(updates) => {
42 debug!("[TelegramClient] Ok: Successfully get update");
43 if !updates.ok {
44 return Err(anyhow!("[TelegramClient] status: {} {}", updates.error_code.unwrap(), updates.description.unwrap()));
45 }
46 debug!("get_update response: {:?}", &updates.result);
47
48 if let Some(new_offset) = updates.result.iter().map(|update: &Update|{update.update_id}).max() {
49 self.offset = new_offset + 1;
50 debug!("new offset: {}", &self.offset);
51 }
52
53 Ok(updates.result)
54 },
55 Err(e) => {
56 Err(anyhow!("[TelegramClient] Err: {}", e))
57 }
58 }
59 }
60
61 async fn send_message_direct(&self, send_message_dto: SendMessageDto) -> bool {
62 trace!("TelegramClient::send_message_direct");
63 debug!("send_message_direct request: {:?}", &send_message_dto);
64 let response = self.api_client
65 .post_json::<SendMessageDto, TelegramResponse<Message>> (
66 "sendMessage",
67 &send_message_dto, None, None).await;
68
69 if response.is_err() {
70 error!("[Err]: {}", response.err().unwrap().to_string());
71 return false
72 }
73 debug!("send_message_direct response: {:?}", &response);
74
75 true
76 }
77}
78
79#[async_trait]
80impl Client for TelegramClient {
81
82 async fn send_message(&self, chat_id: &str, data: &str) -> bool {
83 trace!("Client::send_message(chat_id: {}, data: ...)", chat_id);
84 self.send_message_direct(SendMessageDto::new(chat_id, data, None)).await
85 }
86
87 fn subscribe(&mut self, tx: Sender<domain::client::Message>) {
88 trace!("Client::subscribe");
89 self.tx = Some(tx);
90 }
91}
92
93#[async_trait]
94impl Worker for TelegramClient {
95 async fn on_tick(&mut self) -> bool {
96 trace!("Worker::on_tick for {}", &self.name);
97 let updates = match self.get_update().await {
98 Ok(updates) => {
99 updates
100 },
101 Err(e) => {
102 error!("[TelegramClient] Err: {e}");
103 tokio::time::sleep(Duration::from_secs(5)).await;
104 return true;
105 }
106 };
107 debug!("{} updates received", updates.len());
108
109 for update in updates {
110 let (chat_id, data) = if let Some(msg) = update.message {
111 (msg.chat.id.to_string(), msg.text.unwrap_or("".to_string()))
112 } else if let Some(cb) = update.callback_query {
113 let chat_id = match cb.message {
114 Some(message) => message.chat.id.to_string(),
115 None => continue
116 };
117 let text = cb.data.clone().unwrap_or_default();
118 (chat_id, text)
119 } else {
120 continue;
121 };
122 let message = domain::client::Message::new(
123 self.get_name().to_string(), chat_id, data
124 );
125 debug!("created message: {:?}", &message);
126
127 if let Some(tx) = &self.tx {
128 if let Err(e) = tx.send(message).await {
129 warn!("[TelegramClient] Err: {}", e);
130 }
131 }
132 }
133 true
134 }
135
136 fn get_name(&self) -> &str {
137 trace!("Worker::get_name for {}", &self.name);
138 self.name.as_str()
139 }
140
141 fn interval(&self) -> i32 {
142 trace!("Worker::interval for {}", &self.name);
143 5
144 }
145}
146
#[cfg(test)]
mod tests {
    use super::*;
    use dotenv::dotenv;
    use std::env;

    /// Reads a variable these live-API tests cannot run without, panicking
    /// with a message that names the missing variable.
    fn required_env(key: &str) -> String {
        env::var(key)
            .unwrap_or_else(|_| panic!("environment variable {} must be set for this test", key))
    }

    /// Calls the real Bot API twice: the first call may return anything, the
    /// second is expected to be empty because the offset was advanced.
    #[tokio::test]
    async fn get_update() {
        dotenv().ok();
        let token = required_env("TELEGRAM_TOKEN");
        let mut telegram_client = TelegramClient::new("test_client".to_string(), token);

        // `expect` keeps the underlying error in the failure output, which
        // `assert!(is_ok())` would hide.
        let first = telegram_client
            .get_update()
            .await
            .expect("first getUpdates call failed");
        println!("{:?}", first);

        let second = telegram_client
            .get_update()
            .await
            .expect("second getUpdates call failed");
        assert_eq!(second.len(), 0);
    }

    /// Sends a message through the real Bot API and checks the reported outcome.
    #[tokio::test]
    async fn send_message() {
        dotenv().ok();
        let token = required_env("TELEGRAM_TOKEN");
        let chat_id = required_env("CHAT_ID");
        let telegram_client = TelegramClient::new("test_client".to_string(), token);

        let sent = telegram_client.send_message(chat_id.as_str(), "test message").await;
        // Without this assertion the test passes even when sending fails.
        assert!(sent, "send_message reported failure");
    }
}