// server-watchdog 0.1.0
//
// A server monitoring and remote control tool via messenger.
// Documentation
pub mod dto;

use std::sync::Arc;
use std::time::Duration;
use anyhow::{anyhow, Result};
use async_trait::async_trait;
use dto::SendMessageDto;
use log::{debug, error, trace, warn};
use tokio::sync::mpsc::{Sender};
use crate::application::worker::Worker;
use crate::domain;
use crate::infrastructure::client::common::Client;
use crate::infrastructure::client::telegram::dto::{GetUpdateDto, Message, TelegramResponse, Update};
use crate::infrastructure::common::api_client::ApiClient;

/// Telegram Bot API client that polls for updates and forwards them as
/// [`domain::client::Message`] values to a subscribed channel.
///
/// Cloning shares the underlying [`ApiClient`] through the `Arc`, while each
/// clone keeps its own `offset` value.
#[derive(Clone)]
pub struct TelegramClient {
    /// Name of this client; returned by `get_name` and passed to
    /// `domain::client::Message::new` for every forwarded message.
    name: String,
    /// HTTP client rooted at `https://api.telegram.org/bot<token>`.
    api_client: Arc<ApiClient>,
    /// Offset sent with the next `getUpdates` request; after a non-empty
    /// response it is set to the highest received `update_id` plus one.
    offset: i64,
    /// Channel that receives incoming messages; `None` until `subscribe` is called.
    tx: Option<Sender<domain::client::Message>>
}

impl TelegramClient {
    pub fn new(name: String, token: String) -> Self {
        trace!("TelegramClient::new(name: {}, token: ...)", &name);
        Self {
            name,
            api_client: Arc::new(ApiClient::new(format!("https://api.telegram.org/bot{token}"))),
            offset: 0,
            tx: None
        }
    }

    async fn get_update(&mut self) -> Result<Vec<Update>> {
        trace!("TelegramClient::get_update");
        let offset = self.offset;
        let dto = GetUpdateDto::new(offset);
        debug!("get_update request: {:?}", &dto);
        match self.api_client.post_json::<GetUpdateDto, TelegramResponse<Vec<Update>>>("getUpdates", &dto, None, None).await {
            Ok(updates) => {
                debug!("[TelegramClient] Ok: Successfully get update");
                if !updates.ok {
                    return Err(anyhow!("[TelegramClient] status: {} {}", updates.error_code.unwrap(), updates.description.unwrap()));
                }
                debug!("get_update response: {:?}", &updates.result);

                if let Some(new_offset) = updates.result.iter().map(|update: &Update|{update.update_id}).max() {
                    self.offset = new_offset + 1;
                    debug!("new offset: {}", &self.offset);
                }

                Ok(updates.result)
            },
            Err(e) => {
                Err(anyhow!("[TelegramClient] Err: {}", e))
            }
        }
    }

    async fn send_message_direct(&self, send_message_dto: SendMessageDto) -> bool {
        trace!("TelegramClient::send_message_direct");
        debug!("send_message_direct request: {:?}", &send_message_dto);
        let response = self.api_client
            .post_json::<SendMessageDto, TelegramResponse<Message>> (
                "sendMessage",
                &send_message_dto, None, None).await;

        if response.is_err() {
            error!("[Err]: {}", response.err().unwrap().to_string());
            return false
        }
        debug!("send_message_direct response: {:?}", &response);

        true
    }
}

#[async_trait]
impl Client for TelegramClient {

    /// Sends `data` as a message to the chat identified by `chat_id`.
    ///
    /// Returns the result of `send_message_direct`.
    async fn send_message(&self, chat_id: &str, data: &str) -> bool {
        trace!("Client::send_message(chat_id: {}, data: ...)", chat_id);
        let payload = SendMessageDto::new(chat_id, data, None);
        self.send_message_direct(payload).await
    }

