rs_pkg/monitor/
monitor.rs1use futures_util::future::BoxFuture;
2use std::{future::Future, sync::Arc};
3use tokio::sync::{
4 Mutex,
5 mpsc::{Receiver, Sender, error::SendError},
6};
7use tracing::debug;
8use tracing::{Instrument, debug_span};
9
/// Shared, take-once slot holding an async lifecycle hook.
///
/// Reading the layers from the inside out:
/// * `Box<dyn FnOnce() -> BoxFuture<'static, ()> + Send + Sync>` is a
///   type-erased callback that, when called, produces the boxed future to await.
/// * `Option<..>` lets the callback be moved out with `take()`, leaving `None`
///   behind so it can be invoked at most once.
/// * `Mutex<..>` (the async mutex imported from `tokio::sync`) guards that
///   `take()`.
/// * `Arc<..>` lets every clone of the owning value point at the same slot.
type WrappedFn = Arc<Mutex<Option<Box<dyn FnOnce() -> BoxFuture<'static, ()> + Send + Sync>>>>;
11
/// Handle that starts a background task and lets it be asked to stop.
///
/// All fields except `name` sit behind an `Arc`, so the derived `Clone`
/// yields handles that share the same hooks and the same stop channel.
#[derive(Clone)]
pub struct Monitor {
    /// Label given at construction; returned by `name()` and used in the
    /// default start/stop debug messages.
    name: String,
    /// Hook awaited by `run` before the task is spawned (taken once).
    on_start: WrappedFn,
    /// Hook awaited by the watcher task after a stop signal arrives (taken once).
    on_exit: WrappedFn,
    /// Receiving end of the stop channel; the watcher spawned by `run` waits on it.
    done: Arc<Mutex<Receiver<()>>>,
    /// Sending end of the stop channel; `stop` sends on it.
    close: Arc<Sender<()>>,
}
20
21fn wrap<F, Fut>(f: F) -> WrappedFn
22where
23 F: FnOnce() -> Fut + Send + Sync + 'static,
24 Fut: Future<Output = ()> + Send + Sync + 'static,
25{
26 Arc::new(Mutex::new(Some(Box::new(|| Box::pin(f())))))
27}
28
29fn debug_task(msg: String) -> WrappedFn {
30 wrap(|| async move {
31 debug!("{}", msg);
32 })
33}
34
35impl Monitor {
36 pub fn new(name: &str) -> Self {
37 let (close, done) = tokio::sync::mpsc::channel(1);
39 let start = debug_task(format!("MONITOR START - {}", name));
40 let stop = debug_task(format!("MONITOR STOP - {}", name));
41 Self {
42 name: name.to_string(),
43 on_start: start,
44 on_exit: stop,
45 done: Arc::new(Mutex::new(done)),
46 close: Arc::new(close),
47 }
48 }
49
50 pub fn with_trigger(mut self, trigger: (Arc<Sender<()>>, Arc<Mutex<Receiver<()>>>)) -> Self {
51 (self.close, self.done) = trigger;
52 self
53 }
54
55 pub fn with_on_start<F, Fut>(mut self, task: F) -> Self
56 where
57 F: FnOnce() -> Fut + Send + Sync + 'static,
58 Fut: Future<Output = ()> + Send + Sync + 'static,
59 {
60 self.on_start = wrap(task);
61 self
62 }
63
64 pub fn with_on_exit<F, Fut>(mut self, task: F) -> Self
65 where
66 F: FnOnce() -> Fut + Send + Sync + 'static,
67 Fut: Future<Output = ()> + Send + Sync + 'static,
68 {
69 self.on_exit = wrap(task);
70 self
71 }
72
73 pub async fn run<F, Fut>(&self, task: F)
74 where
75 F: FnOnce(Receiver<()>) -> Fut + Send + Sync + 'static,
76 Fut: Future<Output = ()> + Send + Sync + 'static,
77 {
78 if let Some(on_start) = self.on_start.lock().await.take() {
80 on_start().instrument(debug_span!("start.call")).await;
81 };
82
83 let (task_close, task_done) = tokio::sync::mpsc::channel(1);
85
86 let on_exit = self.on_exit.clone();
87 let done = self.done.clone();
88 _ = tokio::spawn(
89 async move {
90 let mut guard = done.lock().await;
92 guard.recv().await;
93 guard.close();
94
95 _ = task_close.send(()).await;
97
98 if let Some(on_exit_fn) = on_exit.lock().await.take() {
100 on_exit_fn().await;
101 };
102 }
103 .instrument(debug_span!("exit")),
104 );
105
106 _ = tokio::spawn(
108 async move {
109 task(task_done).await;
110 }
111 .instrument(debug_span!("call")),
112 );
113 }
114
115 pub async fn stop(&self) -> Result<(), SendError<()>> {
116 self.close.send(()).await
117 }
118
119 pub fn name(&self) -> String {
120 self.name.to_string()
121 }
122}
123
124impl Default for Monitor {
125 fn default() -> Self {
126 Self::new("default")
127 }
128}