cloudillo_types/
worker.rs1use flume::{Receiver, Sender};
7use futures::channel::oneshot;
8use std::{sync::Arc, thread};
9
10use crate::prelude::*;
11
/// Scheduling lane a job is submitted to.
///
/// The variant selects which of the pool's three queues receives the job.
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)]
pub enum Priority {
    /// Routed to the high-priority queue.
    High,
    /// Routed to the medium-priority queue.
    Medium,
    /// Routed to the low-priority queue.
    Low,
}
18
19#[derive(Debug)]
20pub struct WorkerPool {
21 high: Sender<Box<dyn FnOnce() + Send>>,
22 med: Sender<Box<dyn FnOnce() + Send>>,
23 low: Sender<Box<dyn FnOnce() + Send>>,
24}
25
26impl WorkerPool {
27 pub fn new(n1: usize, n2: usize, n3: usize) -> Self {
28 let (high, rx_high) = flume::unbounded();
29 let (med, rx_med) = flume::unbounded();
30 let (low, rx_low) = flume::unbounded();
31
32 let rx_high = Arc::new(rx_high);
33 let rx_med = Arc::new(rx_med);
34 let rx_low = Arc::new(rx_low);
35
36 for _ in 0..n1 {
38 let rx_high = Arc::clone(&rx_high);
39 thread::spawn(move || worker_loop(&[rx_high]));
40 }
41
42 for _ in 0..n2 {
44 let rx_high = Arc::clone(&rx_high);
45 let rx_med = Arc::clone(&rx_med);
46 thread::spawn(move || worker_loop(&[rx_high, rx_med]));
47 }
48
49 for _ in 0..n3 {
51 let rx_high = Arc::clone(&rx_high);
52 let rx_med = Arc::clone(&rx_med);
53 let rx_low = Arc::clone(&rx_low);
54 thread::spawn(move || worker_loop(&[rx_high, rx_med, rx_low]));
55 }
56
57 Self { high, med, low }
58 }
59
60 pub fn spawn<F, T>(
62 &self,
63 priority: Priority,
64 f: F,
65 ) -> impl std::future::Future<Output = ClResult<T>>
66 where
67 F: FnOnce() -> T + Send + 'static,
68 T: Send + 'static,
69 {
70 let (res_tx, res_rx) = oneshot::channel();
71
72 let job = Box::new(move || {
73 let result = f();
74 let _ = res_tx.send(result);
75 });
76
77 match priority {
78 Priority::High => {
79 if self.high.send(job).is_err() {
80 error!("Failed to send job to high priority worker queue");
81 }
82 }
83 Priority::Medium => {
84 if self.med.send(job).is_err() {
85 error!("Failed to send job to medium priority worker queue");
86 }
87 }
88 Priority::Low => {
89 if self.low.send(job).is_err() {
90 error!("Failed to send job to low priority worker queue");
91 }
92 }
93 }
94
95 async move {
96 res_rx.await.map_err(|_| {
97 error!("Worker dropped result channel (task may have panicked)");
98 Error::Internal("worker task failed".into())
99 })
100 }
101 }
102
103 pub fn run<F, T>(&self, f: F) -> impl std::future::Future<Output = ClResult<T>>
104 where
105 F: FnOnce() -> T + Send + 'static,
106 T: Send + 'static,
107 {
108 info!("[RUN normal]");
109 let (res_tx, res_rx) = oneshot::channel();
110
111 let job = Box::new(move || {
112 let result = f();
113 let _ignore = res_tx.send(result);
114 });
115
116 if self.med.send(job).is_err() {
117 error!("Failed to send job to medium priority worker queue");
118 }
119
120 async move {
121 res_rx.await.map_err(|_| {
122 error!("Worker dropped result channel (task may have panicked)");
123 Error::Internal("worker task failed".into())
124 })
125 }
126 }
127
128 pub fn run_immed<F, T>(&self, f: F) -> impl std::future::Future<Output = ClResult<T>>
129 where
130 F: FnOnce() -> T + Send + 'static,
131 T: Send + 'static,
132 {
133 let (res_tx, res_rx) = oneshot::channel();
134
135 let job = Box::new(move || {
136 let result = f();
137 let _ignore = res_tx.send(result);
138 });
139
140 if self.high.send(job).is_err() {
141 error!("Failed to send job to high priority worker queue");
142 }
143
144 async move {
145 res_rx.await.map_err(|_| {
146 error!("Worker dropped result channel (task may have panicked)");
147 Error::Internal("worker task failed".into())
148 })
149 }
150 }
151
152 pub fn run_slow<F, T>(&self, f: F) -> impl std::future::Future<Output = ClResult<T>>
153 where
154 F: FnOnce() -> T + Send + 'static,
155 T: Send + 'static,
156 {
157 info!("[RUN slow]");
158 let (res_tx, res_rx) = oneshot::channel();
159
160 let job = Box::new(move || {
161 let result = f();
162 let _ignore = res_tx.send(result);
163 });
164
165 if self.low.send(job).is_err() {
166 error!("Failed to send job to low priority worker queue");
167 }
168
169 async move {
170 res_rx.await.map_err(|_| {
171 error!("Worker dropped result channel (task may have panicked)");
172 Error::Internal("worker task failed".into())
173 })
174 }
175 }
176
177 pub fn try_run<F, T>(&self, f: F) -> impl std::future::Future<Output = ClResult<T>>
180 where
181 F: FnOnce() -> ClResult<T> + Send + 'static,
182 T: Send + 'static,
183 {
184 let fut = self.run(f);
185 async move { fut.await? }
186 }
187
188 pub fn try_run_immed<F, T>(&self, f: F) -> impl std::future::Future<Output = ClResult<T>>
191 where
192 F: FnOnce() -> ClResult<T> + Send + 'static,
193 T: Send + 'static,
194 {
195 let fut = self.run_immed(f);
196 async move { fut.await? }
197 }
198
199 pub fn try_run_slow<F, T>(&self, f: F) -> impl std::future::Future<Output = ClResult<T>>
202 where
203 F: FnOnce() -> ClResult<T> + Send + 'static,
204 T: Send + 'static,
205 {
206 let fut = self.run_slow(f);
207 async move { fut.await? }
208 }
209}
210
211type JobQueue = Arc<Receiver<Box<dyn FnOnce() + Send>>>;
212
213fn worker_loop(queues: &[JobQueue]) {
214 loop {
215 let mut job = None;
217 for rx in queues {
218 if let Ok(j) = rx.try_recv() {
219 job = Some(j);
220 break;
221 }
222 }
223
224 if let Some(job) = job {
225 if let Err(e) = std::panic::catch_unwind(std::panic::AssertUnwindSafe(job)) {
226 error!("Worker thread caught panic: {:?}", e);
227 }
228 continue;
229 }
230
231 let mut selector = flume::Selector::new();
233 for rx in queues {
234 selector = selector.recv(rx, |res| res);
235 }
236
237 let job: Result<Box<dyn FnOnce() + Send>, flume::RecvError> = selector.wait();
238 if let Ok(job) = job {
239 if let Err(e) = std::panic::catch_unwind(std::panic::AssertUnwindSafe(job)) {
240 error!("Worker thread caught panic: {:?}", e);
241 }
242 }
243 }
244}
245
246