rs_pkg/worker/
worker.rs

1use crate::monitor::Monitor;
2use std::{future::Future, sync::Arc};
3use tokio::sync::{
4    Mutex,
5    mpsc::{
6        Receiver, Sender, channel,
7        error::{
8            SendError,
9            TryRecvError::{Disconnected, Empty},
10        },
11    },
12};
13use tracing::{Instrument, debug, debug_span};
14
15#[derive(Clone)]
16pub struct Worker<J> {
17    name: String,
18    work_count: Arc<Mutex<usize>>,
19    monitor: Monitor,
20    recv: Arc<Mutex<Receiver<J>>>,
21    send: Arc<Sender<J>>,
22    graceful: bool,
23}
24
25async fn handle<F, Fut, J>(
26    trigger: Arc<Mutex<Receiver<J>>>,
27    done: Arc<Mutex<Receiver<()>>>,
28    how: Arc<F>,
29    graceful: bool,
30) where
31    F: Fn(J) -> Fut + Send + Sync + 'static,
32    Fut: Future<Output = ()> + Send + 'static,
33    J: Send + Sync + 'static,
34{
35    match graceful {
36        false => {
37            tokio::spawn(
38                async move {
39                    let mut done = done.lock().await;
40                    loop {
41                        // 非阻塞检查信号
42                        match done.try_recv() {
43                            Ok(_) | Err(Disconnected) => {
44                                done.close();
45                                return;
46                            }
47
48                            Err(Empty) => {
49                                // 检查是否有新的工作项
50                                let mut guard = trigger.lock().await;
51                                if let Ok(item) = guard.try_recv() {
52                                    drop(guard); // 释放锁,避免在异步调用时持有锁
53                                    how(item).await;
54                                }
55                            }
56                        }
57                    }
58                }
59                .instrument(debug_span!("handle")),
60            );
61        }
62
63        true => {
64            tokio::spawn(
65                async move {
66                    loop {
67                        // 检查是否有新的工作项
68                        let mut guard = trigger.lock().await;
69                        match guard.recv().await {
70                            Some(item) => {
71                                drop(guard); // 释放锁,避免在异步调用时持有锁
72                                how(item).await;
73                            }
74
75                            None => return,
76                        }
77                    }
78                }
79                .instrument(debug_span!("grace")),
80            );
81        }
82    }
83}
84
85impl<J> Worker<J> {
86    pub fn new(name: &str, buf: usize) -> Self {
87        let (tx, rx) = channel(buf);
88        let work_count = Arc::new(Mutex::new(0));
89        Self {
90            name: name.to_string(),
91            work_count,
92            monitor: Monitor::new(name),
93            recv: Arc::new(Mutex::new(rx)),
94            graceful: false,
95            send: Arc::new(tx),
96        }
97    }
98
99    pub fn with_on_start<F, Fut>(mut self, task: F) -> Self
100    where
101        F: FnOnce() -> Fut + Send + Sync + 'static,
102        Fut: Future<Output = ()> + Send + Sync + 'static,
103    {
104        self.monitor = self.monitor.with_on_start(task);
105        self
106    }
107
108    pub fn with_on_exit<F, Fut>(mut self, task: F) -> Self
109    where
110        F: FnOnce() -> Fut + Send + Sync + 'static,
111        Fut: Future<Output = ()> + Send + Sync + 'static,
112    {
113        self.monitor = self.monitor.with_on_exit(task);
114        self
115    }
116
117    pub fn with_graceful(mut self, graceful: bool) -> Self {
118        self.graceful = graceful;
119        self
120    }
121
122    pub fn with_trigger(mut self, trigger: (Arc<Sender<J>>, Arc<Mutex<Receiver<J>>>)) -> Self {
123        let (send, recv) = trigger;
124        self.send = send;
125        self.recv = recv;
126        self
127    }
128
129    pub fn get_sender(&self) -> Arc<Sender<J>> {
130        self.send.clone()
131    }
132
133    pub async fn send(&self, job: J) -> Result<(), SendError<J>> {
134        self.send.send(job).await
135    }
136
137    pub fn name(&self) -> String {
138        self.name.to_string()
139    }
140
141    pub async fn count(&self) -> usize {
142        let guard = self.work_count.lock().await;
143        *guard
144    }
145
146    pub async fn stop(&self) -> Result<(), SendError<()>> {
147        self.monitor.stop().await
148    }
149
150    pub async fn run<F, Fut>(&self, how: F)
151    where
152        F: Fn(J) -> Fut + Send + Sync + 'static,
153        Fut: Future<Output = ()> + Send + 'static,
154        J: Send + Sync + 'static,
155    {
156        debug!("WORKER START - {}", self.name);
157        let trigger = self.recv.clone();
158        let graceful = self.graceful;
159        let how = Arc::new(how);
160        let task = move |done: Receiver<()>| async move {
161            let done = Arc::new(Mutex::new(done));
162            handle(trigger, done, how, graceful).await;
163        };
164
165        _ = self
166            .monitor
167            .run(task)
168            .instrument(debug_span!("monitor"))
169            .await;
170    }
171}