capitan_lib/services/
reactor.rs

1use std::{
2    collections::HashMap,
3    sync::{Arc, Weak},
4};
5
6use anyhow::Result as Res;
7use async_trait::async_trait;
8use tokio::{sync::RwLock, task::JoinHandle};
9
10use super::{IsolatedService, SharedService};
11
12#[async_trait]
13pub trait SharedReactorTrait<T: SharedService> {
14    async fn spawn_service(&self, service: T, id: usize) -> Res<()>;
15    async fn get_service(&self, id: usize) -> Option<Weak<T>>;
16}
17
18#[async_trait]
19pub trait IsolatedReactorTrait<T: IsolatedService> {
20    async fn spawn_service(&self, service: T, id: usize) -> Res<()>;
21}
22
23use tokio::sync::mpsc::{unbounded_channel, UnboundedReceiver, UnboundedSender};
24
25/// Contains and runs isolated services
26pub struct IsolatedReactor {
27    pub services: Arc<RwLock<HashMap<usize, JoinHandle<Res<()>>>>>,
28    pub notifier_channel: (UnboundedSender<bool>, UnboundedReceiver<bool>),
29}
30
31impl IsolatedReactor {
32    pub fn new() -> Self {
33        Self {
34            services: Arc::new(RwLock::new(HashMap::new())),
35            notifier_channel: unbounded_channel(),
36        }
37    }
38    pub async fn wait_all(&mut self) {
39        loop {
40            match self.notifier_channel.1.recv().await {
41                Some(done) => {
42                    if done {
43                        self.notifier_channel.1.close()
44                    }
45                }
46                None => break,
47            }
48        }
49    }
50}
51
52/// Contains and runs shared services
53pub struct SharedReactor<T: SharedService + Send + Sync> {
54    pub services: Arc<RwLock<HashMap<usize, Arc<(Arc<T>, JoinHandle<Res<()>>)>>>>,
55    pub notifier_channel: (UnboundedSender<bool>, UnboundedReceiver<bool>),
56}
57
58impl<T: SharedService + Send + Sync> SharedReactor<T> {
59    pub fn new() -> Self {
60        Self {
61            services: Arc::new(RwLock::new(HashMap::new())),
62            notifier_channel: unbounded_channel(),
63        }
64    }
65    pub async fn wait_all(&mut self) {
66        loop {
67            match self.notifier_channel.1.recv().await {
68                Some(done) => {
69                    if done {
70                        self.notifier_channel.1.close()
71                    }
72                }
73                None => break,
74            }
75        }
76    }
77}