crush_core/plugin/
timeout.rs1use crate::cancel::CancellationToken;
8use crate::error::{Result, TimeoutError};
9use crossbeam::channel;
10use std::sync::atomic::{AtomicBool, Ordering};
11use std::sync::Arc;
12use std::time::Duration;
13
/// Guard that raises a shared cancellation flag when it goes out of scope.
///
/// The flag is written in [`Drop`], so it is raised both on a normal scope
/// exit and when the owning thread unwinds from a panic.
#[derive(Debug)]
#[must_use = "the cancellation flag is raised as soon as the guard is dropped"]
pub struct TimeoutGuard {
    /// Flag shared with every party that needs to observe the end of the guarded scope.
    cancel_flag: Arc<AtomicBool>,
}

impl Drop for TimeoutGuard {
    /// Sets the shared flag to `true` with `Release` ordering so that a
    /// reader using `Acquire` observes the store.
    fn drop(&mut self) {
        self.cancel_flag.store(true, Ordering::Release);
    }
}
28
29pub fn run_with_timeout<F, T>(timeout: Duration, operation: F) -> Result<T>
66where
67 F: FnOnce(Arc<AtomicBool>) -> Result<T> + Send + 'static,
68 T: Send + 'static,
69{
70 let effective_timeout = if timeout == Duration::from_secs(0) {
72 Duration::MAX
73 } else {
74 timeout
75 };
76
77 let cancel_flag = Arc::new(AtomicBool::new(false));
78 let cancel_flag_thread = Arc::clone(&cancel_flag);
79 let cancel_flag_guard = Arc::clone(&cancel_flag);
80
81 let (tx, rx) = channel::bounded(1);
82
83 std::thread::spawn(move || {
85 let _guard = TimeoutGuard {
86 cancel_flag: cancel_flag_guard,
87 };
88
89 let result = operation(cancel_flag_thread);
91 let _ = tx.send(result); });
93
94 match rx.recv_timeout(effective_timeout) {
96 Ok(result) => result,
97 Err(channel::RecvTimeoutError::Timeout) => {
98 eprintln!("Warning: Plugin operation timed out after {timeout:?}");
99 Err(TimeoutError::Timeout(timeout).into())
100 }
101 Err(channel::RecvTimeoutError::Disconnected) => {
102 eprintln!("Warning: Plugin thread panicked during execution");
103 Err(TimeoutError::PluginPanic.into())
104 }
105 }
106}
107
108pub fn run_with_timeout_and_cancel<F, T>(
131 timeout: Duration,
132 cancel_token: Option<Arc<dyn CancellationToken>>,
133 operation: F,
134) -> Result<T>
135where
136 F: FnOnce(Arc<AtomicBool>) -> Result<T> + Send + 'static,
137 T: Send + 'static,
138{
139 if let Some(ref token) = cancel_token {
141 if token.is_cancelled() {
142 return Err(crate::error::CrushError::Cancelled);
143 }
144 }
145
146 let effective_timeout = if timeout == Duration::from_secs(0) {
148 Duration::MAX
149 } else {
150 timeout
151 };
152
153 let cancel_flag = Arc::new(AtomicBool::new(false));
154 let cancel_flag_thread = Arc::clone(&cancel_flag);
155 let cancel_flag_guard = Arc::clone(&cancel_flag);
156 let cancel_flag_monitor = Arc::clone(&cancel_flag);
157
158 let monitor_handle = if let Some(token) = cancel_token {
160 let handle = std::thread::spawn(move || {
161 while !cancel_flag_monitor.load(Ordering::Acquire) {
163 if token.is_cancelled() {
164 cancel_flag_monitor.store(true, Ordering::Release);
166 break;
167 }
168 std::thread::sleep(Duration::from_micros(100));
170 }
171 });
172 Some(handle)
173 } else {
174 None
175 };
176
177 let (tx, rx) = channel::bounded(1);
178
179 std::thread::spawn(move || {
181 let _guard = TimeoutGuard {
182 cancel_flag: cancel_flag_guard,
183 };
184
185 let result = operation(cancel_flag_thread);
187 let _ = tx.send(result); });
189
190 let result = match rx.recv_timeout(effective_timeout) {
192 Ok(result) => {
193 match result {
195 Err(crate::error::CrushError::Plugin(crate::error::PluginError::Cancelled)) => {
196 Err(crate::error::CrushError::Cancelled)
197 }
198 other => other,
199 }
200 }
201 Err(channel::RecvTimeoutError::Timeout) => {
202 cancel_flag.store(true, Ordering::Release);
204 eprintln!("Warning: Plugin operation timed out after {timeout:?}");
205 Err(TimeoutError::Timeout(timeout).into())
206 }
207 Err(channel::RecvTimeoutError::Disconnected) => {
208 eprintln!("Warning: Plugin thread panicked during execution");
209 Err(TimeoutError::PluginPanic.into())
210 }
211 };
212
213 cancel_flag.store(true, Ordering::Release);
215 if let Some(handle) = monitor_handle {
216 let _ = handle.join(); }
218
219 result
220}
221
#[cfg(test)]
mod tests {
    use super::*;
    use crate::error::PluginError;

