1use crate::monitor::Monitor;
2use std::{future::Future, sync::Arc};
3use tokio::sync::{
4 Mutex,
5 mpsc::{
6 Receiver, Sender, channel,
7 error::{
8 SendError,
9 TryRecvError::{Disconnected, Empty},
10 },
11 },
12};
13use tracing::{Instrument, debug, debug_span};
14
15#[derive(Clone)]
16pub struct Worker<J> {
17 name: String,
18 work_count: Arc<Mutex<usize>>,
19 monitor: Monitor,
20 recv: Arc<Mutex<Receiver<J>>>,
21 send: Arc<Sender<J>>,
22 graceful: bool,
23}
24
25async fn handle<F, Fut, J>(
26 trigger: Arc<Mutex<Receiver<J>>>,
27 done: Arc<Mutex<Receiver<()>>>,
28 how: Arc<F>,
29 graceful: bool,
30) where
31 F: Fn(J) -> Fut + Send + Sync + 'static,
32 Fut: Future<Output = ()> + Send + 'static,
33 J: Send + Sync + 'static,
34{
35 match graceful {
36 false => {
37 tokio::spawn(
38 async move {
39 let mut done = done.lock().await;
40 loop {
41 match done.try_recv() {
43 Ok(_) | Err(Disconnected) => {
44 done.close();
45 return;
46 }
47
48 Err(Empty) => {
49 let mut guard = trigger.lock().await;
51 if let Ok(item) = guard.try_recv() {
52 drop(guard); how(item).await;
54 }
55 }
56 }
57 }
58 }
59 .instrument(debug_span!("handle")),
60 );
61 }
62
63 true => {
64 tokio::spawn(
65 async move {
66 loop {
67 let mut guard = trigger.lock().await;
69 match guard.recv().await {
70 Some(item) => {
71 drop(guard); how(item).await;
73 }
74
75 None => return,
76 }
77 }
78 }
79 .instrument(debug_span!("grace")),
80 );
81 }
82 }
83}
84
85impl<J> Worker<J> {
86 pub fn new(name: &str, buf: usize) -> Self {
87 let (tx, rx) = channel(buf);
88 let work_count = Arc::new(Mutex::new(0));
89 Self {
90 name: name.to_string(),
91 work_count,
92 monitor: Monitor::new(name),
93 recv: Arc::new(Mutex::new(rx)),
94 graceful: false,
95 send: Arc::new(tx),
96 }
97 }
98
99 pub fn with_on_start<F, Fut>(mut self, task: F) -> Self
100 where
101 F: FnOnce() -> Fut + Send + Sync + 'static,
102 Fut: Future<Output = ()> + Send + Sync + 'static,
103 {
104 self.monitor = self.monitor.with_on_start(task);
105 self
106 }
107
108 pub fn with_on_exit<F, Fut>(mut self, task: F) -> Self
109 where
110 F: FnOnce() -> Fut + Send + Sync + 'static,
111 Fut: Future<Output = ()> + Send + Sync + 'static,
112 {
113 self.monitor = self.monitor.with_on_exit(task);
114 self
115 }
116
117 pub fn with_graceful(mut self, graceful: bool) -> Self {
118 self.graceful = graceful;
119 self
120 }
121
122 pub fn with_trigger(mut self, trigger: (Arc<Sender<J>>, Arc<Mutex<Receiver<J>>>)) -> Self {
123 let (send, recv) = trigger;
124 self.send = send;
125 self.recv = recv;
126 self
127 }
128
129 pub fn get_sender(&self) -> Arc<Sender<J>> {
130 self.send.clone()
131 }
132
133 pub async fn send(&self, job: J) -> Result<(), SendError<J>> {
134 self.send.send(job).await
135 }
136
137 pub fn name(&self) -> String {
138 self.name.to_string()
139 }
140
141 pub async fn count(&self) -> usize {
142 let guard = self.work_count.lock().await;
143 *guard
144 }
145
146 pub async fn stop(&self) -> Result<(), SendError<()>> {
147 self.monitor.stop().await
148 }
149
150 pub async fn run<F, Fut>(&self, how: F)
151 where
152 F: Fn(J) -> Fut + Send + Sync + 'static,
153 Fut: Future<Output = ()> + Send + 'static,
154 J: Send + Sync + 'static,
155 {
156 debug!("WORKER START - {}", self.name);
157 let trigger = self.recv.clone();
158 let graceful = self.graceful;
159 let how = Arc::new(how);
160 let task = move |done: Receiver<()>| async move {
161 let done = Arc::new(Mutex::new(done));
162 handle(trigger, done, how, graceful).await;
163 };
164
165 _ = self
166 .monitor
167 .run(task)
168 .instrument(debug_span!("monitor"))
169 .await;
170 }
171}