workflow_service/
runtime.rs

1use crate::imports::*;
2
3struct Inner {
4    services: Mutex<Vec<Arc<dyn Service>>>,
5    is_running: Arc<AtomicBool>,
6    termination: Channel<()>,
7}
8
9impl Shutdown for Inner {
10    fn shutdown(&self) {
11        self.termination.try_send(()).unwrap();
12    }
13}
14
15#[derive(Clone)]
16pub struct Runtime {
17    inner: Arc<Inner>,
18}
19
20impl Default for Runtime {
21    fn default() -> Self {
22        Self {
23            inner: Arc::new(Inner {
24                services: Mutex::new(Vec::new()),
25                is_running: Arc::new(AtomicBool::new(false)),
26                termination: Channel::oneshot(),
27            }),
28        }
29    }
30}
31
32impl Runtime {
33    pub fn bind(&self, service: Arc<dyn Service>) {
34        self.inner.services.lock().unwrap().push(service);
35    }
36
37    fn services(&self) -> Vec<Arc<dyn Service>> {
38        self.inner.services.lock().unwrap().clone()
39    }
40
41    async fn start_services(&self) -> Result<()> {
42        let services = self.services();
43        let mut active = vec![];
44        for service in services {
45            let runtime = self.clone();
46            if debug() {
47                println!("✨ {}", service.name());
48            }
49            match service.clone().spawn(runtime).await {
50                Ok(_) => active.push(service),
51                Err(err) => {
52                    log_error!("Service spawn error: {err}");
53                    self.stop_services(Some(active.clone()));
54                    self.join_services(Some(active)).await;
55                    return Err(err);
56                }
57            }
58        }
59
60        Ok(())
61    }
62
63    fn stop_services(&self, services: Option<Vec<Arc<dyn Service>>>) {
64        services
65            .unwrap_or_else(|| self.services())
66            .into_iter()
67            .for_each(|service| {
68                if debug() {
69                    println!("⛬ {}", service.name());
70                }
71                service.terminate();
72            });
73    }
74
75    async fn join_services(&self, services: Option<Vec<Arc<dyn Service>>>) {
76        let services = services
77            .unwrap_or_else(|| self.services())
78            .into_iter()
79            .rev();
80
81        if debug() {
82            for service in services {
83                let name = service.name();
84                println!("⚡ {name}");
85                service.join().await.expect("service join failure");
86                println!("💀 {name}");
87            }
88        } else {
89            let futures = services.map(|service| service.join()).collect::<Vec<_>>();
90            join_all(futures).await;
91        }
92    }
93
94    /// Start the runtime runtime.
95    async fn start(&self) -> Result<()> {
96        self.inner.is_running.store(true, Ordering::SeqCst);
97        self.start_services().await
98    }
99
100    /// Shutdown runtime runtime.
101    async fn shutdown(&self) {
102        if self.inner.is_running.load(Ordering::SeqCst) {
103            self.inner.is_running.store(false, Ordering::SeqCst);
104            self.stop_services(None);
105            self.join_services(None).await;
106        }
107    }
108
109    pub async fn run(&self) -> Result<()> {
110        self.start().await?;
111        let (finish_sender, finish_receiver) = oneshot();
112        let runtime = self.clone();
113        spawn(async move {
114            runtime.inner.termination.recv().await.unwrap();
115            runtime.shutdown().await;
116            finish_sender.send(()).await.unwrap();
117        });
118
119        finish_receiver.recv().await.unwrap();
120        Ok(())
121    }
122
123    pub fn terminate(&self) {
124        self.inner.termination.try_send(()).unwrap();
125    }
126}