use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::thread;
use std::time::Duration;
/// Result of combining abort sources: a shared abort flag plus a cleanup hook.
pub struct CombinedAbortSignal {
/// Shared flag; `true` means the combined signal has been aborted.
pub signal: Arc<AtomicBool>,
/// Callback to invoke when the caller is finished with the signal. It deals
/// with the background timer thread, if one was started; when there is none
/// it is a no-op.
pub cleanup: Box<dyn Fn() + Send + Sync>,
}
/// Builds an abort flag that combines up to two source flags and an optional timeout.
///
/// * `signal` – primary source flag, if any.
/// * `opts` – optional second source flag (`signal_b`) and timeout in
///   milliseconds (`timeout_ms`).
///
/// The source flags are sampled once, at call time: if either is already set,
/// the returned flag is set immediately, no timer is started, and `cleanup` is
/// a no-op. A source flag that flips *after* this call is not observed.
///
/// Otherwise, when `timeout_ms` is given, a background thread sets the returned
/// flag once the timeout elapses. Calling `cleanup` cancels a still-pending
/// timeout (the flag then stays unset) and returns promptly instead of waiting
/// for the timeout to run out. `cleanup` may be called any number of times;
/// calls after the first do nothing. If `cleanup` is never called, the timeout
/// still fires as scheduled.
pub fn create_combined_abort_signal(
    signal: Option<&Arc<AtomicBool>>,
    opts: Option<CombinedAbortSignalOpts>,
) -> CombinedAbortSignal {
    let signal_b = opts.as_ref().and_then(|o| o.signal_b.as_ref());
    let timeout_ms = opts.as_ref().and_then(|o| o.timeout_ms);
    let combined = Arc::new(AtomicBool::new(false));

    let already_aborted = signal.map_or(false, |s| s.load(Ordering::SeqCst))
        || signal_b.map_or(false, |s| s.load(Ordering::SeqCst));
    if already_aborted {
        combined.store(true, Ordering::SeqCst);
        return CombinedAbortSignal {
            signal: combined,
            cleanup: Box::new(|| {}),
        };
    }

    // The timer waits on a condition variable rather than sleeping so that
    // `cleanup` can wake it early and cancel the pending abort.
    let timer = timeout_ms.map(|ms| {
        let cancel = Arc::new((std::sync::Mutex::new(false), std::sync::Condvar::new()));
        let cancel_for_timer = Arc::clone(&cancel);
        let combined_for_timer = Arc::clone(&combined);
        let handle = thread::spawn(move || {
            let (cancelled, wakeup) = &*cancel_for_timer;
            let guard = cancelled.lock().unwrap_or_else(|e| e.into_inner());
            // Returns when the cancel flag is set or the timeout has elapsed,
            // whichever comes first; spurious wake-ups are absorbed.
            let (guard, _) = wakeup
                .wait_timeout_while(guard, Duration::from_millis(ms), |flag| !*flag)
                .unwrap_or_else(|e| e.into_inner());
            if !*guard {
                combined_for_timer.store(true, Ordering::SeqCst);
            }
        });
        (cancel, handle)
    });

    // The mutex lets the `Fn` closure take the timer state exactly once.
    let timer = std::sync::Mutex::new(timer);
    let cleanup = Box::new(move || {
        let pending = timer.lock().unwrap_or_else(|e| e.into_inner()).take();
        if let Some((cancel, handle)) = pending {
            let (cancelled, wakeup) = &*cancel;
            *cancelled.lock().unwrap_or_else(|e| e.into_inner()) = true;
            wakeup.notify_all();
            // The thread exits right after the wake-up, so this join is brief.
            let _ = handle.join();
        }
    });

    CombinedAbortSignal {
        signal: combined,
        cleanup,
    }
}
/// Optional extras for `create_combined_abort_signal`; the default has neither set.
#[derive(Debug, Clone, Default)]
pub struct CombinedAbortSignalOpts {
/// Optional second source flag, checked alongside the primary signal.
pub signal_b: Option<Arc<AtomicBool>>,
/// Optional timeout in milliseconds after which the combined flag is set.
pub timeout_ms: Option<u64>,
}
#[cfg(test)]
mod tests {
    use super::*;
    use std::sync::atomic::{AtomicBool, Ordering};
    use std::sync::Arc;
    use std::thread;
    use std::time::{Duration, Instant};

    /// Polls `flag` until it is set or `limit` elapses; returns whether it was set.
    ///
    /// Polling against a generous deadline keeps timer-based tests from
    /// depending on how quickly the scheduler runs the background thread.
    fn wait_until_set(flag: &AtomicBool, limit: Duration) -> bool {
        let deadline = Instant::now() + limit;
        while !flag.load(Ordering::SeqCst) {
            if Instant::now() >= deadline {
                return false;
            }
            thread::sleep(Duration::from_millis(1));
        }
        true
    }

    #[test]
    fn test_combined_abort_signal_no_opts() {
        let result = create_combined_abort_signal(None, None);
        assert!(!result.signal.load(Ordering::SeqCst));
    }

    #[test]
    fn test_combined_abort_signal_live_source() {
        let live = Arc::new(AtomicBool::new(false));
        let result = create_combined_abort_signal(Some(&live), None);
        assert!(!result.signal.load(Ordering::SeqCst));
    }

    #[test]
    fn test_combined_abort_signal_timeout() {
        let result = create_combined_abort_signal(
            None,
            Some(CombinedAbortSignalOpts {
                signal_b: None,
                timeout_ms: Some(50),
            }),
        );
        assert!(!result.signal.load(Ordering::SeqCst));
        assert!(wait_until_set(&result.signal, Duration::from_secs(5)));
    }

    #[test]
    fn test_combined_abort_signal_already_aborted() {
        let aborted = Arc::new(AtomicBool::new(true));
        let result = create_combined_abort_signal(Some(&aborted), None);
        assert!(result.signal.load(Ordering::SeqCst));
    }

    #[test]
    fn test_combined_abort_signal_already_aborted_signal_b() {
        let live = Arc::new(AtomicBool::new(false));
        let result = create_combined_abort_signal(
            Some(&live),
            Some(CombinedAbortSignalOpts {
                signal_b: Some(Arc::new(AtomicBool::new(true))),
                timeout_ms: None,
            }),
        );
        assert!(result.signal.load(Ordering::SeqCst));
    }

    #[test]
    fn test_combined_abort_signal_cleanup() {
        let result = create_combined_abort_signal(
            None,
            Some(CombinedAbortSignalOpts {
                signal_b: None,
                timeout_ms: Some(20),
            }),
        );
        (result.cleanup)();
        // A second call must be a harmless no-op.
        (result.cleanup)();
    }
}