Skip to main content

server_watchdog/infrastructure/
client.rs

1pub mod common;
2pub mod telegram;
3
4use std::collections::HashMap;
5use std::error::Error;
6use std::sync::{Arc, Mutex};
7use async_trait::async_trait;
8use derive_new::new;
9use tokio::sync::mpsc;
10use tokio::sync::mpsc::Receiver;
11pub use common::*;
12use crate::application::client::{ClientLoader, MessageGateway};
13use crate::application::worker::Worker;
14use crate::domain::client::Message;
15use crate::infrastructure::{client};
16use crate::application::worker::WorkerRunner;
17use crate::domain::config::Config;
18use crate::domain::file_accessor::FileAccessor;
19
20#[derive(new, Clone)]
21pub struct MessageAdapter {
22    client_loader: Arc<dyn ClientLoader>
23}
24
25#[async_trait]
26impl MessageGateway for MessageAdapter {
27    async fn send_message(&self, client_name: &str, chat_id: &str, message: &str) {
28        let client = self.client_loader.find(client_name)
29            .expect(format!("client({client_name}) is not available").as_str());
30
31        let total_len = message.len();
32        
33        let mut cut_length = 0;
34        
35        while cut_length < total_len {
36            let end = std::cmp::min(cut_length + 4000, total_len);
37            let chunk = &message[cut_length..end];
38            
39            client.send_message(chat_id, chunk).await;
40            
41            cut_length = end;
42            tokio::time::sleep(std::time::Duration::from_millis(500)).await;
43        }
44    }
45}
46
47#[derive(Clone, new)]
48pub struct ClientManager {
49    worker_runner: Arc<Mutex<WorkerRunner>>,
50    client_map: Arc<Mutex<HashMap<String, Box<dyn Client>>>>,
51    config_file_accessor: Arc<dyn FileAccessor<Config>>
52}
53
54
55#[async_trait]
56impl ClientLoader for ClientManager {
57    async fn load_clients(&mut self) -> Result<(), Box<dyn Error + Send + Sync>> {
58        let clients = self.config_file_accessor.read().await?.clients;
59        let clients: Vec<Box<dyn Client>> = clients.into_iter()
60            .map(|client_config| {client::from(client_config)})
61            .filter(|option|{option.is_some()})
62            .map(|client| { client.unwrap() })
63            .collect();
64
65        let mut client_map = self.client_map.lock().unwrap();
66        client_map.clear();
67
68        for client in clients.into_iter() {
69            client_map.insert(client.get_name().to_string(), client);
70        }
71        Ok(())
72    }
73
74    fn find(&self, name: &str) -> Option<Box<dyn Client>> {
75        let client_map = self.client_map.lock().unwrap();
76        client_map.get(name).map(|c| dyn_clone::clone_box(&**c))
77    }
78
79    async fn run(&mut self) -> Receiver<Message> {
80        let (tx, rx) = mpsc::channel(16);
81        let mut clients: Vec<Box<dyn Client>> = self.client_map.lock().unwrap()
82            .values()
83            .map(|c| dyn_clone::clone_box(&**c))
84            .collect();
85
86        for client in clients.iter_mut() {
87            let tx = tx.clone();
88            client.subscribe(tx);
89        }
90
91        let workers: Vec<Box<dyn Worker>> = clients
92            .into_iter()
93            .map(|c| c as Box<dyn Worker>)
94            .collect();
95
96        self.worker_runner.lock().unwrap().run_batch(workers);
97
98        rx
99    }
100}