talos_certifier/
talos_certifier_service.rs1use std::sync::{
2 atomic::{AtomicBool, Ordering},
3 Arc,
4};
5
6use crate::{core::ServiceResult, SystemMessage};
7use futures_util::future::join_all;
8use log::{error, info};
9
10use crate::core::{System, SystemService};
11
12pub struct TalosCertifierServiceBuilder {
13 system: System,
14 certifier_service: Option<Box<dyn SystemService + Send + Sync>>,
15 services: Vec<Box<dyn SystemService + Send + Sync>>,
16}
17
18impl TalosCertifierServiceBuilder {
19 pub fn new(system: System) -> Self {
20 Self {
21 system,
22 certifier_service: None,
23 services: vec![],
24 }
25 }
26
27 pub fn add_adapter_service(mut self, service: Box<dyn SystemService + Send + Sync>) -> Self {
28 self.services.push(service);
29 self
30 }
31
32 pub fn add_health_check_service(mut self, hc_service: Box<dyn SystemService + Send + Sync>) -> Self {
33 self.services.push(hc_service);
34 self
35 }
36
37 pub fn add_certifier_service(mut self, certifier_service: Box<dyn SystemService + Send + Sync>) -> Self {
38 self.certifier_service = Some(certifier_service);
39 self
40 }
41
42 pub fn build(self) -> TalosCertifierService {
43 let mut services = self.services;
44 services.push(self.certifier_service.expect("Certifier Service is mandatory"));
45
46 TalosCertifierService {
47 system: self.system,
48 services,
49 shutdown_flag: Arc::new(AtomicBool::new(false)),
50 }
51 }
52}
53
54pub struct TalosCertifierService {
55 pub system: System,
56 pub services: Vec<Box<dyn SystemService + Send + Sync>>,
57 pub shutdown_flag: Arc<AtomicBool>,
58}
59
60impl TalosCertifierService {
61 pub async fn run(self) -> ServiceResult {
62 let service_handle = self.services.into_iter().map(|mut service| {
63 tokio::spawn({
64 let shutdown_notifier_cloned = self.system.system_notifier.clone();
65 let mut shutdown_receiver = shutdown_notifier_cloned.subscribe();
66 let shutdown_flag = Arc::clone(&self.shutdown_flag);
67
68 async move {
69 let mut result: ServiceResult = Ok(());
70 while !shutdown_flag.load(Ordering::Relaxed) {
71 tokio::select! {
72 svc_result = service.run() => {
73 if let Err(service_error) = svc_result {
74 error!("Error found in service=({}) !!!! {:?}", service_error.service, service_error);
75 shutdown_notifier_cloned.send(SystemMessage::Shutdown).unwrap();
76 };
77 },
78 msg = shutdown_receiver.recv() => {
79 let message = msg.unwrap();
80
81 match message {
82 SystemMessage::Shutdown => {
83 info!("Shutdown received");
84 let _ = &shutdown_flag.swap(true, Ordering::Relaxed);
85 },
86 SystemMessage::ShutdownWithError(service_error) => {
87 info!("Shutdown received due to error");
88 let _ = &shutdown_flag.swap(true, Ordering::Relaxed);
89 result = Err(service_error);
90 },
91
92 _ => ()
93 }
94
95 }
96 }
97 }
98 result
99 }
100 })
101 });
102
103 let k = join_all(service_handle).await;
104
105 for res in k {
106 res.unwrap()?
107 }
108
109 Ok(())
110 }
111}