use std::sync::mpsc::{RecvTimeoutError, sync_channel};
use std::thread;
use std::time::{Duration, Instant};
use crate::halt::Halt;
const POLL_INTERVAL: Duration = Duration::from_millis(250);
#[derive(Debug)]
pub(crate) enum BoundedError {
Halted,
Timeout,
WorkerLost,
}
pub(crate) fn bounded_syscall<F, R>(
halt: Option<&Halt>,
timeout: Duration,
op: F,
) -> Result<R, BoundedError>
where
F: FnOnce() -> R + Send + 'static,
R: Send + 'static,
{
let (tx, rx) = sync_channel::<R>(0);
let _ = thread::Builder::new()
.name("freemkv-bounded-syscall".into())
.spawn(move || {
let _ = tx.send(op());
});
let deadline = Instant::now() + timeout;
loop {
let now = Instant::now();
let remaining = deadline.saturating_duration_since(now);
let slice = remaining.min(POLL_INTERVAL);
match rx.recv_timeout(slice) {
Ok(v) => return Ok(v),
Err(RecvTimeoutError::Timeout) => {
if let Some(h) = halt {
if h.is_cancelled() {
return Err(BoundedError::Halted);
}
}
if Instant::now() >= deadline {
return Err(BoundedError::Timeout);
}
}
Err(RecvTimeoutError::Disconnected) => {
return Err(BoundedError::WorkerLost);
}
}
}
}
#[cfg(test)]
mod tests {
use super::*;
use std::sync::Arc;
use std::sync::atomic::{AtomicBool, Ordering};
#[test]
fn op_completes_quickly() {
let r = bounded_syscall(None, Duration::from_secs(2), || 42u32);
assert!(matches!(r, Ok(42)));
}
#[test]
fn op_exceeds_timeout() {
let r = bounded_syscall(None, Duration::from_millis(300), || {
thread::sleep(Duration::from_secs(2));
0u32
});
assert!(matches!(r, Err(BoundedError::Timeout)));
}
#[test]
fn halt_fires_during_wait() {
let halt = Halt::new();
let halt2 = halt.clone();
thread::spawn(move || {
thread::sleep(Duration::from_millis(300));
halt2.cancel();
});
let r = bounded_syscall(Some(&halt), Duration::from_secs(5), || {
thread::sleep(Duration::from_secs(5));
0u32
});
assert!(matches!(r, Err(BoundedError::Halted)));
}
#[test]
fn worker_panics() {
let r = bounded_syscall(None, Duration::from_secs(2), || -> u32 {
panic!("intentional test panic");
});
assert!(matches!(r, Err(BoundedError::WorkerLost)));
}
#[test]
fn halt_already_set_before_call_still_returns_halted() {
let halt = Halt::new();
halt.cancel();
let started = Instant::now();
let r = bounded_syscall(Some(&halt), Duration::from_secs(10), || {
thread::sleep(Duration::from_secs(10));
0u32
});
assert!(matches!(r, Err(BoundedError::Halted)));
assert!(
started.elapsed() < Duration::from_secs(2),
"halt-already-set took {:?}",
started.elapsed()
);
}
#[test]
fn ok_path_takes_no_halt_token() {
let flag = Arc::new(AtomicBool::new(false));
let f2 = flag.clone();
let r = bounded_syscall(None, Duration::from_secs(2), move || {
f2.store(true, Ordering::Relaxed);
"ok"
});
assert!(matches!(r, Ok("ok")));
assert!(flag.load(Ordering::Relaxed));
}
}