workflow_service/
runtime.rs1use crate::imports::*;
2
3struct Inner {
4 services: Mutex<Vec<Arc<dyn Service>>>,
5 is_running: Arc<AtomicBool>,
6 termination: Channel<()>,
7}
8
9impl Shutdown for Inner {
10 fn shutdown(&self) {
11 self.termination.try_send(()).unwrap();
12 }
13}
14
15#[derive(Clone)]
16pub struct Runtime {
17 inner: Arc<Inner>,
18}
19
20impl Default for Runtime {
21 fn default() -> Self {
22 Self {
23 inner: Arc::new(Inner {
24 services: Mutex::new(Vec::new()),
25 is_running: Arc::new(AtomicBool::new(false)),
26 termination: Channel::oneshot(),
27 }),
28 }
29 }
30}
31
32impl Runtime {
33 pub fn bind(&self, service: Arc<dyn Service>) {
34 self.inner.services.lock().unwrap().push(service);
35 }
36
37 fn services(&self) -> Vec<Arc<dyn Service>> {
38 self.inner.services.lock().unwrap().clone()
39 }
40
41 async fn start_services(&self) -> Result<()> {
42 let services = self.services();
43 let mut active = vec![];
44 for service in services {
45 let runtime = self.clone();
46 if debug() {
47 println!("✨ {}", service.name());
48 }
49 match service.clone().spawn(runtime).await {
50 Ok(_) => active.push(service),
51 Err(err) => {
52 log_error!("Service spawn error: {err}");
53 self.stop_services(Some(active.clone()));
54 self.join_services(Some(active)).await;
55 return Err(err);
56 }
57 }
58 }
59
60 Ok(())
61 }
62
63 fn stop_services(&self, services: Option<Vec<Arc<dyn Service>>>) {
64 services
65 .unwrap_or_else(|| self.services())
66 .into_iter()
67 .for_each(|service| {
68 if debug() {
69 println!("⛬ {}", service.name());
70 }
71 service.terminate();
72 });
73 }
74
75 async fn join_services(&self, services: Option<Vec<Arc<dyn Service>>>) {
76 let services = services
77 .unwrap_or_else(|| self.services())
78 .into_iter()
79 .rev();
80
81 if debug() {
82 for service in services {
83 let name = service.name();
84 println!("⚡ {name}");
85 service.join().await.expect("service join failure");
86 println!("💀 {name}");
87 }
88 } else {
89 let futures = services.map(|service| service.join()).collect::<Vec<_>>();
90 join_all(futures).await;
91 }
92 }
93
94 async fn start(&self) -> Result<()> {
96 self.inner.is_running.store(true, Ordering::SeqCst);
97 self.start_services().await
98 }
99
100 async fn shutdown(&self) {
102 if self.inner.is_running.load(Ordering::SeqCst) {
103 self.inner.is_running.store(false, Ordering::SeqCst);
104 self.stop_services(None);
105 self.join_services(None).await;
106 }
107 }
108
109 pub async fn run(&self) -> Result<()> {
110 self.start().await?;
111 let (finish_sender, finish_receiver) = oneshot();
112 let runtime = self.clone();
113 spawn(async move {
114 runtime.inner.termination.recv().await.unwrap();
115 runtime.shutdown().await;
116 finish_sender.send(()).await.unwrap();
117 });
118
119 finish_receiver.recv().await.unwrap();
120 Ok(())
121 }
122
123 pub fn terminate(&self) {
124 self.inner.termination.try_send(()).unwrap();
125 }
126}