capitan_lib/services/
shared.rs1use super::{dynamic::DynamicSharedService, prelude::SharedR, SharedReactor};
2use anyhow::Result as Res;
3use async_trait::async_trait;
4use std::{
5 any::Any,
6 sync::{Arc, Weak},
7};
8
9#[async_trait]
12pub trait SharedService: Any {
13 async fn init(&self) -> Res<()>;
15 async fn main(&self) -> Res<()>;
17 async fn repeat(&self) -> Res<()>;
19 async fn catch(&self, error: anyhow::Error) -> Res<()>;
21 async fn abort(&self, error: anyhow::Error) -> Res<()>;
23
24 fn to_dyn(self) -> (DynamicSharedService, Weak<Self>)
25 where
26 Self: Sized + Sync + Send,
27 {
28 let p = Arc::new(self);
29 let s = Arc::downgrade(&p);
30 (DynamicSharedService(p), s)
31 }
32}
33
34use crate::{ignore_print_result, print_err};
35
36#[async_trait]
37impl<T: SharedService + Send + Sync> SharedR<T> for SharedReactor<T> {
38 async fn spawn_service(&self, service: T, id: usize) -> Res<()> {
39 let service = Arc::new(service);
40 let services = self.services.clone();
41 let channel = self.notifier_channel.0.clone();
42 let p = service.clone();
43 let handle = tokio::spawn(async move {
44 if let Err(err) = p.init().await {
45 let alive = {
46 let mut services = services.write().await;
47 services.remove(&id);
48 services.len() == 0
49 };
50 if let Err(e) = p.abort(err).await {
51 channel.send(alive).ok();
52 return Err(e);
53 };
54 channel.send(alive).ok();
55 }
56
57 let err = loop {
58 if let Err(e) = p.main().await {
59 ignore_print_result!(p.catch(e).await, e, id);
60 }
61 if let Err(e) = p.repeat().await {
62 ignore_print_result!(p.catch(e).await, e, id);
63 }
64 };
65
66 let alive = {
67 let mut services = services.write().await;
68 services.remove(&id);
69 services.len() == 0
70 };
71
72 if let Err(e) = p.abort(err).await {
73 channel.send(alive).ok();
74 return Err(e);
75 };
76
77 channel.send(alive).ok();
78
79 Ok(())
80 });
81
82 {
83 let mut services = self.services.write().await;
84 services.insert(id, Arc::new((service, handle)));
85 }
86
87 Ok(())
88 }
89
90 async fn get_service(&self, id: usize) -> Option<Weak<T>> {
91 Some(Arc::downgrade(&self.services.read().await.get(&id)?.0))
92 }
93}