Skip to main content

revue/state/worker/
mod.rs

1//! Worker system for background tasks
2//!
3//! Run CPU-intensive or I/O operations without blocking the UI.
4//! Built on tokio with worker pools for efficient task management.
5//!
6//! # Features
7//!
8//! - **Non-blocking UI** - Run heavy computations without freezing the UI
9//! - **Worker Pools** - Reusable thread pools for common operations
10//! - **Async/Await** - Full async/await support with tokio
11//! - **Easy API** - Simple spawn and check operations
12//! - **Platform Detection** - Auto-detects optimal worker count
13//!
14//! # Quick Start
15//!
16//! ## Worker Pool
17//!
18//! ```rust,ignore
19//! use revue::worker::WorkerPool;
20//!
21//! // Create pool with 4 workers
22//! let pool = WorkerPool::new(4);
23//!
24//! // Submit work
25//! pool.submit(|| {
26//!     // Heavy computation
27//!     expensive_calculation()
28//! });
29//!
30//! // Graceful shutdown
31//! pool.shutdown().await;
32//! ```
33//!
34//! ## Spawn Task
35//!
36//! ```rust,ignore
37//! use revue::worker::WorkerHandle;
38//!
39//! // Spawn a single background task
40//! let handle = WorkerHandle::spawn(async {
41//!     // Long-running operation
42//!     fetch_data_from_api().await
43//! });
44//!
45//! // Check if done
46//! if let Some(result) = handle.try_recv() {
47//!     // Use result
48//! }
49//! ```
50//!
51//! ## Task with Return Value
52//!
53//! ```rust,ignore
54//! use revue::worker::WorkerHandle;
55//!
56//! let handle = WorkerHandle::spawn(async {
57//!     // Return a value
58//!     compute_result()
59//! });
60//!
61//! // Get result when ready
62//! let result = handle.join().await;
63//! ```
64
65mod channel;
66mod handle;
67mod pool;
68
69pub use channel::{WorkerChannel, WorkerCommand, WorkerMessage, WorkerReceiver, WorkerSender};
70pub use handle::{WorkerHandle, WorkerState};
71pub use pool::{Worker, WorkerPool};
72
73use std::future::Future;
74use std::pin::Pin;
75
76/// Shared tokio runtime for async worker tasks
77#[cfg(feature = "async")]
78mod shared_runtime {
79    use std::sync::OnceLock;
80    use tokio::runtime::{Handle, Runtime};
81
82    static RUNTIME: OnceLock<Runtime> = OnceLock::new();
83
84    /// Get or create the shared runtime handle
85    ///
86    /// Returns an error string if runtime creation fails instead of panicking.
87    ///
88    /// # Errors
89    ///
90    /// Returns `Err(String)` if:
91    /// - The tokio runtime cannot be created (e.g., insufficient resources, system limits)
92    /// - The runtime thread pool cannot be initialized
93    pub fn handle() -> Result<Handle, String> {
94        // First, try to get the current runtime if we're already in one
95        if let Ok(handle) = Handle::try_current() {
96            return Ok(handle);
97        }
98
99        // Try to get or create the shared runtime
100        // If initialization failed, return an error instead of panicking
101        if let Some(runtime) = RUNTIME.get() {
102            Ok(runtime.handle().clone())
103        } else {
104            // Use multi_thread runtime to allow block_on() from within runtime context
105            tokio::runtime::Builder::new_multi_thread()
106                .enable_all()
107                .worker_threads(
108                    std::thread::available_parallelism()
109                        .map(|n| n.get())
110                        .unwrap_or(4),
111                )
112                .build()
113                .map(|runtime| {
114                    // Gracefully handle races where another thread sets the runtime first.
115                    if RUNTIME.set(runtime).is_err() {
116                        // Another thread initialized the runtime; fall back to the existing one.
117                    }
118                    // RUNTIME is guaranteed to be initialized here because:
119                    // 1. Either this thread just successfully set it (line 74 above)
120                    // 2. Or another thread set it (causing the Err case at line 74)
121                    // In both cases, RUNTIME.get() returns Some.
122                    RUNTIME
123                        .get()
124                        .unwrap_or_else(|| {
125                            panic!("Shared runtime must be initialized after line 74")
126                        })
127                        .handle()
128                        .clone()
129                })
130                .map_err(|e| format!("Failed to create tokio runtime: {}", e))
131        }
132    }
133}
134
135#[cfg(feature = "async")]
136pub(crate) use shared_runtime::handle as get_runtime_handle;
137
138/// A boxed future for worker tasks
139pub type BoxFuture<T> = Pin<Box<dyn Future<Output = T> + Send + 'static>>;
140
141/// Worker task result
142pub type WorkerResult<T> = Result<T, WorkerError>;
143
144/// Worker error types
145#[derive(Debug, Clone)]
146pub enum WorkerError {
147    /// Task was cancelled
148    Cancelled,
149    /// Task panicked
150    Panicked(String),
151    /// Channel closed
152    ChannelClosed,
153    /// Timeout
154    Timeout,
155    /// Custom error
156    Custom(String),
157    /// Runtime creation failed
158    RuntimeCreationFailed(String),
159}
160
161impl std::fmt::Display for WorkerError {
162    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
163        match self {
164            WorkerError::Cancelled => write!(f, "Worker task was cancelled"),
165            WorkerError::Panicked(msg) => write!(f, "Worker task panicked: {}", msg),
166            WorkerError::ChannelClosed => write!(f, "Worker channel closed"),
167            WorkerError::Timeout => write!(f, "Worker task timed out"),
168            WorkerError::Custom(msg) => write!(f, "Worker error: {}", msg),
169            WorkerError::RuntimeCreationFailed(msg) => {
170                write!(f, "Failed to create tokio runtime: {}", msg)
171            }
172        }
173    }
174}
175
176impl std::error::Error for WorkerError {}
177
178/// Worker task priority
179#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Default)]
180pub enum Priority {
181    /// Low priority (background)
182    Low,
183    /// Normal priority
184    #[default]
185    Normal,
186    /// High priority
187    High,
188}
189
190/// Worker configuration
191#[derive(Debug, Clone)]
192pub struct WorkerConfig {
193    /// Number of worker threads
194    pub threads: usize,
195    /// Task queue capacity
196    pub queue_capacity: usize,
197    /// Default timeout in milliseconds
198    pub default_timeout_ms: Option<u64>,
199}
200
201impl Default for WorkerConfig {
202    fn default() -> Self {
203        Self {
204            threads: std::thread::available_parallelism()
205                .map(|n| n.get())
206                .unwrap_or(4),
207            queue_capacity: 1000,
208            default_timeout_ms: None,
209        }
210    }
211}
212
213impl WorkerConfig {
214    /// Create with specific thread count
215    pub fn with_threads(threads: usize) -> Self {
216        Self {
217            threads: threads.max(1),
218            ..Default::default()
219        }
220    }
221}
222
223/// Convenience function to run a sync task in background
224pub fn run_blocking<F, T>(f: F) -> WorkerHandle<T>
225where
226    F: FnOnce() -> T + Send + 'static,
227    T: Send + 'static,
228{
229    WorkerHandle::spawn_blocking(f)
230}
231
232/// Convenience function to run an async task
233pub fn spawn<F, T>(future: F) -> WorkerHandle<T>
234where
235    F: Future<Output = T> + Send + 'static,
236    T: Send + 'static,
237{
238    WorkerHandle::spawn(future)
239}
240
241#[cfg(test)]
242mod tests {
243    use super::*;
244
245    #[test]
246    fn test_worker_error_display() {
247        assert_eq!(
248            format!("{}", WorkerError::Cancelled),
249            "Worker task was cancelled"
250        );
251        assert_eq!(
252            format!("{}", WorkerError::Panicked("test".to_string())),
253            "Worker task panicked: test"
254        );
255        assert_eq!(
256            format!("{}", WorkerError::ChannelClosed),
257            "Worker channel closed"
258        );
259        assert_eq!(format!("{}", WorkerError::Timeout), "Worker task timed out");
260        assert_eq!(
261            format!("{}", WorkerError::Custom("error".to_string())),
262            "Worker error: error"
263        );
264        assert_eq!(
265            format!(
266                "{}",
267                WorkerError::RuntimeCreationFailed("failed".to_string())
268            ),
269            "Failed to create tokio runtime: failed"
270        );
271    }
272
273    #[test]
274    fn test_priority_ordering() {
275        assert!(Priority::Low < Priority::Normal);
276        assert!(Priority::Normal < Priority::High);
277        assert!(Priority::Low < Priority::High);
278        assert_eq!(Priority::Normal, Priority::default());
279    }
280
281    #[test]
282    fn test_worker_config_default() {
283        let config = WorkerConfig::default();
284        assert!(config.threads >= 1);
285        assert_eq!(config.queue_capacity, 1000);
286        assert!(config.default_timeout_ms.is_none());
287    }
288
289    #[test]
290    fn test_worker_config_with_threads() {
291        let config = WorkerConfig::with_threads(4);
292        assert_eq!(config.threads, 4);
293        assert_eq!(config.queue_capacity, 1000);
294
295        // Minimum 1 thread
296        let config = WorkerConfig::with_threads(0);
297        assert_eq!(config.threads, 1);
298    }
299
300    #[test]
301    fn test_run_blocking() {
302        let handle = run_blocking(|| 42);
303        // Handle is created successfully
304        drop(handle);
305    }
306
307    #[cfg(feature = "async")]
308    #[test]
309    fn test_shared_runtime_handle() {
310        // Multiple calls should return handles
311        let result1 = shared_runtime::handle();
312        assert!(result1.is_ok());
313
314        let result2 = shared_runtime::handle();
315        assert!(result2.is_ok());
316    }
317}