1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
use crate::imports::*;

struct Inner {
    services: Mutex<Vec<Arc<dyn Service>>>,
    is_running: Arc<AtomicBool>,
    termination: Channel<()>,
}

impl Shutdown for Inner {
    fn shutdown(&self) {
        self.termination.try_send(()).unwrap();
    }
}

#[derive(Clone)]
pub struct Runtime {
    inner: Arc<Inner>,
}

impl Default for Runtime {
    fn default() -> Self {
        Self {
            inner: Arc::new(Inner {
                services: Mutex::new(Vec::new()),
                is_running: Arc::new(AtomicBool::new(false)),
                termination: Channel::oneshot(),
            }),
        }
    }
}

impl Runtime {
    pub fn bind(&self, service: Arc<dyn Service>) {
        self.inner.services.lock().unwrap().push(service);
    }

    fn services(&self) -> Vec<Arc<dyn Service>> {
        self.inner.services.lock().unwrap().clone()
    }

    async fn start_services(&self) -> Result<()> {
        let services = self.services();
        let mut active = vec![];
        for service in services {
            let runtime = self.clone();
            if debug() {
                println!("✨ {}", service.name());
            }
            match service.clone().spawn(runtime).await {
                Ok(_) => active.push(service),
                Err(err) => {
                    log_error!("Service spawn error: {err}");
                    self.stop_services(Some(active.clone()));
                    self.join_services(Some(active)).await;
                    return Err(err);
                }
            }
        }

        Ok(())
    }

    fn stop_services(&self, services: Option<Vec<Arc<dyn Service>>>) {
        services
            .unwrap_or_else(|| self.services())
            .into_iter()
            .for_each(|service| {
                if debug() {
                    println!("⛬ {}", service.name());
                }
                service.terminate();
            });
    }

    async fn join_services(&self, services: Option<Vec<Arc<dyn Service>>>) {
        let services = services
            .unwrap_or_else(|| self.services())
            .into_iter()
            .rev();

        if debug() {
            for service in services {
                let name = service.name();
                println!("⚡ {name}");
                service.join().await.expect("service join failure");
                println!("💀 {name}");
            }
        } else {
            let futures = services.map(|service| service.join()).collect::<Vec<_>>();
            join_all(futures).await;
        }
    }

    /// Start the runtime runtime.
    async fn start(&self) -> Result<()> {
        self.inner.is_running.store(true, Ordering::SeqCst);
        self.start_services().await
    }

    /// Shutdown runtime runtime.
    async fn shutdown(&self) {
        if self.inner.is_running.load(Ordering::SeqCst) {
            self.inner.is_running.store(false, Ordering::SeqCst);
            self.stop_services(None);
            self.join_services(None).await;
        }
    }

    pub async fn run(&self) -> Result<()> {
        self.start().await?;
        let (finish_sender, finish_receiver) = oneshot();
        let runtime = self.clone();
        spawn(async move {
            runtime.inner.termination.recv().await.unwrap();
            runtime.shutdown().await;
            finish_sender.send(()).await.unwrap();
        });

        finish_receiver.recv().await.unwrap();
        Ok(())
    }

    pub fn terminate(&self) {
        self.inner.termination.try_send(()).unwrap();
    }
}