cloudillo_types/
worker.rs1use flume::{Receiver, Sender};
4use futures::channel::oneshot;
5use std::{sync::Arc, thread};
6
7use crate::prelude::*;
8
9#[derive(Clone, Copy, Debug)]
10pub enum Priority {
11 High,
12 Medium,
13 Low,
14}
15
16#[derive(Debug)]
17pub struct WorkerPool {
18 tx_high: Sender<Box<dyn FnOnce() + Send>>,
19 tx_med: Sender<Box<dyn FnOnce() + Send>>,
20 tx_low: Sender<Box<dyn FnOnce() + Send>>,
21}
22
23impl WorkerPool {
24 pub fn new(n1: usize, n2: usize, n3: usize) -> Self {
25 let (tx_high, rx_high) = flume::unbounded();
26 let (tx_med, rx_med) = flume::unbounded();
27 let (tx_low, rx_low) = flume::unbounded();
28
29 let rx_high = Arc::new(rx_high);
30 let rx_med = Arc::new(rx_med);
31 let rx_low = Arc::new(rx_low);
32
33 for _ in 0..n1 {
35 let rx_high = Arc::clone(&rx_high);
36 thread::spawn(move || worker_loop(vec![rx_high]));
37 }
38
39 for _ in 0..n2 {
41 let rx_high = Arc::clone(&rx_high);
42 let rx_med = Arc::clone(&rx_med);
43 thread::spawn(move || worker_loop(vec![rx_high, rx_med]));
44 }
45
46 for _ in 0..n3 {
48 let rx_high = Arc::clone(&rx_high);
49 let rx_med = Arc::clone(&rx_med);
50 let rx_low = Arc::clone(&rx_low);
51 thread::spawn(move || worker_loop(vec![rx_high, rx_med, rx_low]));
52 }
53
54 Self { tx_high, tx_med, tx_low }
55 }
56
57 pub fn spawn<F, T>(
59 &self,
60 priority: Priority,
61 f: F,
62 ) -> impl std::future::Future<Output = ClResult<T>>
63 where
64 F: FnOnce() -> T + Send + 'static,
65 T: Send + 'static,
66 {
67 let (res_tx, res_rx) = oneshot::channel();
68
69 let job = Box::new(move || {
70 let result = f();
71 let _ = res_tx.send(result);
72 });
73
74 match priority {
75 Priority::High => {
76 if self.tx_high.send(job).is_err() {
77 error!("Failed to send job to high priority worker queue");
78 }
79 }
80 Priority::Medium => {
81 if self.tx_med.send(job).is_err() {
82 error!("Failed to send job to medium priority worker queue");
83 }
84 }
85 Priority::Low => {
86 if self.tx_low.send(job).is_err() {
87 error!("Failed to send job to low priority worker queue");
88 }
89 }
90 }
91
92 async move {
93 res_rx.await.map_err(|_| {
94 error!("Worker dropped result channel (task may have panicked)");
95 Error::Internal("worker task failed".into())
96 })
97 }
98 }
99
100 pub fn run<F, T>(&self, f: F) -> impl std::future::Future<Output = ClResult<T>>
101 where
102 F: FnOnce() -> T + Send + 'static,
103 T: Send + 'static,
104 {
105 info!("[RUN normal]");
106 let (res_tx, res_rx) = oneshot::channel();
107
108 let job = Box::new(move || {
109 let result = f();
110 let _ignore = res_tx.send(result);
111 });
112
113 if self.tx_med.send(job).is_err() {
114 error!("Failed to send job to medium priority worker queue");
115 }
116
117 async move {
118 res_rx.await.map_err(|_| {
119 error!("Worker dropped result channel (task may have panicked)");
120 Error::Internal("worker task failed".into())
121 })
122 }
123 }
124
125 pub fn run_immed<F, T>(&self, f: F) -> impl std::future::Future<Output = ClResult<T>>
126 where
127 F: FnOnce() -> T + Send + 'static,
128 T: Send + 'static,
129 {
130 let (res_tx, res_rx) = oneshot::channel();
131
132 let job = Box::new(move || {
133 let result = f();
134 let _ignore = res_tx.send(result);
135 });
136
137 if self.tx_high.send(job).is_err() {
138 error!("Failed to send job to high priority worker queue");
139 }
140
141 async move {
142 res_rx.await.map_err(|_| {
143 error!("Worker dropped result channel (task may have panicked)");
144 Error::Internal("worker task failed".into())
145 })
146 }
147 }
148
149 pub fn run_slow<F, T>(&self, f: F) -> impl std::future::Future<Output = ClResult<T>>
150 where
151 F: FnOnce() -> T + Send + 'static,
152 T: Send + 'static,
153 {
154 info!("[RUN slow]");
155 let (res_tx, res_rx) = oneshot::channel();
156
157 let job = Box::new(move || {
158 let result = f();
159 let _ignore = res_tx.send(result);
160 });
161
162 if self.tx_low.send(job).is_err() {
163 error!("Failed to send job to low priority worker queue");
164 }
165
166 async move {
167 res_rx.await.map_err(|_| {
168 error!("Worker dropped result channel (task may have panicked)");
169 Error::Internal("worker task failed".into())
170 })
171 }
172 }
173
174 pub fn try_run<F, T>(&self, f: F) -> impl std::future::Future<Output = ClResult<T>>
177 where
178 F: FnOnce() -> ClResult<T> + Send + 'static,
179 T: Send + 'static,
180 {
181 let fut = self.run(f);
182 async move { fut.await? }
183 }
184
185 pub fn try_run_immed<F, T>(&self, f: F) -> impl std::future::Future<Output = ClResult<T>>
188 where
189 F: FnOnce() -> ClResult<T> + Send + 'static,
190 T: Send + 'static,
191 {
192 let fut = self.run_immed(f);
193 async move { fut.await? }
194 }
195
196 pub fn try_run_slow<F, T>(&self, f: F) -> impl std::future::Future<Output = ClResult<T>>
199 where
200 F: FnOnce() -> ClResult<T> + Send + 'static,
201 T: Send + 'static,
202 {
203 let fut = self.run_slow(f);
204 async move { fut.await? }
205 }
206}
207
208#[allow(clippy::type_complexity)]
209fn worker_loop(queues: Vec<Arc<Receiver<Box<dyn FnOnce() + Send>>>>) {
210 loop {
211 let mut job = None;
213 for rx in &queues {
214 if let Ok(j) = rx.try_recv() {
215 job = Some(j);
216 break;
217 }
218 }
219
220 if let Some(job) = job {
221 if let Err(e) = std::panic::catch_unwind(std::panic::AssertUnwindSafe(job)) {
222 error!("Worker thread caught panic: {:?}", e);
223 }
224 continue;
225 }
226
227 let mut selector = flume::Selector::new();
229 for rx in &queues {
230 selector = selector.recv(rx, |res| res);
231 }
232
233 let job: Result<Box<dyn FnOnce() + Send>, flume::RecvError> = selector.wait();
234 if let Ok(job) = job {
235 if let Err(e) = std::panic::catch_unwind(std::panic::AssertUnwindSafe(job)) {
236 error!("Worker thread caught panic: {:?}", e);
237 }
238 }
239 }
240}
241
242