capitan_lib/services/
isolated.rs1use std::any::Any;
2
3use anyhow::Result as Res;
4use async_trait::async_trait;
5
6use super::{
7 dynamic::DynamicIsolatedService,
8 reactor::{IsolatedReactor, IsolatedReactorTrait},
9};
10
11#[async_trait]
14pub trait IsolatedService: Any {
15 async fn init(&mut self) -> Res<()>;
17 async fn main(&mut self) -> Res<()>;
19 async fn repeat(&mut self) -> Res<()>;
21 async fn catch(&mut self, error: anyhow::Error) -> Res<()>;
23 async fn abort(&mut self, error: anyhow::Error) -> Res<()>;
25
26 fn to_dyn(self) -> DynamicIsolatedService
27 where
28 Self: Sized + Sync + Send,
29 {
30 DynamicIsolatedService(Box::new(self))
31 }
32}
33
34use crate::{ignore_print_result, print_err};
35
36#[async_trait]
37impl<T: IsolatedService + Send + Sync> IsolatedReactorTrait<T> for IsolatedReactor {
38 async fn spawn_service(&self, mut service: T, id: usize) -> Res<()> {
39 let services = self.services.clone();
40 let channel = self.notifier_channel.0.clone();
41 let handle = tokio::spawn(async move {
42 if let Err(err) = service.init().await {
43 let alive = {
44 let mut services = services.write().await;
45 services.remove(&id);
46 services.len() == 0
47 };
48 if let Err(e) = service.abort(err).await {
49 channel.send(alive).ok();
50 return Err(e);
51 };
52 channel.send(alive).ok();
53 }
54
55 let err = loop {
56 if let Err(e) = service.main().await {
57 ignore_print_result!(service.catch(e).await, e, id);
58 }
59 if let Err(e) = service.repeat().await {
60 ignore_print_result!(service.catch(e).await, e, id);
61 }
62 };
63
64 let alive = {
65 let mut services = services.write().await;
66 services.remove(&id);
67 services.len() == 0
68 };
69
70 if let Err(e) = service.abort(err).await {
71 channel.send(alive).ok();
72 return Err(e);
73 };
74
75 channel.send(alive).ok();
76
77 Ok(())
78 });
79
80 {
81 let mut services = self.services.write().await;
82 services.insert(id, handle);
83 }
84 Ok(())
85 }
86}