Skip to main content

server_watchdog/application/event/
checker.rs

1use std::pin::Pin;
2use std::sync::Arc;
3use std::time::Duration;
4use derive_new::new;
5use tokio::sync::mpsc::Sender;
6use tokio_stream::StreamExt;
7use crate::application::event::dto::EventMessage;
8use crate::application::server::ServerManager;
9use crate::domain::config::Config;
10use crate::domain::event::{Event, EventKind};
11use crate::domain::file_accessor::FileAccessor;
12
13#[derive(new)]
14pub struct GeneralEventChecker {
15    config_file_accessor: Arc<dyn FileAccessor<Config>>,
16    server_manager: Arc<dyn ServerManager>,
17    tx: Sender<EventMessage>,
18    health_event_checker: Box<dyn EventChecker>,
19    log_event_checker: Box<dyn EventChecker>
20}
21
22impl GeneralEventChecker {
23
24    pub async fn init(&self) {
25        let config = self.config_file_accessor.read()
26            .await.unwrap();
27        let events: Vec<Event> = config.events.into_iter()
28            .map(|event_config|{Event::from(event_config)})
29            .collect();
30        for event in events {
31            self.check(event);
32        }
33    }
34
35    fn check(&self, event: Event) {
36        match &event.event_kind {
37            EventKind::Health {server_name: _, keyword: _} => {
38                self.health_event_checker
39                    .check(event, self.server_manager.clone(), self.tx.clone())
40            },
41            EventKind::Log {server_name: _, keyword: _} => {
42                self.log_event_checker
43                    .check(event, self.server_manager.clone(), self.tx.clone())
44            },
45            EventKind::None => return
46        }
47    }
48}
49
50pub trait EventChecker {
51    fn check(&self, event: Event, server_manager: Arc<dyn ServerManager>, tx: Sender<EventMessage>);
52}
53
54#[derive(new)]
55pub struct HealthEventChecker;
56
57impl EventChecker for HealthEventChecker {
58    fn check(&self, event: Event, server_manager: Arc<dyn ServerManager>, tx: Sender<EventMessage>) {
59        if let EventKind::Health { server_name, keyword } = event.event_kind {
60            let event_name = event.name;
61            tokio::spawn(async move {
62                loop {
63                    let health = server_manager.healthcheck(server_name.as_str()).await;
64                    if health.to_string().contains(keyword.as_str()) {
65                        let _ = tx.send(EventMessage {
66                            event_name: event_name.clone(),
67                            text: format!("Keyword '{}' found in health check of server '{}'", keyword, server_name),
68                        }).await;
69                    }
70                    tokio::time::sleep(Duration::from_secs(30)).await;
71                }
72            });
73        }
74    }
75}
76
77#[derive(new)]
78pub struct LogEventChecker;
79
80impl EventChecker for LogEventChecker {
81    fn check(&self, event: Event, server_manager: Arc<dyn ServerManager>, tx: Sender<EventMessage>) {
82
83        if let EventKind::Log {server_name, keyword} = event.event_kind {
84            let event_name = event.name;
85            tokio::spawn(async move {
86                let stream = server_manager.logs_stream(server_name.as_str()).await;
87
88                if let Some(stream) = stream {
89                    let mut stream = Pin::from(stream);
90                    while let Some(line) = stream.next().await {
91                        if line.contains(keyword.as_str()) {
92                            let _ = tx.send(EventMessage {
93                                event_name: event_name.clone(),
94                                text: format!("Keyword '{}' found in logs of server '{}'\nLog: {}", keyword, server_name, line),
95                            }).await;
96                        }
97                    }
98                }
99            });
100        }
101    }
102}