1use crate::monitor::Monitor;
2use std::{future::Future, sync::Arc};
3use tokio::sync::{
4 Mutex,
5 mpsc::{
6 Receiver, Sender, channel,
7 error::{
8 SendError,
9 TryRecvError::{Disconnected, Empty},
10 },
11 },
12};
13use tracing::{Instrument, debug, debug_span};
14
15pub struct Worker<J> {
16 name: String,
17 work_count: Arc<Mutex<usize>>,
18 monitor: Monitor,
19 recv: Arc<Mutex<Receiver<J>>>,
20 send: Arc<Sender<J>>,
21 graceful: bool,
22}
23
24async fn handle<F, Fut, J>(
25 trigger: Arc<Mutex<Receiver<J>>>,
26 done: Arc<Mutex<Receiver<()>>>,
27 how: Arc<F>,
28 graceful: bool,
29) where
30 F: Fn(J) -> Fut + Send + Sync + 'static,
31 Fut: Future<Output = ()> + Send + Sync + 'static,
32 J: Send + Sync + 'static,
33{
34 match graceful {
35 false => {
36 tokio::spawn(
37 async move {
38 let mut done = done.lock().await;
39 loop {
40 match done.try_recv() {
42 Ok(_) | Err(Disconnected) => {
43 done.close();
44 return;
45 }
46
47 Err(Empty) => {
48 let mut guard = trigger.lock().await;
50 if let Ok(item) = guard.try_recv() {
51 drop(guard); how(item).instrument(debug_span!("how")).await;
53 }
54 }
55 }
56 }
57 }
58 .instrument(debug_span!("handle")),
59 );
60 }
61
62 true => {
63 tokio::spawn(
64 async move {
65 loop {
66 let mut guard = trigger.lock().await;
68 match guard.recv().await {
69 Some(item) => {
70 drop(guard); how(item).instrument(debug_span!("how")).await;
72 }
73
74 None => return,
75 }
76 }
77 }
78 .instrument(debug_span!("handle")),
79 );
80 }
81 }
82}
83
84impl<J> Worker<J> {
85 pub fn new(name: &str, buf: usize) -> Self {
86 let (tx, rx) = channel(buf);
87 let work_count = Arc::new(Mutex::new(0));
88 Self {
89 name: name.to_string(),
90 work_count,
91 monitor: Monitor::new(name),
92 recv: Arc::new(Mutex::new(rx)),
93 graceful: false,
94 send: Arc::new(tx),
95 }
96 }
97
98 pub fn with_graceful(mut self, graceful: bool) -> Self {
99 self.graceful = graceful;
100 self
101 }
102
103 pub fn with_trigger(mut self, trigger: (Sender<J>, Receiver<J>)) -> Self {
104 let (send, recv) = trigger;
105 self.send = Arc::new(send);
106 self.recv = Arc::new(Mutex::new(recv));
107 self
108 }
109
110 pub fn get_sender(&self) -> Arc<Sender<J>> {
111 self.send.clone()
112 }
113
114 pub async fn send(&self, job: J) -> Result<(), SendError<J>> {
115 self.send.send(job).await
116 }
117
118 pub fn name(&self) -> String {
119 self.name.to_string()
120 }
121
122 pub async fn count(&self) -> usize {
123 let guard = self.work_count.lock().await;
124 *guard
125 }
126
127 pub async fn stop(&self) -> Result<(), SendError<()>> {
128 self.monitor.stop().await
129 }
130
131 pub async fn run<F, Fut>(&self, how: F)
132 where
133 F: Fn(J) -> Fut + Send + Sync + 'static,
134 Fut: Future<Output = ()> + Send + Sync + 'static,
135 J: Send + Sync + 'static,
136 {
137 debug!("WORKER START - {}", self.name);
138 let trigger = self.recv.clone();
139 let graceful = self.graceful;
140 let task = move |done: Receiver<()>| async move {
141 let done = Arc::new(Mutex::new(done));
142 let how = Arc::new(how);
143 handle(trigger, done, how, graceful)
144 .await;
146 };
147
148 _ = self
149 .monitor
150 .run(task)
151 .instrument(debug_span!("monitor"))
152 .await;
153 }
154}