Skip to main content

crush_core/plugin/
timeout.rs

1//! Timeout protection for plugin operations
2//!
3//! Implements thread-based timeout enforcement with cooperative cancellation.
4//! Uses crossbeam channels for reliable timeout detection and `Arc<AtomicBool>`
5//! for cooperative cancellation within plugins.
6
7use crate::cancel::CancellationToken;
8use crate::error::{Result, TimeoutError};
9use crossbeam::channel;
10use std::sync::atomic::{AtomicBool, Ordering};
11use std::sync::Arc;
12use std::time::Duration;
13
14/// RAII guard that sets cancellation flag on drop
15///
16/// When this guard is dropped (either normally or due to panic), it sets
17/// the cancellation flag to signal the plugin to stop processing.
18pub struct TimeoutGuard {
19    cancel_flag: Arc<AtomicBool>,
20}
21
22impl Drop for TimeoutGuard {
23    fn drop(&mut self) {
24        // Signal cancellation when guard is dropped (timeout or panic)
25        self.cancel_flag.store(true, Ordering::Release);
26    }
27}
28
29/// Run an operation with timeout protection
30///
31/// Spawns the operation in a dedicated thread and enforces the specified timeout.
32/// If the operation doesn't complete within the timeout, the cancellation flag
33/// is set and an error is returned.
34///
35/// # Arguments
36///
37/// * `timeout` - Maximum duration to wait for operation completion (0 = no timeout)
38/// * `operation` - The operation to run (receives cancellation flag)
39///
40/// # Returns
41///
42/// The operation's result if it completes within timeout, otherwise a timeout error
43///
44/// # Errors
45///
46/// Returns an error if:
47/// - Operation times out
48/// - Plugin thread panics during execution
49/// - Operation returns an error
50///
51/// # Examples
52///
53/// ```no_run
54/// use crush_core::plugin::timeout::run_with_timeout;
55/// use std::sync::Arc;
56/// use std::sync::atomic::AtomicBool;
57/// use std::time::Duration;
58///
59/// let timeout = Duration::from_secs(5);
60/// let result = run_with_timeout(timeout, |cancel_flag| {
61///     // Operation code here
62///     Ok(vec![1, 2, 3])
63/// });
64/// ```
65pub fn run_with_timeout<F, T>(timeout: Duration, operation: F) -> Result<T>
66where
67    F: FnOnce(Arc<AtomicBool>) -> Result<T> + Send + 'static,
68    T: Send + 'static,
69{
70    // Timeout of 0 means no timeout - use Duration::MAX for effectively infinite wait
71    let effective_timeout = if timeout == Duration::from_secs(0) {
72        Duration::MAX
73    } else {
74        timeout
75    };
76
77    let cancel_flag = Arc::new(AtomicBool::new(false));
78    let cancel_flag_thread = Arc::clone(&cancel_flag);
79    let cancel_flag_guard = Arc::clone(&cancel_flag);
80
81    let (tx, rx) = channel::bounded(1);
82
83    // Spawn operation in dedicated thread
84    std::thread::spawn(move || {
85        let _guard = TimeoutGuard {
86            cancel_flag: cancel_flag_guard,
87        };
88
89        // Run operation and send result
90        let result = operation(cancel_flag_thread);
91        let _ = tx.send(result); // Ignore send errors (receiver might have timed out)
92    });
93
94    // Wait for completion or timeout
95    match rx.recv_timeout(effective_timeout) {
96        Ok(result) => result,
97        Err(channel::RecvTimeoutError::Timeout) => {
98            eprintln!("Warning: Plugin operation timed out after {timeout:?}");
99            Err(TimeoutError::Timeout(timeout).into())
100        }
101        Err(channel::RecvTimeoutError::Disconnected) => {
102            eprintln!("Warning: Plugin thread panicked during execution");
103            Err(TimeoutError::PluginPanic.into())
104        }
105    }
106}
107
108/// Run an operation with timeout protection and external cancellation support
109///
110/// This version supports both timeout-based cancellation and external cancellation
111/// via a `CancellationToken` (e.g., for Ctrl+C handling).
112///
113/// # Arguments
114///
115/// * `timeout` - Maximum duration to wait for operation completion (0 = no timeout)
116/// * `cancel_token` - Optional external cancellation token
117/// * `operation` - The operation to run (receives cancellation flag)
118///
119/// # Returns
120///
121/// The operation's result if it completes, otherwise a timeout or cancellation error
122///
123/// # Errors
124///
125/// Returns an error if:
126/// - Operation times out
127/// - External cancellation is triggered
128/// - Plugin thread panics during execution
129/// - Operation returns an error
130pub fn run_with_timeout_and_cancel<F, T>(
131    timeout: Duration,
132    cancel_token: Option<Arc<dyn CancellationToken>>,
133    operation: F,
134) -> Result<T>
135where
136    F: FnOnce(Arc<AtomicBool>) -> Result<T> + Send + 'static,
137    T: Send + 'static,
138{
139    // Check if already cancelled before starting
140    if let Some(ref token) = cancel_token {
141        if token.is_cancelled() {
142            return Err(crate::error::CrushError::Cancelled);
143        }
144    }
145
146    // Timeout of 0 means no timeout - use Duration::MAX for effectively infinite wait
147    let effective_timeout = if timeout == Duration::from_secs(0) {
148        Duration::MAX
149    } else {
150        timeout
151    };
152
153    let cancel_flag = Arc::new(AtomicBool::new(false));
154    let cancel_flag_thread = Arc::clone(&cancel_flag);
155    let cancel_flag_guard = Arc::clone(&cancel_flag);
156    let cancel_flag_monitor = Arc::clone(&cancel_flag);
157
158    // Spawn a monitor thread for external cancellation token
159    let monitor_handle = if let Some(token) = cancel_token {
160        let handle = std::thread::spawn(move || {
161            // Poll the external token very frequently for responsive cancellation
162            while !cancel_flag_monitor.load(Ordering::Acquire) {
163                if token.is_cancelled() {
164                    // External cancellation requested - signal the plugin
165                    cancel_flag_monitor.store(true, Ordering::Release);
166                    break;
167                }
168                // Use a very short sleep for fast response
169                std::thread::sleep(Duration::from_micros(100));
170            }
171        });
172        Some(handle)
173    } else {
174        None
175    };
176
177    let (tx, rx) = channel::bounded(1);
178
179    // Spawn operation in dedicated thread
180    std::thread::spawn(move || {
181        let _guard = TimeoutGuard {
182            cancel_flag: cancel_flag_guard,
183        };
184
185        // Run operation and send result
186        let result = operation(cancel_flag_thread);
187        let _ = tx.send(result); // Ignore send errors (receiver might have timed out)
188    });
189
190    // Wait for completion or timeout
191    let result = match rx.recv_timeout(effective_timeout) {
192        Ok(result) => {
193            // Convert PluginError::Cancelled to CrushError::Cancelled
194            match result {
195                Err(crate::error::CrushError::Plugin(crate::error::PluginError::Cancelled)) => {
196                    Err(crate::error::CrushError::Cancelled)
197                }
198                other => other,
199            }
200        }
201        Err(channel::RecvTimeoutError::Timeout) => {
202            // Signal cancellation to the operation
203            cancel_flag.store(true, Ordering::Release);
204            eprintln!("Warning: Plugin operation timed out after {timeout:?}");
205            Err(TimeoutError::Timeout(timeout).into())
206        }
207        Err(channel::RecvTimeoutError::Disconnected) => {
208            eprintln!("Warning: Plugin thread panicked during execution");
209            Err(TimeoutError::PluginPanic.into())
210        }
211    };
212
213    // Stop the monitor thread if it's still running
214    cancel_flag.store(true, Ordering::Release);
215    if let Some(handle) = monitor_handle {
216        let _ = handle.join(); // Wait for monitor to finish
217    }
218
219    result
220}
221
222#[cfg(test)]
223mod tests {
224    use super::*;
225    use crate::error::PluginError;
226
227    #[test]
228    #[allow(clippy::unwrap_used)]
229    fn test_operation_completes_within_timeout() {
230        let timeout = Duration::from_secs(1);
231
232        let result = run_with_timeout(timeout, |_cancel| {
233            // Fast operation
234            Ok(42)
235        });
236
237        assert!(result.is_ok());
238        assert_eq!(result.unwrap(), 42);
239    }
240
241    #[test]
242    fn test_operation_respects_cancellation() {
243        let timeout = Duration::from_millis(50);
244
245        let result = run_with_timeout(timeout, |cancel_flag| {
246            // Simulate slow operation that checks cancellation
247            for _ in 0..1000 {
248                if cancel_flag.load(Ordering::Acquire) {
249                    return Err(PluginError::Cancelled.into());
250                }
251                std::thread::sleep(Duration::from_millis(10));
252            }
253            Ok(42)
254        });
255
256        // Should either timeout or be cancelled
257        assert!(result.is_err());
258    }
259
260    #[test]
261    #[allow(clippy::unwrap_used)]
262    fn test_zero_timeout_means_no_timeout() {
263        let timeout = Duration::from_secs(0);
264
265        let result = run_with_timeout(timeout, |_cancel| Ok(42));
266
267        // Zero timeout means no timeout - operation should succeed
268        assert!(result.is_ok());
269        assert_eq!(result.unwrap(), 42);
270    }
271
272    #[test]
273    fn test_timeout_guard_sets_flag_on_drop() {
274        let cancel_flag = Arc::new(AtomicBool::new(false));
275        {
276            let _guard = TimeoutGuard {
277                cancel_flag: Arc::clone(&cancel_flag),
278            };
279            assert!(!cancel_flag.load(Ordering::Acquire));
280        }
281        // Flag should be set after guard is dropped
282        assert!(cancel_flag.load(Ordering::Acquire));
283    }
284
285    #[test]
286    #[allow(clippy::unwrap_used)]
287    fn test_run_with_timeout_basic_success() {
288        let timeout = Duration::from_secs(1);
289
290        let result = run_with_timeout(timeout, |_cancel| Ok(100));
291
292        assert!(result.is_ok());
293        assert_eq!(result.unwrap(), 100);
294    }
295
296    #[test]
297    #[allow(clippy::unwrap_used)]
298    fn test_run_with_timeout_operation_error() {
299        let timeout = Duration::from_secs(1);
300
301        let result: Result<i32> = run_with_timeout(timeout, |_cancel| {
302            Err(PluginError::OperationFailed("test error".to_string()).into())
303        });
304
305        assert!(result.is_err());
306        let err_msg = result.unwrap_err().to_string();
307        assert!(err_msg.contains("test error"));
308    }
309
310    #[test]
311    fn test_timeout_error_display() {
312        let timeout_err = TimeoutError::Timeout(Duration::from_secs(30));
313        assert!(timeout_err.to_string().contains("30"));
314
315        let panic_err = TimeoutError::PluginPanic;
316        assert!(panic_err.to_string().contains("panicked"));
317    }
318
319    #[test]
320    #[allow(clippy::unwrap_used)]
321    fn test_run_with_timeout_error_propagation() {
322        let timeout = Duration::from_secs(1);
323
324        let result: Result<i32> = run_with_timeout(timeout, |_cancel| {
325            Err(PluginError::OperationFailed("custom error".to_string()).into())
326        });
327
328        assert!(result.is_err());
329        let err_msg = result.unwrap_err().to_string();
330        assert!(err_msg.contains("custom error"));
331    }
332
333    #[test]
334    #[allow(clippy::unwrap_used)]
335    fn test_effective_timeout_conversion() {
336        // Test that 0 timeout becomes Duration::MAX internally
337        let timeout = Duration::from_secs(0);
338
339        // This should complete successfully even with "infinite" effective timeout
340        let result = run_with_timeout(timeout, |_cancel| {
341            std::thread::sleep(Duration::from_millis(10));
342            Ok("done".to_string())
343        });
344
345        assert!(result.is_ok());
346        assert_eq!(result.unwrap(), "done");
347    }
348}