Skip to main content

cloudillo_types/
worker.rs

1//! Worker pool. Handles synchronous tasks with 3 priority levels, configurable worker threads.
2
3use flume::{Receiver, Sender};
4use futures::channel::oneshot;
5use std::{sync::Arc, thread};
6
7use crate::prelude::*;
8
9#[derive(Clone, Copy, Debug)]
10pub enum Priority {
11	High,
12	Medium,
13	Low,
14}
15
16#[derive(Debug)]
17pub struct WorkerPool {
18	high: Sender<Box<dyn FnOnce() + Send>>,
19	med: Sender<Box<dyn FnOnce() + Send>>,
20	low: Sender<Box<dyn FnOnce() + Send>>,
21}
22
23impl WorkerPool {
24	pub fn new(n1: usize, n2: usize, n3: usize) -> Self {
25		let (high, rx_high) = flume::unbounded();
26		let (med, rx_med) = flume::unbounded();
27		let (low, rx_low) = flume::unbounded();
28
29		let rx_high = Arc::new(rx_high);
30		let rx_med = Arc::new(rx_med);
31		let rx_low = Arc::new(rx_low);
32
33		// Workers dedicated to High only
34		for _ in 0..n1 {
35			let rx_high = Arc::clone(&rx_high);
36			thread::spawn(move || worker_loop(&[rx_high]));
37		}
38
39		// Workers for High + Medium
40		for _ in 0..n2 {
41			let rx_high = Arc::clone(&rx_high);
42			let rx_med = Arc::clone(&rx_med);
43			thread::spawn(move || worker_loop(&[rx_high, rx_med]));
44		}
45
46		// Workers for High + Medium + Low
47		for _ in 0..n3 {
48			let rx_high = Arc::clone(&rx_high);
49			let rx_med = Arc::clone(&rx_med);
50			let rx_low = Arc::clone(&rx_low);
51			thread::spawn(move || worker_loop(&[rx_high, rx_med, rx_low]));
52		}
53
54		Self { high, med, low }
55	}
56
57	/// Submit a closure with arguments → returns a Future for the result
58	pub fn spawn<F, T>(
59		&self,
60		priority: Priority,
61		f: F,
62	) -> impl std::future::Future<Output = ClResult<T>>
63	where
64		F: FnOnce() -> T + Send + 'static,
65		T: Send + 'static,
66	{
67		let (res_tx, res_rx) = oneshot::channel();
68
69		let job = Box::new(move || {
70			let result = f();
71			let _ = res_tx.send(result);
72		});
73
74		match priority {
75			Priority::High => {
76				if self.high.send(job).is_err() {
77					error!("Failed to send job to high priority worker queue");
78				}
79			}
80			Priority::Medium => {
81				if self.med.send(job).is_err() {
82					error!("Failed to send job to medium priority worker queue");
83				}
84			}
85			Priority::Low => {
86				if self.low.send(job).is_err() {
87					error!("Failed to send job to low priority worker queue");
88				}
89			}
90		}
91
92		async move {
93			res_rx.await.map_err(|_| {
94				error!("Worker dropped result channel (task may have panicked)");
95				Error::Internal("worker task failed".into())
96			})
97		}
98	}
99
100	pub fn run<F, T>(&self, f: F) -> impl std::future::Future<Output = ClResult<T>>
101	where
102		F: FnOnce() -> T + Send + 'static,
103		T: Send + 'static,
104	{
105		info!("[RUN normal]");
106		let (res_tx, res_rx) = oneshot::channel();
107
108		let job = Box::new(move || {
109			let result = f();
110			let _ignore = res_tx.send(result);
111		});
112
113		if self.med.send(job).is_err() {
114			error!("Failed to send job to medium priority worker queue");
115		}
116
117		async move {
118			res_rx.await.map_err(|_| {
119				error!("Worker dropped result channel (task may have panicked)");
120				Error::Internal("worker task failed".into())
121			})
122		}
123	}
124
125	pub fn run_immed<F, T>(&self, f: F) -> impl std::future::Future<Output = ClResult<T>>
126	where
127		F: FnOnce() -> T + Send + 'static,
128		T: Send + 'static,
129	{
130		let (res_tx, res_rx) = oneshot::channel();
131
132		let job = Box::new(move || {
133			let result = f();
134			let _ignore = res_tx.send(result);
135		});
136
137		if self.high.send(job).is_err() {
138			error!("Failed to send job to high priority worker queue");
139		}
140
141		async move {
142			res_rx.await.map_err(|_| {
143				error!("Worker dropped result channel (task may have panicked)");
144				Error::Internal("worker task failed".into())
145			})
146		}
147	}
148
149	pub fn run_slow<F, T>(&self, f: F) -> impl std::future::Future<Output = ClResult<T>>
150	where
151		F: FnOnce() -> T + Send + 'static,
152		T: Send + 'static,
153	{
154		info!("[RUN slow]");
155		let (res_tx, res_rx) = oneshot::channel();
156
157		let job = Box::new(move || {
158			let result = f();
159			let _ignore = res_tx.send(result);
160		});
161
162		if self.low.send(job).is_err() {
163			error!("Failed to send job to low priority worker queue");
164		}
165
166		async move {
167			res_rx.await.map_err(|_| {
168				error!("Worker dropped result channel (task may have panicked)");
169				Error::Internal("worker task failed".into())
170			})
171		}
172	}
173
174	/// Like `run`, but flattens `ClResult<ClResult<T>>` into `ClResult<T>`.
175	/// Use when the closure itself returns `ClResult<T>`.
176	pub fn try_run<F, T>(&self, f: F) -> impl std::future::Future<Output = ClResult<T>>
177	where
178		F: FnOnce() -> ClResult<T> + Send + 'static,
179		T: Send + 'static,
180	{
181		let fut = self.run(f);
182		async move { fut.await? }
183	}
184
185	/// Like `run_immed`, but flattens `ClResult<ClResult<T>>` into `ClResult<T>`.
186	/// Use when the closure itself returns `ClResult<T>`.
187	pub fn try_run_immed<F, T>(&self, f: F) -> impl std::future::Future<Output = ClResult<T>>
188	where
189		F: FnOnce() -> ClResult<T> + Send + 'static,
190		T: Send + 'static,
191	{
192		let fut = self.run_immed(f);
193		async move { fut.await? }
194	}
195
196	/// Like `run_slow`, but flattens `ClResult<ClResult<T>>` into `ClResult<T>`.
197	/// Use when the closure itself returns `ClResult<T>`.
198	pub fn try_run_slow<F, T>(&self, f: F) -> impl std::future::Future<Output = ClResult<T>>
199	where
200		F: FnOnce() -> ClResult<T> + Send + 'static,
201		T: Send + 'static,
202	{
203		let fut = self.run_slow(f);
204		async move { fut.await? }
205	}
206}
207
208type JobQueue = Arc<Receiver<Box<dyn FnOnce() + Send>>>;
209
210fn worker_loop(queues: &[JobQueue]) {
211	loop {
212		// Try higher-priority queues first (non-blocking)
213		let mut job = None;
214		for rx in queues {
215			if let Ok(j) = rx.try_recv() {
216				job = Some(j);
217				break;
218			}
219		}
220
221		if let Some(job) = job {
222			if let Err(e) = std::panic::catch_unwind(std::panic::AssertUnwindSafe(job)) {
223				error!("Worker thread caught panic: {:?}", e);
224			}
225			continue;
226		}
227
228		// Wait for next job
229		let mut selector = flume::Selector::new();
230		for rx in queues {
231			selector = selector.recv(rx, |res| res);
232		}
233
234		let job: Result<Box<dyn FnOnce() + Send>, flume::RecvError> = selector.wait();
235		if let Ok(job) = job {
236			if let Err(e) = std::panic::catch_unwind(std::panic::AssertUnwindSafe(job)) {
237				error!("Worker thread caught panic: {:?}", e);
238			}
239		}
240	}
241}
242
243// vim: ts=4