Skip to main content

cloudillo_types/
worker.rs

1//! Worker pool. Handles synchronous tasks with 3 priority levels, configurable worker threads.
2
3use flume::{Receiver, Sender};
4use futures::channel::oneshot;
5use std::{sync::Arc, thread};
6
7use crate::prelude::*;
8
/// Scheduling priority of a job submitted to the worker pool.
///
/// The priority selects which of the pool's three queues a job is placed on.
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)]
pub enum Priority {
	/// Job goes to the high-priority queue.
	High,
	/// Job goes to the medium-priority queue.
	Medium,
	/// Job goes to the low-priority queue.
	Low,
}
15
16#[derive(Debug)]
17pub struct WorkerPool {
18	tx_high: Sender<Box<dyn FnOnce() + Send>>,
19	tx_med: Sender<Box<dyn FnOnce() + Send>>,
20	tx_low: Sender<Box<dyn FnOnce() + Send>>,
21}
22
23impl WorkerPool {
24	pub fn new(n1: usize, n2: usize, n3: usize) -> Self {
25		let (tx_high, rx_high) = flume::unbounded();
26		let (tx_med, rx_med) = flume::unbounded();
27		let (tx_low, rx_low) = flume::unbounded();
28
29		let rx_high = Arc::new(rx_high);
30		let rx_med = Arc::new(rx_med);
31		let rx_low = Arc::new(rx_low);
32
33		// Workers dedicated to High only
34		for _ in 0..n1 {
35			let rx_high = Arc::clone(&rx_high);
36			thread::spawn(move || worker_loop(vec![rx_high]));
37		}
38
39		// Workers for High + Medium
40		for _ in 0..n2 {
41			let rx_high = Arc::clone(&rx_high);
42			let rx_med = Arc::clone(&rx_med);
43			thread::spawn(move || worker_loop(vec![rx_high, rx_med]));
44		}
45
46		// Workers for High + Medium + Low
47		for _ in 0..n3 {
48			let rx_high = Arc::clone(&rx_high);
49			let rx_med = Arc::clone(&rx_med);
50			let rx_low = Arc::clone(&rx_low);
51			thread::spawn(move || worker_loop(vec![rx_high, rx_med, rx_low]));
52		}
53
54		Self { tx_high, tx_med, tx_low }
55	}
56
57	/// Submit a closure with arguments → returns a Future for the result
58	pub fn spawn<F, T>(
59		&self,
60		priority: Priority,
61		f: F,
62	) -> impl std::future::Future<Output = ClResult<T>>
63	where
64		F: FnOnce() -> T + Send + 'static,
65		T: Send + 'static,
66	{
67		let (res_tx, res_rx) = oneshot::channel();
68
69		let job = Box::new(move || {
70			let result = f();
71			let _ = res_tx.send(result);
72		});
73
74		match priority {
75			Priority::High => {
76				if self.tx_high.send(job).is_err() {
77					error!("Failed to send job to high priority worker queue");
78				}
79			}
80			Priority::Medium => {
81				if self.tx_med.send(job).is_err() {
82					error!("Failed to send job to medium priority worker queue");
83				}
84			}
85			Priority::Low => {
86				if self.tx_low.send(job).is_err() {
87					error!("Failed to send job to low priority worker queue");
88				}
89			}
90		}
91
92		async move {
93			res_rx.await.map_err(|_| {
94				error!("Worker dropped result channel (task may have panicked)");
95				Error::Internal("worker task failed".into())
96			})
97		}
98	}
99
100	pub fn run<F, T>(&self, f: F) -> impl std::future::Future<Output = ClResult<T>>
101	where
102		F: FnOnce() -> T + Send + 'static,
103		T: Send + 'static,
104	{
105		info!("[RUN normal]");
106		let (res_tx, res_rx) = oneshot::channel();
107
108		let job = Box::new(move || {
109			let result = f();
110			let _ignore = res_tx.send(result);
111		});
112
113		if self.tx_med.send(job).is_err() {
114			error!("Failed to send job to medium priority worker queue");
115		}
116
117		async move {
118			res_rx.await.map_err(|_| {
119				error!("Worker dropped result channel (task may have panicked)");
120				Error::Internal("worker task failed".into())
121			})
122		}
123	}
124
125	pub fn run_immed<F, T>(&self, f: F) -> impl std::future::Future<Output = ClResult<T>>
126	where
127		F: FnOnce() -> T + Send + 'static,
128		T: Send + 'static,
129	{
130		let (res_tx, res_rx) = oneshot::channel();
131
132		let job = Box::new(move || {
133			let result = f();
134			let _ignore = res_tx.send(result);
135		});
136
137		if self.tx_high.send(job).is_err() {
138			error!("Failed to send job to high priority worker queue");
139		}
140
141		async move {
142			res_rx.await.map_err(|_| {
143				error!("Worker dropped result channel (task may have panicked)");
144				Error::Internal("worker task failed".into())
145			})
146		}
147	}
148
149	pub fn run_slow<F, T>(&self, f: F) -> impl std::future::Future<Output = ClResult<T>>
150	where
151		F: FnOnce() -> T + Send + 'static,
152		T: Send + 'static,
153	{
154		info!("[RUN slow]");
155		let (res_tx, res_rx) = oneshot::channel();
156
157		let job = Box::new(move || {
158			let result = f();
159			let _ignore = res_tx.send(result);
160		});
161
162		if self.tx_low.send(job).is_err() {
163			error!("Failed to send job to low priority worker queue");
164		}
165
166		async move {
167			res_rx.await.map_err(|_| {
168				error!("Worker dropped result channel (task may have panicked)");
169				Error::Internal("worker task failed".into())
170			})
171		}
172	}
173
174	/// Like `run`, but flattens `ClResult<ClResult<T>>` into `ClResult<T>`.
175	/// Use when the closure itself returns `ClResult<T>`.
176	pub fn try_run<F, T>(&self, f: F) -> impl std::future::Future<Output = ClResult<T>>
177	where
178		F: FnOnce() -> ClResult<T> + Send + 'static,
179		T: Send + 'static,
180	{
181		let fut = self.run(f);
182		async move { fut.await? }
183	}
184
185	/// Like `run_immed`, but flattens `ClResult<ClResult<T>>` into `ClResult<T>`.
186	/// Use when the closure itself returns `ClResult<T>`.
187	pub fn try_run_immed<F, T>(&self, f: F) -> impl std::future::Future<Output = ClResult<T>>
188	where
189		F: FnOnce() -> ClResult<T> + Send + 'static,
190		T: Send + 'static,
191	{
192		let fut = self.run_immed(f);
193		async move { fut.await? }
194	}
195
196	/// Like `run_slow`, but flattens `ClResult<ClResult<T>>` into `ClResult<T>`.
197	/// Use when the closure itself returns `ClResult<T>`.
198	pub fn try_run_slow<F, T>(&self, f: F) -> impl std::future::Future<Output = ClResult<T>>
199	where
200		F: FnOnce() -> ClResult<T> + Send + 'static,
201		T: Send + 'static,
202	{
203		let fut = self.run_slow(f);
204		async move { fut.await? }
205	}
206}
207
208#[allow(clippy::type_complexity)]
209fn worker_loop(queues: Vec<Arc<Receiver<Box<dyn FnOnce() + Send>>>>) {
210	loop {
211		// Try higher-priority queues first (non-blocking)
212		let mut job = None;
213		for rx in &queues {
214			if let Ok(j) = rx.try_recv() {
215				job = Some(j);
216				break;
217			}
218		}
219
220		if let Some(job) = job {
221			if let Err(e) = std::panic::catch_unwind(std::panic::AssertUnwindSafe(job)) {
222				error!("Worker thread caught panic: {:?}", e);
223			}
224			continue;
225		}
226
227		// Wait for next job
228		let mut selector = flume::Selector::new();
229		for rx in &queues {
230			selector = selector.recv(rx, |res| res);
231		}
232
233		let job: Result<Box<dyn FnOnce() + Send>, flume::RecvError> = selector.wait();
234		if let Ok(job) = job {
235			if let Err(e) = std::panic::catch_unwind(std::panic::AssertUnwindSafe(job)) {
236				error!("Worker thread caught panic: {:?}", e);
237			}
238		}
239	}
240}
241
242// vim: ts=4