server_watchdog/application/event/
checker.rs1use std::pin::Pin;
2use std::sync::Arc;
3use std::time::Duration;
4use derive_new::new;
5use tokio::sync::mpsc::Sender;
6use tokio_stream::StreamExt;
7use crate::application::event::dto::EventMessage;
8use crate::application::server::ServerManager;
9use crate::domain::config::Config;
10use crate::domain::event::{Event, EventKind};
11use crate::domain::file_accessor::FileAccessor;
12
13#[derive(new)]
14pub struct GeneralEventChecker {
15 config_file_accessor: Arc<dyn FileAccessor<Config>>,
16 server_manager: Arc<dyn ServerManager>,
17 tx: Sender<EventMessage>,
18 health_event_checker: Box<dyn EventChecker>,
19 log_event_checker: Box<dyn EventChecker>
20}
21
22impl GeneralEventChecker {
23
24 pub async fn init(&self) {
25 let config = self.config_file_accessor.read()
26 .await.unwrap();
27 let events: Vec<Event> = config.events.into_iter()
28 .map(|event_config|{Event::from(event_config)})
29 .collect();
30 for event in events {
31 self.check(event);
32 }
33 }
34
35 fn check(&self, event: Event) {
36 match &event.event_kind {
37 EventKind::Health {server_name: _, keyword: _} => {
38 self.health_event_checker
39 .check(event, self.server_manager.clone(), self.tx.clone())
40 },
41 EventKind::Log {server_name: _, keyword: _} => {
42 self.log_event_checker
43 .check(event, self.server_manager.clone(), self.tx.clone())
44 },
45 EventKind::None => return
46 }
47 }
48}
49
50pub trait EventChecker {
51 fn check(&self, event: Event, server_manager: Arc<dyn ServerManager>, tx: Sender<EventMessage>);
52}
53
54#[derive(new)]
55pub struct HealthEventChecker;
56
57impl EventChecker for HealthEventChecker {
58 fn check(&self, event: Event, server_manager: Arc<dyn ServerManager>, tx: Sender<EventMessage>) {
59 if let EventKind::Health { server_name, keyword } = event.event_kind {
60 let event_name = event.name;
61 tokio::spawn(async move {
62 loop {
63 let health = server_manager.healthcheck(server_name.as_str()).await;
64 if health.to_string().contains(keyword.as_str()) {
65 let _ = tx.send(EventMessage {
66 event_name: event_name.clone(),
67 text: format!("Keyword '{}' found in health check of server '{}'", keyword, server_name),
68 }).await;
69 }
70 tokio::time::sleep(Duration::from_secs(30)).await;
71 }
72 });
73 }
74 }
75}
76
77#[derive(new)]
78pub struct LogEventChecker;
79
80impl EventChecker for LogEventChecker {
81 fn check(&self, event: Event, server_manager: Arc<dyn ServerManager>, tx: Sender<EventMessage>) {
82
83 if let EventKind::Log {server_name, keyword} = event.event_kind {
84 let event_name = event.name;
85 tokio::spawn(async move {
86 let stream = server_manager.logs_stream(server_name.as_str()).await;
87
88 if let Some(stream) = stream {
89 let mut stream = Pin::from(stream);
90 while let Some(line) = stream.next().await {
91 if line.contains(keyword.as_str()) {
92 let _ = tx.send(EventMessage {
93 event_name: event_name.clone(),
94 text: format!("Keyword '{}' found in logs of server '{}'\nLog: {}", keyword, server_name, line),
95 }).await;
96 }
97 }
98 }
99 });
100 }
101 }
102}