cloudillo_types/
worker.rs1use flume::{Receiver, Sender};
7use futures::channel::oneshot;
8use std::{sync::Arc, thread};
9
10use crate::prelude::*;
11
12#[derive(Clone, Copy, Debug)]
13pub enum Priority {
14 High,
15 Medium,
16 Low,
17}
18
19#[derive(Debug)]
20pub struct WorkerPool {
21 high: Sender<Box<dyn FnOnce() + Send>>,
22 med: Sender<Box<dyn FnOnce() + Send>>,
23 low: Sender<Box<dyn FnOnce() + Send>>,
24}
25
26impl WorkerPool {
27 pub fn new(n1: usize, n2: usize, n3: usize) -> Self {
28 let (high, rx_high) = flume::unbounded();
29 let (med, rx_med) = flume::unbounded();
30 let (low, rx_low) = flume::unbounded();
31
32 let rx_high = Arc::new(rx_high);
33 let rx_med = Arc::new(rx_med);
34 let rx_low = Arc::new(rx_low);
35
36 for _ in 0..n1 {
38 let rx_high = Arc::clone(&rx_high);
39 thread::spawn(move || worker_loop(&[rx_high]));
40 }
41
42 for _ in 0..n2 {
44 let rx_high = Arc::clone(&rx_high);
45 let rx_med = Arc::clone(&rx_med);
46 thread::spawn(move || worker_loop(&[rx_high, rx_med]));
47 }
48
49 for _ in 0..n3 {
51 let rx_high = Arc::clone(&rx_high);
52 let rx_med = Arc::clone(&rx_med);
53 let rx_low = Arc::clone(&rx_low);
54 thread::spawn(move || worker_loop(&[rx_high, rx_med, rx_low]));
55 }
56
57 Self { high, med, low }
58 }
59
60 pub fn spawn<F, T>(
62 &self,
63 priority: Priority,
64 f: F,
65 ) -> impl std::future::Future<Output = ClResult<T>> + use<F, T>
66 where
67 F: FnOnce() -> T + Send + 'static,
68 T: Send + 'static,
69 {
70 let (res_tx, res_rx) = oneshot::channel();
71
72 let job = Box::new(move || {
73 let result = f();
74 let _ = res_tx.send(result);
75 });
76
77 match priority {
78 Priority::High => {
79 if self.high.send(job).is_err() {
80 error!("Failed to send job to high priority worker queue");
81 }
82 }
83 Priority::Medium => {
84 if self.med.send(job).is_err() {
85 error!("Failed to send job to medium priority worker queue");
86 }
87 }
88 Priority::Low => {
89 if self.low.send(job).is_err() {
90 error!("Failed to send job to low priority worker queue");
91 }
92 }
93 }
94
95 async move {
96 res_rx.await.map_err(|_| {
97 error!("Worker dropped result channel (task may have panicked)");
98 Error::Internal("worker task failed".into())
99 })
100 }
101 }
102
103 pub fn run<F, T>(&self, f: F) -> impl std::future::Future<Output = ClResult<T>> + use<F, T>
104 where
105 F: FnOnce() -> T + Send + 'static,
106 T: Send + 'static,
107 {
108 info!("[RUN normal]");
109 let (res_tx, res_rx) = oneshot::channel();
110
111 let job = Box::new(move || {
112 let result = f();
113 let _ignore = res_tx.send(result);
114 });
115
116 if self.med.send(job).is_err() {
117 error!("Failed to send job to medium priority worker queue");
118 }
119
120 async move {
121 res_rx.await.map_err(|_| {
122 error!("Worker dropped result channel (task may have panicked)");
123 Error::Internal("worker task failed".into())
124 })
125 }
126 }
127
128 pub fn run_immed<F, T>(
129 &self,
130 f: F,
131 ) -> impl std::future::Future<Output = ClResult<T>> + use<F, T>
132 where
133 F: FnOnce() -> T + Send + 'static,
134 T: Send + 'static,
135 {
136 let (res_tx, res_rx) = oneshot::channel();
137
138 let job = Box::new(move || {
139 let result = f();
140 let _ignore = res_tx.send(result);
141 });
142
143 if self.high.send(job).is_err() {
144 error!("Failed to send job to high priority worker queue");
145 }
146
147 async move {
148 res_rx.await.map_err(|_| {
149 error!("Worker dropped result channel (task may have panicked)");
150 Error::Internal("worker task failed".into())
151 })
152 }
153 }
154
155 pub fn run_slow<F, T>(&self, f: F) -> impl std::future::Future<Output = ClResult<T>> + use<F, T>
156 where
157 F: FnOnce() -> T + Send + 'static,
158 T: Send + 'static,
159 {
160 info!("[RUN slow]");
161 let (res_tx, res_rx) = oneshot::channel();
162
163 let job = Box::new(move || {
164 let result = f();
165 let _ignore = res_tx.send(result);
166 });
167
168 if self.low.send(job).is_err() {
169 error!("Failed to send job to low priority worker queue");
170 }
171
172 async move {
173 res_rx.await.map_err(|_| {
174 error!("Worker dropped result channel (task may have panicked)");
175 Error::Internal("worker task failed".into())
176 })
177 }
178 }
179
180 pub fn try_run<F, T>(&self, f: F) -> impl std::future::Future<Output = ClResult<T>> + use<F, T>
183 where
184 F: FnOnce() -> ClResult<T> + Send + 'static,
185 T: Send + 'static,
186 {
187 let fut = self.run(f);
188 async move { fut.await? }
189 }
190
191 pub fn try_run_immed<F, T>(
194 &self,
195 f: F,
196 ) -> impl std::future::Future<Output = ClResult<T>> + use<F, T>
197 where
198 F: FnOnce() -> ClResult<T> + Send + 'static,
199 T: Send + 'static,
200 {
201 let fut = self.run_immed(f);
202 async move { fut.await? }
203 }
204
205 pub fn try_run_slow<F, T>(
208 &self,
209 f: F,
210 ) -> impl std::future::Future<Output = ClResult<T>> + use<F, T>
211 where
212 F: FnOnce() -> ClResult<T> + Send + 'static,
213 T: Send + 'static,
214 {
215 let fut = self.run_slow(f);
216 async move { fut.await? }
217 }
218}
219
220type JobQueue = Arc<Receiver<Box<dyn FnOnce() + Send>>>;
221
222fn worker_loop(queues: &[JobQueue]) {
223 loop {
224 let mut job = None;
226 for rx in queues {
227 if let Ok(j) = rx.try_recv() {
228 job = Some(j);
229 break;
230 }
231 }
232
233 #[allow(clippy::collapsible_if)]
234 if let Some(job) = job {
235 if let Err(e) = std::panic::catch_unwind(std::panic::AssertUnwindSafe(job)) {
236 error!("Worker thread caught panic: {:?}", e);
237 }
238 continue;
239 }
240
241 let mut selector = flume::Selector::new();
243 for rx in queues {
244 selector = selector.recv(rx, |res| res);
245 }
246
247 let job: Result<Box<dyn FnOnce() + Send>, flume::RecvError> = selector.wait();
248 if let Ok(job) = job
249 && let Err(e) = std::panic::catch_unwind(std::panic::AssertUnwindSafe(job))
250 {
251 error!("Worker thread caught panic: {:?}", e);
252 }
253 }
254}
255
256