revue/state/worker/
mod.rs1mod channel;
66mod handle;
67mod pool;
68
69pub use channel::{WorkerChannel, WorkerCommand, WorkerMessage, WorkerReceiver, WorkerSender};
70pub use handle::{WorkerHandle, WorkerState};
71pub use pool::{Worker, WorkerPool};
72
73use std::future::Future;
74use std::pin::Pin;
75
76#[cfg(feature = "async")]
78mod shared_runtime {
79 use std::sync::OnceLock;
80 use tokio::runtime::{Handle, Runtime};
81
82 static RUNTIME: OnceLock<Runtime> = OnceLock::new();
83
84 pub fn handle() -> Result<Handle, String> {
94 if let Ok(handle) = Handle::try_current() {
96 return Ok(handle);
97 }
98
99 if let Some(runtime) = RUNTIME.get() {
102 Ok(runtime.handle().clone())
103 } else {
104 tokio::runtime::Builder::new_multi_thread()
106 .enable_all()
107 .worker_threads(
108 std::thread::available_parallelism()
109 .map(|n| n.get())
110 .unwrap_or(4),
111 )
112 .build()
113 .map(|runtime| {
114 if RUNTIME.set(runtime).is_err() {
116 }
118 RUNTIME
123 .get()
124 .unwrap_or_else(|| {
125 panic!("Shared runtime must be initialized after line 74")
126 })
127 .handle()
128 .clone()
129 })
130 .map_err(|e| format!("Failed to create tokio runtime: {}", e))
131 }
132 }
133}
134
135#[cfg(feature = "async")]
136pub(crate) use shared_runtime::handle as get_runtime_handle;
137
138pub type BoxFuture<T> = Pin<Box<dyn Future<Output = T> + Send + 'static>>;
140
141pub type WorkerResult<T> = Result<T, WorkerError>;
143
144#[derive(Debug, Clone)]
146pub enum WorkerError {
147 Cancelled,
149 Panicked(String),
151 ChannelClosed,
153 Timeout,
155 Custom(String),
157 RuntimeCreationFailed(String),
159}
160
161impl std::fmt::Display for WorkerError {
162 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
163 match self {
164 WorkerError::Cancelled => write!(f, "Worker task was cancelled"),
165 WorkerError::Panicked(msg) => write!(f, "Worker task panicked: {}", msg),
166 WorkerError::ChannelClosed => write!(f, "Worker channel closed"),
167 WorkerError::Timeout => write!(f, "Worker task timed out"),
168 WorkerError::Custom(msg) => write!(f, "Worker error: {}", msg),
169 WorkerError::RuntimeCreationFailed(msg) => {
170 write!(f, "Failed to create tokio runtime: {}", msg)
171 }
172 }
173 }
174}
175
176impl std::error::Error for WorkerError {}
177
178#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Default)]
180pub enum Priority {
181 Low,
183 #[default]
185 Normal,
186 High,
188}
189
190#[derive(Debug, Clone)]
192pub struct WorkerConfig {
193 pub threads: usize,
195 pub queue_capacity: usize,
197 pub default_timeout_ms: Option<u64>,
199}
200
201impl Default for WorkerConfig {
202 fn default() -> Self {
203 Self {
204 threads: std::thread::available_parallelism()
205 .map(|n| n.get())
206 .unwrap_or(4),
207 queue_capacity: 1000,
208 default_timeout_ms: None,
209 }
210 }
211}
212
213impl WorkerConfig {
214 pub fn with_threads(threads: usize) -> Self {
216 Self {
217 threads: threads.max(1),
218 ..Default::default()
219 }
220 }
221}
222
223pub fn run_blocking<F, T>(f: F) -> WorkerHandle<T>
225where
226 F: FnOnce() -> T + Send + 'static,
227 T: Send + 'static,
228{
229 WorkerHandle::spawn_blocking(f)
230}
231
232pub fn spawn<F, T>(future: F) -> WorkerHandle<T>
234where
235 F: Future<Output = T> + Send + 'static,
236 T: Send + 'static,
237{
238 WorkerHandle::spawn(future)
239}
240
241#[cfg(test)]
242mod tests {
243 use super::*;
244
245 #[test]
246 fn test_worker_error_display() {
247 assert_eq!(
248 format!("{}", WorkerError::Cancelled),
249 "Worker task was cancelled"
250 );
251 assert_eq!(
252 format!("{}", WorkerError::Panicked("test".to_string())),
253 "Worker task panicked: test"
254 );
255 assert_eq!(
256 format!("{}", WorkerError::ChannelClosed),
257 "Worker channel closed"
258 );
259 assert_eq!(format!("{}", WorkerError::Timeout), "Worker task timed out");
260 assert_eq!(
261 format!("{}", WorkerError::Custom("error".to_string())),
262 "Worker error: error"
263 );
264 assert_eq!(
265 format!(
266 "{}",
267 WorkerError::RuntimeCreationFailed("failed".to_string())
268 ),
269 "Failed to create tokio runtime: failed"
270 );
271 }
272
273 #[test]
274 fn test_priority_ordering() {
275 assert!(Priority::Low < Priority::Normal);
276 assert!(Priority::Normal < Priority::High);
277 assert!(Priority::Low < Priority::High);
278 assert_eq!(Priority::Normal, Priority::default());
279 }
280
281 #[test]
282 fn test_worker_config_default() {
283 let config = WorkerConfig::default();
284 assert!(config.threads >= 1);
285 assert_eq!(config.queue_capacity, 1000);
286 assert!(config.default_timeout_ms.is_none());
287 }
288
289 #[test]
290 fn test_worker_config_with_threads() {
291 let config = WorkerConfig::with_threads(4);
292 assert_eq!(config.threads, 4);
293 assert_eq!(config.queue_capacity, 1000);
294
295 let config = WorkerConfig::with_threads(0);
297 assert_eq!(config.threads, 1);
298 }
299
300 #[test]
301 fn test_run_blocking() {
302 let handle = run_blocking(|| 42);
303 drop(handle);
305 }
306
307 #[cfg(feature = "async")]
308 #[test]
309 fn test_shared_runtime_handle() {
310 let result1 = shared_runtime::handle();
312 assert!(result1.is_ok());
313
314 let result2 = shared_runtime::handle();
315 assert!(result2.is_ok());
316 }
317}