rs_pkg/monitor/
monitor.rs

1use std::{future::Future, pin::Pin, sync::Arc};
2use tokio::sync::{
3    Mutex,
4    mpsc::{Receiver, Sender, error::SendError},
5};
6use tracing::debug;
7use tracing::{Instrument, debug_span};
8
9type WrappedFn = Arc<
10    Mutex<
11        Option<
12            Box<
13                dyn FnOnce() -> Pin<Box<dyn Future<Output = ()> + Send + Sync + 'static>>
14                    + Send
15                    + Sync
16                    + 'static,
17            >,
18        >,
19    >,
20>;
21
22pub struct Monitor {
23    name: String,
24    on_start: WrappedFn,
25    on_exit: WrappedFn,
26    done: Arc<Mutex<Receiver<()>>>,
27    close: Arc<Sender<()>>,
28}
29
30fn wrap<F, Fut>(f: F) -> WrappedFn
31where
32    F: FnOnce() -> Fut + Send + Sync + 'static,
33    Fut: Future<Output = ()> + Send + Sync + 'static,
34{
35    Arc::new(Mutex::new(Some(Box::new(|| Box::pin(f())))))
36}
37
38fn debug_task(msg: String) -> WrappedFn {
39    wrap(|| async move {
40        let msg = msg.to_string();
41        debug!("{}", msg);
42    })
43}
44
45impl Monitor {
46    pub fn new(name: &str) -> Self {
47        // 创建 monitor 开关
48        let (close, done) = tokio::sync::mpsc::channel(1);
49        let start = debug_task(format!("MONITOR START - {}", name));
50        let stop = debug_task(format!("MONITOR STOP - {}", name));
51        Self {
52            name: name.to_string(),
53            on_start: start,
54            on_exit: stop,
55            done: Arc::new(Mutex::new(done)),
56            close: Arc::new(close),
57        }
58    }
59
60    pub fn with_on_start<F, Fut>(mut self, task: F) -> Self
61    where
62        F: FnOnce() -> Fut + Send + Sync + 'static,
63        Fut: Future<Output = ()> + Send + Sync + 'static,
64    {
65        self.on_start = wrap(task);
66        self
67    }
68
69    pub fn with_on_exit<F, Fut>(mut self, task: F) -> Self
70    where
71        F: FnOnce() -> Fut + Send + Sync + 'static,
72        Fut: Future<Output = ()> + Send + Sync + 'static,
73    {
74        self.on_exit = wrap(task);
75        self
76    }
77
78    pub async fn run<F, Fut>(&self, task: F)
79    where
80        F: FnOnce(Receiver<()>) -> Fut + Send + Sync + 'static,
81        Fut: Future<Output = ()> + Send + Sync + 'static,
82    {
83        // 获取并执行 on_start
84        if let Some(on_start) = self.on_start.lock().await.take() {
85            on_start().instrument(debug_span!("start.call")).await;
86        };
87
88        // 为 task 创建一个接受关闭信号的管道
89        let (task_close, task_done) = tokio::sync::mpsc::channel(1);
90
91        let on_exit = self.on_exit.clone();
92        let done = self.done.clone();
93        _ = tokio::spawn(
94            async move {
95                // 如果 monitor 收到了关闭信号
96                let mut guard = done.lock().await;
97                guard.recv().await;
98                guard.close();
99
100                // 关闭 task
101                _ = task_close.send(()).await;
102
103                // 运行 on_exit
104                if let Some(on_exit_fn) = on_exit.lock().await.take() {
105                    on_exit_fn().instrument(debug_span!("call")).await;
106                };
107            }
108            .instrument(debug_span!("exit")),
109        );
110
111        // 运行 task
112        _ = tokio::spawn(
113            async move {
114                task(task_done).instrument(debug_span!("call")).await;
115            }
116            .instrument(debug_span!("spawn")),
117        );
118    }
119
120    pub async fn stop(&self) -> Result<(), SendError<()>> {
121        self.close.send(()).await
122    }
123
124    pub fn name(&self) -> String {
125        self.name.to_string()
126    }
127}
128
129impl Default for Monitor {
130    fn default() -> Self {
131        Self::new("default")
132    }
133}