talos_certifier/
talos_certifier_service.rs

1use std::sync::{
2    atomic::{AtomicBool, Ordering},
3    Arc,
4};
5
6use crate::{core::ServiceResult, SystemMessage};
7use futures_util::future::join_all;
8use log::{error, info};
9
10use crate::core::{System, SystemService};
11
12pub struct TalosCertifierServiceBuilder {
13    system: System,
14    certifier_service: Option<Box<dyn SystemService + Send + Sync>>,
15    services: Vec<Box<dyn SystemService + Send + Sync>>,
16}
17
18impl TalosCertifierServiceBuilder {
19    pub fn new(system: System) -> Self {
20        Self {
21            system,
22            certifier_service: None,
23            services: vec![],
24        }
25    }
26
27    pub fn add_adapter_service(mut self, service: Box<dyn SystemService + Send + Sync>) -> Self {
28        self.services.push(service);
29        self
30    }
31
32    pub fn add_health_check_service(mut self, hc_service: Box<dyn SystemService + Send + Sync>) -> Self {
33        self.services.push(hc_service);
34        self
35    }
36
37    pub fn add_certifier_service(mut self, certifier_service: Box<dyn SystemService + Send + Sync>) -> Self {
38        self.certifier_service = Some(certifier_service);
39        self
40    }
41
42    pub fn build(self) -> TalosCertifierService {
43        let mut services = self.services;
44        services.push(self.certifier_service.expect("Certifier Service is mandatory"));
45
46        TalosCertifierService {
47            system: self.system,
48            services,
49            shutdown_flag: Arc::new(AtomicBool::new(false)),
50        }
51    }
52}
53
54pub struct TalosCertifierService {
55    pub system: System,
56    pub services: Vec<Box<dyn SystemService + Send + Sync>>,
57    pub shutdown_flag: Arc<AtomicBool>,
58}
59
60impl TalosCertifierService {
61    pub async fn run(self) -> ServiceResult {
62        let service_handle = self.services.into_iter().map(|mut service| {
63            tokio::spawn({
64                let shutdown_notifier_cloned = self.system.system_notifier.clone();
65                let mut shutdown_receiver = shutdown_notifier_cloned.subscribe();
66                let shutdown_flag = Arc::clone(&self.shutdown_flag);
67
68                async move {
69                    let mut result: ServiceResult = Ok(());
70                    while !shutdown_flag.load(Ordering::Relaxed) {
71                        tokio::select! {
72                            svc_result = service.run() => {
73                                if let Err(service_error) = svc_result {
74                                    error!("Error found in service=({}) !!!! {:?}", service_error.service, service_error);
75                                    shutdown_notifier_cloned.send(SystemMessage::Shutdown).unwrap();
76                                };
77                            },
78                            msg = shutdown_receiver.recv() => {
79                                let message = msg.unwrap();
80
81                                match message {
82                                    SystemMessage::Shutdown => {
83                                        info!("Shutdown received");
84                                        let _ = &shutdown_flag.swap(true, Ordering::Relaxed);
85                                    },
86                                    SystemMessage::ShutdownWithError(service_error) => {
87                                        info!("Shutdown received due to error");
88                                        let _ = &shutdown_flag.swap(true, Ordering::Relaxed);
89                                         result = Err(service_error);
90                                    },
91
92                                    _ => ()
93                                }
94
95                            }
96                        }
97                    }
98                    result
99                }
100            })
101        });
102
103        let k = join_all(service_handle).await;
104
105        for res in k {
106            res.unwrap()?
107        }
108
109        Ok(())
110    }
111}