Skip to main content

rs_pkg/monitor/
monitor.rs

1use futures_util::future::BoxFuture;
2use std::{future::Future, sync::Arc};
3use tokio::sync::{
4    Mutex,
5    mpsc::{Receiver, Sender, error::SendError},
6};
7use tracing::debug;
8use tracing::{Instrument, debug_span};
9
10type WrappedFn = Arc<Mutex<Option<Box<dyn FnOnce() -> BoxFuture<'static, ()> + Send + Sync>>>>;
11
12#[derive(Clone)]
13pub struct Monitor {
14    name: String,
15    on_start: WrappedFn,
16    on_exit: WrappedFn,
17    done: Arc<Mutex<Receiver<()>>>,
18    close: Arc<Sender<()>>,
19}
20
21fn wrap<F, Fut>(f: F) -> WrappedFn
22where
23    F: FnOnce() -> Fut + Send + Sync + 'static,
24    Fut: Future<Output = ()> + Send + Sync + 'static,
25{
26    Arc::new(Mutex::new(Some(Box::new(|| Box::pin(f())))))
27}
28
29fn debug_task(msg: String) -> WrappedFn {
30    wrap(|| async move {
31        debug!("{}", msg);
32    })
33}
34
35impl Monitor {
36    pub fn new(name: &str) -> Self {
37        // 创建 monitor 开关
38        let (close, done) = tokio::sync::mpsc::channel(1);
39        let start = debug_task(format!("MONITOR START - {}", name));
40        let stop = debug_task(format!("MONITOR STOP - {}", name));
41        Self {
42            name: name.to_string(),
43            on_start: start,
44            on_exit: stop,
45            done: Arc::new(Mutex::new(done)),
46            close: Arc::new(close),
47        }
48    }
49
50    pub fn with_trigger(mut self, trigger: (Arc<Sender<()>>, Arc<Mutex<Receiver<()>>>)) -> Self {
51        (self.close, self.done) = trigger;
52        self
53    }
54
55    pub fn with_on_start<F, Fut>(mut self, task: F) -> Self
56    where
57        F: FnOnce() -> Fut + Send + Sync + 'static,
58        Fut: Future<Output = ()> + Send + Sync + 'static,
59    {
60        self.on_start = wrap(task);
61        self
62    }
63
64    pub fn with_on_exit<F, Fut>(mut self, task: F) -> Self
65    where
66        F: FnOnce() -> Fut + Send + Sync + 'static,
67        Fut: Future<Output = ()> + Send + Sync + 'static,
68    {
69        self.on_exit = wrap(task);
70        self
71    }
72
73    pub async fn run<F, Fut>(&self, task: F)
74    where
75        F: FnOnce(Receiver<()>) -> Fut + Send + Sync + 'static,
76        Fut: Future<Output = ()> + Send + Sync + 'static,
77    {
78        // 获取并执行 on_start
79        if let Some(on_start) = self.on_start.lock().await.take() {
80            on_start().instrument(debug_span!("start.call")).await;
81        };
82
83        // 为 task 创建一个接受关闭信号的管道
84        let (task_close, task_done) = tokio::sync::mpsc::channel(1);
85
86        let on_exit = self.on_exit.clone();
87        let done = self.done.clone();
88        _ = tokio::spawn(
89            async move {
90                // 如果 monitor 收到了关闭信号
91                let mut guard = done.lock().await;
92                guard.recv().await;
93                guard.close();
94
95                // 关闭 task
96                _ = task_close.send(()).await;
97
98                // 运行 on_exit
99                if let Some(on_exit_fn) = on_exit.lock().await.take() {
100                    on_exit_fn().await;
101                };
102            }
103            .instrument(debug_span!("exit")),
104        );
105
106        // 运行 task
107        _ = tokio::spawn(
108            async move {
109                task(task_done).await;
110            }
111            .instrument(debug_span!("call")),
112        );
113    }
114
115    pub async fn stop(&self) -> Result<(), SendError<()>> {
116        self.close.send(()).await
117    }
118
119    pub fn name(&self) -> String {
120        self.name.to_string()
121    }
122}
123
124impl Default for Monitor {
125    fn default() -> Self {
126        Self::new("default")
127    }
128}