rs_pkg/worker/
worker.rs

1use crate::monitor::Monitor;
2use std::{future::Future, sync::Arc};
3use tokio::sync::{
4    Mutex,
5    mpsc::{
6        Receiver, Sender, channel,
7        error::{
8            SendError,
9            TryRecvError::{Disconnected, Empty},
10        },
11    },
12};
13use tracing::{Instrument, debug, debug_span};
14
15pub struct Worker<J> {
16    name: String,
17    work_count: Arc<Mutex<usize>>,
18    monitor: Monitor,
19    recv: Arc<Mutex<Receiver<J>>>,
20    send: Arc<Sender<J>>,
21    graceful: bool,
22}
23
24async fn handle<F, Fut, J>(
25    trigger: Arc<Mutex<Receiver<J>>>,
26    done: Arc<Mutex<Receiver<()>>>,
27    how: Arc<F>,
28    graceful: bool,
29) where
30    F: Fn(J) -> Fut + Send + Sync + 'static,
31    Fut: Future<Output = ()> + Send + Sync + 'static,
32    J: Send + Sync + 'static,
33{
34    match graceful {
35        false => {
36            tokio::spawn(
37                async move {
38                    let mut done = done.lock().await;
39                    loop {
40                        // 非阻塞检查信号
41                        match done.try_recv() {
42                            Ok(_) | Err(Disconnected) => {
43                                done.close();
44                                return;
45                            }
46
47                            Err(Empty) => {
48                                // 检查是否有新的工作项
49                                let mut guard = trigger.lock().await;
50                                if let Ok(item) = guard.try_recv() {
51                                    drop(guard); // 释放锁,避免在异步调用时持有锁
52                                    how(item).instrument(debug_span!("how")).await;
53                                }
54                            }
55                        }
56                    }
57                }
58                .instrument(debug_span!("handle")),
59            );
60        }
61
62        true => {
63            tokio::spawn(
64                async move {
65                    loop {
66                        // 检查是否有新的工作项
67                        let mut guard = trigger.lock().await;
68                        match guard.recv().await {
69                            Some(item) => {
70                                drop(guard); // 释放锁,避免在异步调用时持有锁
71                                how(item).instrument(debug_span!("how")).await;
72                            }
73
74                            None => return,
75                        }
76                    }
77                }
78                .instrument(debug_span!("handle")),
79            );
80        }
81    }
82}
83
84impl<J> Worker<J> {
85    pub fn new(name: &str, buf: usize) -> Self {
86        let (tx, rx) = channel(buf);
87        let work_count = Arc::new(Mutex::new(0));
88        Self {
89            name: name.to_string(),
90            work_count,
91            monitor: Monitor::new(name),
92            recv: Arc::new(Mutex::new(rx)),
93            graceful: false,
94            send: Arc::new(tx),
95        }
96    }
97
98    pub fn with_graceful(mut self, graceful: bool) -> Self {
99        self.graceful = graceful;
100        self
101    }
102
103    pub fn with_trigger(mut self, trigger: (Sender<J>, Receiver<J>)) -> Self {
104        let (send, recv) = trigger;
105        self.send = Arc::new(send);
106        self.recv = Arc::new(Mutex::new(recv));
107        self
108    }
109
110    pub fn get_sender(&self) -> Arc<Sender<J>> {
111        self.send.clone()
112    }
113
114    pub async fn send(&self, job: J) -> Result<(), SendError<J>> {
115        self.send.send(job).await
116    }
117
118    pub fn name(&self) -> String {
119        self.name.to_string()
120    }
121
122    pub async fn count(&self) -> usize {
123        let guard = self.work_count.lock().await;
124        *guard
125    }
126
127    pub async fn stop(&self) -> Result<(), SendError<()>> {
128        self.monitor.stop().await
129    }
130
131    pub async fn run<F, Fut>(&self, how: F)
132    where
133        F: Fn(J) -> Fut + Send + Sync + 'static,
134        Fut: Future<Output = ()> + Send + Sync + 'static,
135        J: Send + Sync + 'static,
136    {
137        debug!("WORKER START - {}", self.name);
138        let trigger = self.recv.clone();
139        let graceful = self.graceful;
140        let task = move |done: Receiver<()>| async move {
141            let done = Arc::new(Mutex::new(done));
142            let how = Arc::new(how);
143            handle(trigger, done, how, graceful)
144                // .instrument(debug_span!("handle"))
145                .await;
146        };
147
148        _ = self
149            .monitor
150            .run(task)
151            .instrument(debug_span!("monitor"))
152            .await;
153    }
154}