server_watchdog/infrastructure/
client.rs1pub mod common;
2pub mod telegram;
3
4use std::collections::HashMap;
5use std::error::Error;
6use std::sync::{Arc, Mutex};
7use async_trait::async_trait;
8use derive_new::new;
9use tokio::sync::mpsc;
10use tokio::sync::mpsc::Receiver;
11pub use common::*;
12use crate::application::client::{ClientLoader, MessageGateway};
13use crate::application::worker::Worker;
14use crate::domain::client::Message;
15use crate::infrastructure::{client};
16use crate::application::worker::WorkerRunner;
17use crate::domain::config::Config;
18use crate::domain::file_accessor::FileAccessor;
19
20#[derive(new, Clone)]
21pub struct MessageAdapter {
22 client_loader: Arc<dyn ClientLoader>
23}
24
25#[async_trait]
26impl MessageGateway for MessageAdapter {
27 async fn send_message(&self, client_name: &str, chat_id: &str, message: &str) {
28 let client = self.client_loader.find(client_name)
29 .expect(format!("client({client_name}) is not available").as_str());
30
31 let total_len = message.len();
32
33 let mut cut_length = 0;
34
35 while cut_length < total_len {
36 let end = std::cmp::min(cut_length + 4000, total_len);
37 let chunk = &message[cut_length..end];
38
39 client.send_message(chat_id, chunk).await;
40
41 cut_length = end;
42 tokio::time::sleep(std::time::Duration::from_millis(500)).await;
43 }
44 }
45}
46
47#[derive(Clone, new)]
48pub struct ClientManager {
49 worker_runner: Arc<Mutex<WorkerRunner>>,
50 client_map: Arc<Mutex<HashMap<String, Box<dyn Client>>>>,
51 config_file_accessor: Arc<dyn FileAccessor<Config>>
52}
53
54
55#[async_trait]
56impl ClientLoader for ClientManager {
57 async fn load_clients(&mut self) -> Result<(), Box<dyn Error + Send + Sync>> {
58 let clients = self.config_file_accessor.read().await?.clients;
59 let clients: Vec<Box<dyn Client>> = clients.into_iter()
60 .map(|client_config| {client::from(client_config)})
61 .filter(|option|{option.is_some()})
62 .map(|client| { client.unwrap() })
63 .collect();
64
65 let mut client_map = self.client_map.lock().unwrap();
66 client_map.clear();
67
68 for client in clients.into_iter() {
69 client_map.insert(client.get_name().to_string(), client);
70 }
71 Ok(())
72 }
73
74 fn find(&self, name: &str) -> Option<Box<dyn Client>> {
75 let client_map = self.client_map.lock().unwrap();
76 client_map.get(name).map(|c| dyn_clone::clone_box(&**c))
77 }
78
79 async fn run(&mut self) -> Receiver<Message> {
80 let (tx, rx) = mpsc::channel(16);
81 let mut clients: Vec<Box<dyn Client>> = self.client_map.lock().unwrap()
82 .values()
83 .map(|c| dyn_clone::clone_box(&**c))
84 .collect();
85
86 for client in clients.iter_mut() {
87 let tx = tx.clone();
88 client.subscribe(tx);
89 }
90
91 let workers: Vec<Box<dyn Worker>> = clients
92 .into_iter()
93 .map(|c| c as Box<dyn Worker>)
94 .collect();
95
96 self.worker_runner.lock().unwrap().run_batch(workers);
97
98 rx
99 }
100}