capitan_lib/services/
shared.rs

1use super::{dynamic::DynamicSharedService, prelude::SharedR, SharedReactor};
2use anyhow::Result as Res;
3use async_trait::async_trait;
4use std::{
5    any::Any,
6    sync::{Arc, Weak},
7};
8
9/// Services with a reference to self. Can be shared among threads.
10/// Useful for monitoring services and others.
11#[async_trait]
12pub trait SharedService: Any {
13    /// only runs once at the start
14    async fn init(&self) -> Res<()>;
15    /// loops
16    async fn main(&self) -> Res<()>;
17    /// runs after main if main or repeat did not return errors
18    async fn repeat(&self) -> Res<()>;
19    /// runs after main if main returned an error
20    async fn catch(&self, error: anyhow::Error) -> Res<()>;
21    /// run if catch was not successful
22    async fn abort(&self, error: anyhow::Error) -> Res<()>;
23
24    fn to_dyn(self) -> (DynamicSharedService, Weak<Self>)
25    where
26        Self: Sized + Sync + Send,
27    {
28        let p = Arc::new(self);
29        let s = Arc::downgrade(&p);
30        (DynamicSharedService(p), s)
31    }
32}
33
34use crate::{ignore_print_result, print_err};
35
36#[async_trait]
37impl<T: SharedService + Send + Sync> SharedR<T> for SharedReactor<T> {
38    async fn spawn_service(&self, service: T, id: usize) -> Res<()> {
39        let service = Arc::new(service);
40        let services = self.services.clone();
41        let channel = self.notifier_channel.0.clone();
42        let p = service.clone();
43        let handle = tokio::spawn(async move {
44            if let Err(err) = p.init().await {
45                let alive = {
46                    let mut services = services.write().await;
47                    services.remove(&id);
48                    services.len() == 0
49                };
50                if let Err(e) = p.abort(err).await {
51                    channel.send(alive).ok();
52                    return Err(e);
53                };
54                channel.send(alive).ok();
55            }
56
57            let err = loop {
58                if let Err(e) = p.main().await {
59                    ignore_print_result!(p.catch(e).await, e, id);
60                }
61                if let Err(e) = p.repeat().await {
62                    ignore_print_result!(p.catch(e).await, e, id);
63                }
64            };
65
66            let alive = {
67                let mut services = services.write().await;
68                services.remove(&id);
69                services.len() == 0
70            };
71
72            if let Err(e) = p.abort(err).await {
73                channel.send(alive).ok();
74                return Err(e);
75            };
76
77            channel.send(alive).ok();
78
79            Ok(())
80        });
81
82        {
83            let mut services = self.services.write().await;
84            services.insert(id, Arc::new((service, handle)));
85        }
86
87        Ok(())
88    }
89
90    async fn get_service(&self, id: usize) -> Option<Weak<T>> {
91        Some(Arc::downgrade(&self.services.read().await.get(&id)?.0))
92    }
93}