rs_pkg/monitor/
monitor.rs1use std::{future::Future, pin::Pin, sync::Arc};
2use tokio::sync::{
3 Mutex,
4 mpsc::{Receiver, Sender, error::SendError},
5};
6use tracing::debug;
7use tracing::{Instrument, debug_span};
8
9type WrappedFn = Arc<
10 Mutex<
11 Option<
12 Box<
13 dyn FnOnce() -> Pin<Box<dyn Future<Output = ()> + Send + Sync + 'static>>
14 + Send
15 + Sync
16 + 'static,
17 >,
18 >,
19 >,
20>;
21
22pub struct Monitor {
23 name: String,
24 on_start: WrappedFn,
25 on_exit: WrappedFn,
26 done: Arc<Mutex<Receiver<()>>>,
27 close: Arc<Sender<()>>,
28}
29
30fn wrap<F, Fut>(f: F) -> WrappedFn
31where
32 F: FnOnce() -> Fut + Send + Sync + 'static,
33 Fut: Future<Output = ()> + Send + Sync + 'static,
34{
35 Arc::new(Mutex::new(Some(Box::new(|| Box::pin(f())))))
36}
37
38fn debug_task(msg: String) -> WrappedFn {
39 wrap(|| async move {
40 let msg = msg.to_string();
41 debug!("{}", msg);
42 })
43}
44
45impl Monitor {
46 pub fn new(name: &str) -> Self {
47 let (close, done) = tokio::sync::mpsc::channel(1);
49 let start = debug_task(format!("MONITOR START - {}", name));
50 let stop = debug_task(format!("MONITOR STOP - {}", name));
51 Self {
52 name: name.to_string(),
53 on_start: start,
54 on_exit: stop,
55 done: Arc::new(Mutex::new(done)),
56 close: Arc::new(close),
57 }
58 }
59
60 pub fn with_on_start<F, Fut>(mut self, task: F) -> Self
61 where
62 F: FnOnce() -> Fut + Send + Sync + 'static,
63 Fut: Future<Output = ()> + Send + Sync + 'static,
64 {
65 self.on_start = wrap(task);
66 self
67 }
68
69 pub fn with_on_exit<F, Fut>(mut self, task: F) -> Self
70 where
71 F: FnOnce() -> Fut + Send + Sync + 'static,
72 Fut: Future<Output = ()> + Send + Sync + 'static,
73 {
74 self.on_exit = wrap(task);
75 self
76 }
77
78 pub async fn run<F, Fut>(&self, task: F)
79 where
80 F: FnOnce(Receiver<()>) -> Fut + Send + Sync + 'static,
81 Fut: Future<Output = ()> + Send + Sync + 'static,
82 {
83 if let Some(on_start) = self.on_start.lock().await.take() {
85 on_start().instrument(debug_span!("start.call")).await;
86 };
87
88 let (task_close, task_done) = tokio::sync::mpsc::channel(1);
90
91 let on_exit = self.on_exit.clone();
92 let done = self.done.clone();
93 _ = tokio::spawn(
94 async move {
95 let mut guard = done.lock().await;
97 guard.recv().await;
98 guard.close();
99
100 _ = task_close.send(()).await;
102
103 if let Some(on_exit_fn) = on_exit.lock().await.take() {
105 on_exit_fn().instrument(debug_span!("call")).await;
106 };
107 }
108 .instrument(debug_span!("exit")),
109 );
110
111 _ = tokio::spawn(
113 async move {
114 task(task_done).instrument(debug_span!("call")).await;
115 }
116 .instrument(debug_span!("spawn")),
117 );
118 }
119
120 pub async fn stop(&self) -> Result<(), SendError<()>> {
121 self.close.send(()).await
122 }
123
124 pub fn name(&self) -> String {
125 self.name.to_string()
126 }
127}
128
129impl Default for Monitor {
130 fn default() -> Self {
131 Self::new("default")
132 }
133}