Skip to main content

cloudillo_types/
worker.rs

1// SPDX-FileCopyrightText: Szilárd Hajba
2// SPDX-License-Identifier: LGPL-3.0-or-later
3
4//! Worker pool. Handles synchronous tasks with 3 priority levels, configurable worker threads.
5
6use flume::{Receiver, Sender};
7use futures::channel::oneshot;
8use std::{sync::Arc, thread};
9
10use crate::prelude::*;
11
12#[derive(Clone, Copy, Debug)]
13pub enum Priority {
14	High,
15	Medium,
16	Low,
17}
18
19#[derive(Debug)]
20pub struct WorkerPool {
21	high: Sender<Box<dyn FnOnce() + Send>>,
22	med: Sender<Box<dyn FnOnce() + Send>>,
23	low: Sender<Box<dyn FnOnce() + Send>>,
24}
25
26impl WorkerPool {
27	pub fn new(n1: usize, n2: usize, n3: usize) -> Self {
28		let (high, rx_high) = flume::unbounded();
29		let (med, rx_med) = flume::unbounded();
30		let (low, rx_low) = flume::unbounded();
31
32		let rx_high = Arc::new(rx_high);
33		let rx_med = Arc::new(rx_med);
34		let rx_low = Arc::new(rx_low);
35
36		// Workers dedicated to High only
37		for _ in 0..n1 {
38			let rx_high = Arc::clone(&rx_high);
39			thread::spawn(move || worker_loop(&[rx_high]));
40		}
41
42		// Workers for High + Medium
43		for _ in 0..n2 {
44			let rx_high = Arc::clone(&rx_high);
45			let rx_med = Arc::clone(&rx_med);
46			thread::spawn(move || worker_loop(&[rx_high, rx_med]));
47		}
48
49		// Workers for High + Medium + Low
50		for _ in 0..n3 {
51			let rx_high = Arc::clone(&rx_high);
52			let rx_med = Arc::clone(&rx_med);
53			let rx_low = Arc::clone(&rx_low);
54			thread::spawn(move || worker_loop(&[rx_high, rx_med, rx_low]));
55		}
56
57		Self { high, med, low }
58	}
59
60	/// Submit a closure with arguments → returns a Future for the result
61	pub fn spawn<F, T>(
62		&self,
63		priority: Priority,
64		f: F,
65	) -> impl std::future::Future<Output = ClResult<T>>
66	where
67		F: FnOnce() -> T + Send + 'static,
68		T: Send + 'static,
69	{
70		let (res_tx, res_rx) = oneshot::channel();
71
72		let job = Box::new(move || {
73			let result = f();
74			let _ = res_tx.send(result);
75		});
76
77		match priority {
78			Priority::High => {
79				if self.high.send(job).is_err() {
80					error!("Failed to send job to high priority worker queue");
81				}
82			}
83			Priority::Medium => {
84				if self.med.send(job).is_err() {
85					error!("Failed to send job to medium priority worker queue");
86				}
87			}
88			Priority::Low => {
89				if self.low.send(job).is_err() {
90					error!("Failed to send job to low priority worker queue");
91				}
92			}
93		}
94
95		async move {
96			res_rx.await.map_err(|_| {
97				error!("Worker dropped result channel (task may have panicked)");
98				Error::Internal("worker task failed".into())
99			})
100		}
101	}
102
103	pub fn run<F, T>(&self, f: F) -> impl std::future::Future<Output = ClResult<T>>
104	where
105		F: FnOnce() -> T + Send + 'static,
106		T: Send + 'static,
107	{
108		info!("[RUN normal]");
109		let (res_tx, res_rx) = oneshot::channel();
110
111		let job = Box::new(move || {
112			let result = f();
113			let _ignore = res_tx.send(result);
114		});
115
116		if self.med.send(job).is_err() {
117			error!("Failed to send job to medium priority worker queue");
118		}
119
120		async move {
121			res_rx.await.map_err(|_| {
122				error!("Worker dropped result channel (task may have panicked)");
123				Error::Internal("worker task failed".into())
124			})
125		}
126	}
127
128	pub fn run_immed<F, T>(&self, f: F) -> impl std::future::Future<Output = ClResult<T>>
129	where
130		F: FnOnce() -> T + Send + 'static,
131		T: Send + 'static,
132	{
133		let (res_tx, res_rx) = oneshot::channel();
134
135		let job = Box::new(move || {
136			let result = f();
137			let _ignore = res_tx.send(result);
138		});
139
140		if self.high.send(job).is_err() {
141			error!("Failed to send job to high priority worker queue");
142		}
143
144		async move {
145			res_rx.await.map_err(|_| {
146				error!("Worker dropped result channel (task may have panicked)");
147				Error::Internal("worker task failed".into())
148			})
149		}
150	}
151
152	pub fn run_slow<F, T>(&self, f: F) -> impl std::future::Future<Output = ClResult<T>>
153	where
154		F: FnOnce() -> T + Send + 'static,
155		T: Send + 'static,
156	{
157		info!("[RUN slow]");
158		let (res_tx, res_rx) = oneshot::channel();
159
160		let job = Box::new(move || {
161			let result = f();
162			let _ignore = res_tx.send(result);
163		});
164
165		if self.low.send(job).is_err() {
166			error!("Failed to send job to low priority worker queue");
167		}
168
169		async move {
170			res_rx.await.map_err(|_| {
171				error!("Worker dropped result channel (task may have panicked)");
172				Error::Internal("worker task failed".into())
173			})
174		}
175	}
176
177	/// Like `run`, but flattens `ClResult<ClResult<T>>` into `ClResult<T>`.
178	/// Use when the closure itself returns `ClResult<T>`.
179	pub fn try_run<F, T>(&self, f: F) -> impl std::future::Future<Output = ClResult<T>>
180	where
181		F: FnOnce() -> ClResult<T> + Send + 'static,
182		T: Send + 'static,
183	{
184		let fut = self.run(f);
185		async move { fut.await? }
186	}
187
188	/// Like `run_immed`, but flattens `ClResult<ClResult<T>>` into `ClResult<T>`.
189	/// Use when the closure itself returns `ClResult<T>`.
190	pub fn try_run_immed<F, T>(&self, f: F) -> impl std::future::Future<Output = ClResult<T>>
191	where
192		F: FnOnce() -> ClResult<T> + Send + 'static,
193		T: Send + 'static,
194	{
195		let fut = self.run_immed(f);
196		async move { fut.await? }
197	}
198
199	/// Like `run_slow`, but flattens `ClResult<ClResult<T>>` into `ClResult<T>`.
200	/// Use when the closure itself returns `ClResult<T>`.
201	pub fn try_run_slow<F, T>(&self, f: F) -> impl std::future::Future<Output = ClResult<T>>
202	where
203		F: FnOnce() -> ClResult<T> + Send + 'static,
204		T: Send + 'static,
205	{
206		let fut = self.run_slow(f);
207		async move { fut.await? }
208	}
209}
210
211type JobQueue = Arc<Receiver<Box<dyn FnOnce() + Send>>>;
212
213fn worker_loop(queues: &[JobQueue]) {
214	loop {
215		// Try higher-priority queues first (non-blocking)
216		let mut job = None;
217		for rx in queues {
218			if let Ok(j) = rx.try_recv() {
219				job = Some(j);
220				break;
221			}
222		}
223
224		if let Some(job) = job {
225			if let Err(e) = std::panic::catch_unwind(std::panic::AssertUnwindSafe(job)) {
226				error!("Worker thread caught panic: {:?}", e);
227			}
228			continue;
229		}
230
231		// Wait for next job
232		let mut selector = flume::Selector::new();
233		for rx in queues {
234			selector = selector.recv(rx, |res| res);
235		}
236
237		let job: Result<Box<dyn FnOnce() + Send>, flume::RecvError> = selector.wait();
238		if let Ok(job) = job {
239			if let Err(e) = std::panic::catch_unwind(std::panic::AssertUnwindSafe(job)) {
240				error!("Worker thread caught panic: {:?}", e);
241			}
242		}
243	}
244}
245
246// vim: ts=4