    /// Registers `tx` as the destination for incoming messages, replacing any
    /// previously registered sender.
    fn subscribe(&mut self, tx: Sender<domain::client::Message>) {
        trace!("Client::subscribe");
        drop(self.tx.replace(tx));
    }
}

#[async_trait]
impl Worker for TelegramClient {
    /// Polls Telegram once and forwards each usable update to the subscriber.
    ///
    /// A plain message contributes its chat id and text (empty when absent).
    /// A callback query contributes the chat id of its originating message and
    /// its callback data (empty when absent). Updates with neither, and
    /// callback queries without a message, are skipped.
    ///
    /// When fetching fails the error is logged and the call sleeps for five
    /// seconds before returning. Always returns `true`.
    async fn on_tick(&mut self) -> bool {
        trace!("Worker::on_tick for {}", &self.name);
        let batch = match self.get_update().await {
            Err(e) => {
                error!("[TelegramClient] Err: {e}");
                tokio::time::sleep(Duration::from_secs(5)).await;
                return true;
            }
            Ok(batch) => batch,
        };
        debug!("{} updates received", batch.len());

        for update in batch {
            // A regular message takes precedence over a callback query.
            let (chat_id, data) = match (update.message, update.callback_query) {
                (Some(msg), _) => (msg.chat.id.to_string(), msg.text.unwrap_or_default()),
                (None, Some(cb)) => match cb.message {
                    Some(origin) => (origin.chat.id.to_string(), cb.data.unwrap_or_default()),
                    None => continue,
                },
                (None, None) => continue,
            };

            let source = self.get_name().to_string();
            let message = domain::client::Message::new(source, chat_id, data);
            debug!("created message: {:?}", &message);

            // Without a subscriber the message is dropped.
            match self.tx.as_ref() {
                Some(tx) => {
                    if let Err(e) = tx.send(message).await {
                        warn!("[TelegramClient] Err: {}", e);
                    }
                }
                None => {}
            }
        }
        true
    }

    /// Returns the name this client was created with.
    fn get_name(&self) -> &str {
        trace!("Worker::get_name for {}", &self.name);
        &self.name
    }

    /// Returns the polling interval value `5`.
    // NOTE(review): presumably seconds, matching the 5-second error back-off in
    // `on_tick` — confirm against the `Worker` trait.
    fn interval(&self) -> i32 {
        trace!("Worker::interval for {}", &self.name);
        5
    }
}

// Integration tests: they call the live Telegram API and need `TELEGRAM_TOKEN`
// (and `CHAT_ID` for `send_message`) in the environment or a `.env` file.
#[cfg(test)]
mod tests {
    use std::env;
    use super::*;
    use dotenv::dotenv;

    /// Builds a client from the `TELEGRAM_TOKEN` environment variable.
    fn client_from_env() -> TelegramClient {
        dotenv().ok();
        let token = env::var("TELEGRAM_TOKEN")
            .expect("TELEGRAM_TOKEN must be set to run the Telegram integration tests");
        TelegramClient::new("test_client".to_string(), token)
    }

    #[tokio::test]
    async fn get_update() {
        let mut telegram_client = client_from_env();

        let response = telegram_client.get_update().await;
        assert!(response.is_ok());
        println!("{:?}", response.unwrap());

        // The first call advanced the offset, so nothing should be pending now
        // unless a new update arrives between the two calls.
        let response = telegram_client.get_update().await;
        assert!(response.is_ok());
        assert_eq!(response.unwrap().len(), 0);
    }

    #[tokio::test]
    async fn send_message() {
        let telegram_client = client_from_env();
        let chat_id = env::var("CHAT_ID")
            .expect("CHAT_ID must be set to run the Telegram integration tests");

        // The returned flag is the only failure signal; ignoring it would let
        // this test pass even when nothing was delivered.
        let sent = telegram_client.send_message(chat_id.as_str(), "test message").await;
        assert!(sent, "sending the test message failed");
    }
}