Skip to main content

cloudillo_types/
worker.rs

// SPDX-FileCopyrightText: Szilárd Hajba
// SPDX-License-Identifier: LGPL-3.0-or-later

//! Worker pool. Runs synchronous tasks on a configurable number of worker threads, with 3 priority levels.
5
6use flume::{Receiver, Sender};
7use futures::channel::oneshot;
8use std::{sync::Arc, thread};
9
10use crate::prelude::*;
11
/// Scheduling priority of a job submitted to the [`WorkerPool`].
///
/// Each priority has its own queue. Worker threads always drain the
/// higher-priority queues they serve before looking at lower ones.
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)]
pub enum Priority {
	/// Served by every worker thread of the pool.
	High,
	/// Served by the "high + medium" and the "high + medium + low" workers.
	Medium,
	/// Served only by the "high + medium + low" workers.
	Low,
}
18
19#[derive(Debug)]
20pub struct WorkerPool {
21	high: Sender<Box<dyn FnOnce() + Send>>,
22	med: Sender<Box<dyn FnOnce() + Send>>,
23	low: Sender<Box<dyn FnOnce() + Send>>,
24}
25
26impl WorkerPool {
27	pub fn new(n1: usize, n2: usize, n3: usize) -> Self {
28		let (high, rx_high) = flume::unbounded();
29		let (med, rx_med) = flume::unbounded();
30		let (low, rx_low) = flume::unbounded();
31
32		let rx_high = Arc::new(rx_high);
33		let rx_med = Arc::new(rx_med);
34		let rx_low = Arc::new(rx_low);
35
36		// Workers dedicated to High only
37		for _ in 0..n1 {
38			let rx_high = Arc::clone(&rx_high);
39			thread::spawn(move || worker_loop(&[rx_high]));
40		}
41
42		// Workers for High + Medium
43		for _ in 0..n2 {
44			let rx_high = Arc::clone(&rx_high);
45			let rx_med = Arc::clone(&rx_med);
46			thread::spawn(move || worker_loop(&[rx_high, rx_med]));
47		}
48
49		// Workers for High + Medium + Low
50		for _ in 0..n3 {
51			let rx_high = Arc::clone(&rx_high);
52			let rx_med = Arc::clone(&rx_med);
53			let rx_low = Arc::clone(&rx_low);
54			thread::spawn(move || worker_loop(&[rx_high, rx_med, rx_low]));
55		}
56
57		Self { high, med, low }
58	}
59
60	/// Submit a closure with arguments → returns a Future for the result
61	pub fn spawn<F, T>(
62		&self,
63		priority: Priority,
64		f: F,
65	) -> impl std::future::Future<Output = ClResult<T>> + use<F, T>
66	where
67		F: FnOnce() -> T + Send + 'static,
68		T: Send + 'static,
69	{
70		let (res_tx, res_rx) = oneshot::channel();
71
72		let job = Box::new(move || {
73			let result = f();
74			let _ = res_tx.send(result);
75		});
76
77		match priority {
78			Priority::High => {
79				if self.high.send(job).is_err() {
80					error!("Failed to send job to high priority worker queue");
81				}
82			}
83			Priority::Medium => {
84				if self.med.send(job).is_err() {
85					error!("Failed to send job to medium priority worker queue");
86				}
87			}
88			Priority::Low => {
89				if self.low.send(job).is_err() {
90					error!("Failed to send job to low priority worker queue");
91				}
92			}
93		}
94
95		async move {
96			res_rx.await.map_err(|_| {
97				error!("Worker dropped result channel (task may have panicked)");
98				Error::Internal("worker task failed".into())
99			})
100		}
101	}
102
103	pub fn run<F, T>(&self, f: F) -> impl std::future::Future<Output = ClResult<T>> + use<F, T>
104	where
105		F: FnOnce() -> T + Send + 'static,
106		T: Send + 'static,
107	{
108		info!("[RUN normal]");
109		let (res_tx, res_rx) = oneshot::channel();
110
111		let job = Box::new(move || {
112			let result = f();
113			let _ignore = res_tx.send(result);
114		});
115
116		if self.med.send(job).is_err() {
117			error!("Failed to send job to medium priority worker queue");
118		}
119
120		async move {
121			res_rx.await.map_err(|_| {
122				error!("Worker dropped result channel (task may have panicked)");
123				Error::Internal("worker task failed".into())
124			})
125		}
126	}
127
128	pub fn run_immed<F, T>(
129		&self,
130		f: F,
131	) -> impl std::future::Future<Output = ClResult<T>> + use<F, T>
132	where
133		F: FnOnce() -> T + Send + 'static,
134		T: Send + 'static,
135	{
136		let (res_tx, res_rx) = oneshot::channel();
137
138		let job = Box::new(move || {
139			let result = f();
140			let _ignore = res_tx.send(result);
141		});
142
143		if self.high.send(job).is_err() {
144			error!("Failed to send job to high priority worker queue");
145		}
146
147		async move {
148			res_rx.await.map_err(|_| {
149				error!("Worker dropped result channel (task may have panicked)");
150				Error::Internal("worker task failed".into())
151			})
152		}
153	}
154
155	pub fn run_slow<F, T>(&self, f: F) -> impl std::future::Future<Output = ClResult<T>> + use<F, T>
156	where
157		F: FnOnce() -> T + Send + 'static,
158		T: Send + 'static,
159	{
160		info!("[RUN slow]");
161		let (res_tx, res_rx) = oneshot::channel();
162
163		let job = Box::new(move || {
164			let result = f();
165			let _ignore = res_tx.send(result);
166		});
167
168		if self.low.send(job).is_err() {
169			error!("Failed to send job to low priority worker queue");
170		}
171
172		async move {
173			res_rx.await.map_err(|_| {
174				error!("Worker dropped result channel (task may have panicked)");
175				Error::Internal("worker task failed".into())
176			})
177		}
178	}
179
180	/// Like `run`, but flattens `ClResult<ClResult<T>>` into `ClResult<T>`.
181	/// Use when the closure itself returns `ClResult<T>`.
182	pub fn try_run<F, T>(&self, f: F) -> impl std::future::Future<Output = ClResult<T>> + use<F, T>
183	where
184		F: FnOnce() -> ClResult<T> + Send + 'static,
185		T: Send + 'static,
186	{
187		let fut = self.run(f);
188		async move { fut.await? }
189	}
190
191	/// Like `run_immed`, but flattens `ClResult<ClResult<T>>` into `ClResult<T>`.
192	/// Use when the closure itself returns `ClResult<T>`.
193	pub fn try_run_immed<F, T>(
194		&self,
195		f: F,
196	) -> impl std::future::Future<Output = ClResult<T>> + use<F, T>
197	where
198		F: FnOnce() -> ClResult<T> + Send + 'static,
199		T: Send + 'static,
200	{
201		let fut = self.run_immed(f);
202		async move { fut.await? }
203	}
204
205	/// Like `run_slow`, but flattens `ClResult<ClResult<T>>` into `ClResult<T>`.
206	/// Use when the closure itself returns `ClResult<T>`.
207	pub fn try_run_slow<F, T>(
208		&self,
209		f: F,
210	) -> impl std::future::Future<Output = ClResult<T>> + use<F, T>
211	where
212		F: FnOnce() -> ClResult<T> + Send + 'static,
213		T: Send + 'static,
214	{
215		let fut = self.run_slow(f);
216		async move { fut.await? }
217	}
218}
219
220type JobQueue = Arc<Receiver<Box<dyn FnOnce() + Send>>>;
221
222fn worker_loop(queues: &[JobQueue]) {
223	loop {
224		// Try higher-priority queues first (non-blocking)
225		let mut job = None;
226		for rx in queues {
227			if let Ok(j) = rx.try_recv() {
228				job = Some(j);
229				break;
230			}
231		}
232
233		#[allow(clippy::collapsible_if)]
234		if let Some(job) = job {
235			if let Err(e) = std::panic::catch_unwind(std::panic::AssertUnwindSafe(job)) {
236				error!("Worker thread caught panic: {:?}", e);
237			}
238			continue;
239		}
240
241		// Wait for next job
242		let mut selector = flume::Selector::new();
243		for rx in queues {
244			selector = selector.recv(rx, |res| res);
245		}
246
247		let job: Result<Box<dyn FnOnce() + Send>, flume::RecvError> = selector.wait();
248		if let Ok(job) = job
249			&& let Err(e) = std::panic::catch_unwind(std::panic::AssertUnwindSafe(job))
250		{
251			error!("Worker thread caught panic: {:?}", e);
252		}
253	}
254}
255
256// vim: ts=4