    /// A fast operation hands back its value inside a one-second limit.
    #[test]
    fn test_operation_completes_within_timeout() {
        let outcome = run_with_timeout(Duration::from_secs(1), |_flag| Ok(42));

        assert!(matches!(outcome, Ok(42)));
    }

    /// A slow, flag-polling operation run under a 50 ms limit yields an error.
    #[test]
    fn test_operation_respects_cancellation() {
        let limit = Duration::from_millis(50);

        let outcome = run_with_timeout(limit, |flag| {
            let mut remaining = 1000;
            while remaining > 0 {
                if flag.load(Ordering::Acquire) {
                    return Err(PluginError::Cancelled.into());
                }
                std::thread::sleep(Duration::from_millis(10));
                remaining -= 1;
            }
            Ok(42)
        });

        assert!(outcome.is_err());
    }

    /// A zero duration is treated as "no limit", so the value still arrives.
    #[test]
    fn test_zero_timeout_means_no_timeout() {
        let outcome = run_with_timeout(Duration::from_secs(0), |_flag| Ok(42));

        assert!(matches!(outcome, Ok(42)));
    }

    /// The guard leaves the flag untouched while alive and raises it on drop.
    #[test]
    fn test_timeout_guard_sets_flag_on_drop() {
        let flag = Arc::new(AtomicBool::new(false));
        let guard = TimeoutGuard {
            cancel_flag: Arc::clone(&flag),
        };

        assert!(!flag.load(Ordering::Acquire));
        drop(guard);
        assert!(flag.load(Ordering::Acquire));
    }

    /// The successful value is passed through unchanged.
    #[test]
    fn test_run_with_timeout_basic_success() {
        let outcome = run_with_timeout(Duration::from_secs(1), |_flag| Ok(100));

        assert!(matches!(outcome, Ok(100)));
    }

    /// An error produced by the operation keeps its message.
    #[test]
    fn test_run_with_timeout_operation_error() {
        let outcome: Result<i32> = run_with_timeout(Duration::from_secs(1), |_flag| {
            Err(PluginError::OperationFailed(String::from("test error")).into())
        });

        assert!(matches!(&outcome, Err(err) if err.to_string().contains("test error")));
    }

    /// The timeout error variants render the expected text.
    #[test]
    fn test_timeout_error_display() {
        let timed_out = TimeoutError::Timeout(Duration::from_secs(30)).to_string();
        assert!(timed_out.contains("30"));

        let panicked = TimeoutError::PluginPanic.to_string();
        assert!(panicked.contains("panicked"));
    }

    /// A custom error message survives the trip through the worker thread.
    #[test]
    fn test_run_with_timeout_error_propagation() {
        let outcome: Result<i32> = run_with_timeout(Duration::from_secs(1), |_flag| {
            Err(PluginError::OperationFailed(String::from("custom error")).into())
        });

        assert!(matches!(&outcome, Err(err) if err.to_string().contains("custom error")));
    }

    /// With a zero limit, an operation that takes a little while still completes.
    #[test]
    fn test_effective_timeout_conversion() {
        let outcome = run_with_timeout(Duration::from_secs(0), |_flag| {
            std::thread::sleep(Duration::from_millis(10));
            Ok(String::from("done"))
        });

        assert_eq!(outcome.ok().as_deref(), Some("done"));
    }
}