capitan_lib/services/
isolated.rs

1use std::any::Any;
2
3use anyhow::Result as Res;
4use async_trait::async_trait;
5
6use super::{
7    dynamic::DynamicIsolatedService,
8    reactor::{IsolatedReactor, IsolatedReactorTrait},
9};
10
11/// Isolated services are services that can't be accessed from the outside.
12/// they are helpful as root services, services that hold other services.
13#[async_trait]
14pub trait IsolatedService: Any {
15    /// only runs once at the start
16    async fn init(&mut self) -> Res<()>;
17    /// loops
18    async fn main(&mut self) -> Res<()>;
19    /// runs after main if main or repeat did not return errors
20    async fn repeat(&mut self) -> Res<()>;
21    /// runs after main if main returned an error
22    async fn catch(&mut self, error: anyhow::Error) -> Res<()>;
23    /// run if catch was not successful
24    async fn abort(&mut self, error: anyhow::Error) -> Res<()>;
25
26    fn to_dyn(self) -> DynamicIsolatedService
27    where
28        Self: Sized + Sync + Send,
29    {
30        DynamicIsolatedService(Box::new(self))
31    }
32}
33
34use crate::{ignore_print_result, print_err};
35
36#[async_trait]
37impl<T: IsolatedService + Send + Sync> IsolatedReactorTrait<T> for IsolatedReactor {
38    async fn spawn_service(&self, mut service: T, id: usize) -> Res<()> {
39        let services = self.services.clone();
40        let channel = self.notifier_channel.0.clone();
41        let handle = tokio::spawn(async move {
42            if let Err(err) = service.init().await {
43                let alive = {
44                    let mut services = services.write().await;
45                    services.remove(&id);
46                    services.len() == 0
47                };
48                if let Err(e) = service.abort(err).await {
49                    channel.send(alive).ok();
50                    return Err(e);
51                };
52                channel.send(alive).ok();
53            }
54
55            let err = loop {
56                if let Err(e) = service.main().await {
57                    ignore_print_result!(service.catch(e).await, e, id);
58                }
59                if let Err(e) = service.repeat().await {
60                    ignore_print_result!(service.catch(e).await, e, id);
61                }
62            };
63
64            let alive = {
65                let mut services = services.write().await;
66                services.remove(&id);
67                services.len() == 0
68            };
69
70            if let Err(e) = service.abort(err).await {
71                channel.send(alive).ok();
72                return Err(e);
73            };
74
75            channel.send(alive).ok();
76
77            Ok(())
78        });
79
80        {
81            let mut services = self.services.write().await;
82            services.insert(id, handle);
83        }
84        Ok(())
85    }
86}