use std::sync::atomic::{AtomicBool, AtomicU32, Ordering};
use std::time::{Duration, Instant};
/// Five-second time budget associated with freeing a handle.
///
/// NOTE(review): this constant is not referenced by any code visible in this
/// file; presumably callers pass it as the `deadline` argument of
/// `HandleGuard::begin_free` — confirm against the call sites.
pub const FFI_HANDLE_FREE_DEADLINE: Duration = Duration::from_secs(5);
/// Tracks in-flight operations on a handle so that freeing can wait for them.
///
/// Operations call [`HandleGuard::try_enter`] and keep the returned
/// [`HandleOp`] alive while they run. The freeing side calls
/// [`HandleGuard::begin_free`], which closes the guard to new operations and
/// then waits for the operations already inside to finish.
pub struct HandleGuard {
    /// Set by the first `begin_free` call and never cleared afterwards.
    freeing: AtomicBool,
    /// Number of live [`HandleOp`] tokens, plus any `try_enter` call that is
    /// currently between its increment and its re-check of `freeing`.
    active_ops: AtomicU32,
}

impl HandleGuard {
    /// Longest single sleep between two polls of `active_ops` in `begin_free`.
    const DRAIN_POLL_INTERVAL: Duration = Duration::from_millis(1);

    /// Creates an open guard with no operations in flight.
    pub const fn new() -> Self {
        Self {
            freeing: AtomicBool::new(false),
            active_ops: AtomicU32::new(0),
        }
    }

    /// Registers a new in-flight operation.
    ///
    /// Returns `Some(op)` while the guard is open; the operation counts as in
    /// flight until `op` is dropped. Returns `None` once `begin_free` has been
    /// called, in which case the in-flight count is left unchanged.
    #[must_use = "dropping the returned `HandleOp` immediately ends the operation"]
    pub fn try_enter(&self) -> Option<HandleOp<'_>> {
        // Fast path: once freeing has begun, reject without touching the
        // counter so late callers cannot delay the drain in `begin_free`.
        if self.freeing.load(Ordering::SeqCst) {
            return None;
        }

        // The increment must be published *before* `freeing` is re-read.
        // `begin_free` sets `freeing` and then reads `active_ops`; with both
        // sides sequentially consistent, either this call observes `freeing`
        // and backs out, or `begin_free` observes the increment and waits.
        self.active_ops.fetch_add(1, Ordering::SeqCst);
        if self.freeing.load(Ordering::SeqCst) {
            self.active_ops.fetch_sub(1, Ordering::AcqRel);
            return None;
        }

        Some(HandleOp { core: self })
    }

    /// Closes the guard to new operations and waits for in-flight ones.
    ///
    /// Returns `true` only when this call was the first to begin freeing *and*
    /// the in-flight count reached zero before `deadline` elapsed.
    ///
    /// Returns `false` when another call already began freeing (including an
    /// earlier call that timed out), or when `deadline` elapsed with
    /// operations still in flight. The guard stays closed in every case:
    /// `try_enter` keeps returning `None` afterwards.
    ///
    /// The wait polls the counter, sleeping at most one millisecond between
    /// polls and never longer than the time left until `deadline`.
    #[must_use = "a `false` result means the caller did not win the free or ops are still in flight"]
    pub fn begin_free(&self, deadline: Duration) -> bool {
        // Only the caller that flips `freeing` from false to true may proceed.
        if self
            .freeing
            .compare_exchange(false, true, Ordering::SeqCst, Ordering::SeqCst)
            .is_err()
        {
            return false;
        }

        let start = Instant::now();
        while self.active_ops.load(Ordering::SeqCst) > 0 {
            let remaining = deadline.saturating_sub(start.elapsed());
            if remaining.is_zero() {
                return false;
            }
            // Cap the sleep at the remaining budget so short deadlines are
            // not overshot by a full poll interval.
            std::thread::sleep(remaining.min(Self::DRAIN_POLL_INTERVAL));
        }
        true
    }

    /// Reports whether `begin_free` has been called (test-only helper).
    #[cfg(test)]
    pub fn is_freeing(&self) -> bool {
        self.freeing.load(Ordering::SeqCst)
    }
}

impl Default for HandleGuard {
    /// Equivalent to [`HandleGuard::new`].
    fn default() -> Self {
        Self::new()
    }
}

/// Token for one in-flight operation, handed out by [`HandleGuard::try_enter`].
///
/// Dropping the token decrements the guard's in-flight count.
#[must_use = "the operation is only counted as in flight while this token is alive"]
pub struct HandleOp<'a> {
    /// Guard whose in-flight count this token contributes to.
    core: &'a HandleGuard,
}

impl Drop for HandleOp<'_> {
    fn drop(&mut self) {
        // The release half of this decrement pairs with the acquiring load in
        // `begin_free`, so work done while holding the token happens-before
        // `begin_free` observing the count at zero.
        self.core.active_ops.fetch_sub(1, Ordering::AcqRel);
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use std::sync::Arc;
    use std::thread::JoinHandle;

    /// Spawns a worker that enters `guard` and holds its op until `release`
    /// is set.
    ///
    /// Returns only after the worker has signalled that it holds its op, so
    /// callers never rely on a sleep to guess that the op is in flight.
    fn spawn_held_op(guard: &Arc<HandleGuard>, release: &Arc<AtomicBool>) -> JoinHandle<()> {
        let guard = Arc::clone(guard);
        let release = Arc::clone(release);
        let entered = Arc::new(AtomicBool::new(false));
        let entered_op = Arc::clone(&entered);
        let worker = std::thread::spawn(move || {
            let op = guard.try_enter().expect("op must enter");
            entered_op.store(true, Ordering::SeqCst);
            while !release.load(Ordering::SeqCst) {
                std::thread::sleep(Duration::from_millis(1));
            }
            drop(op);
        });
        while !entered.load(Ordering::SeqCst) {
            std::thread::yield_now();
        }
        worker
    }

    /// Entering bumps the in-flight count; dropping the op restores it.
    #[test]
    fn try_enter_succeeds_and_drop_decrements() {
        let g = HandleGuard::new();
        {
            let _op = g.try_enter().expect("fresh guard must accept ops");
            assert_eq!(g.active_ops.load(Ordering::SeqCst), 1);
        }
        assert_eq!(g.active_ops.load(Ordering::SeqCst), 0);
        assert!(g.begin_free(Duration::from_millis(50)));
    }

    /// Once freeing has begun, new ops are rejected and the count stays zero.
    #[test]
    fn try_enter_after_free_returns_none() {
        let g = HandleGuard::new();
        assert!(g.begin_free(Duration::from_millis(50)));
        assert!(g.try_enter().is_none());
        assert_eq!(g.active_ops.load(Ordering::SeqCst), 0);
    }

    /// `begin_free` must not report success before the in-flight op is done.
    #[test]
    fn begin_free_waits_for_inflight_op() {
        let g = Arc::new(HandleGuard::new());
        let g_op = Arc::clone(&g);
        let started = Arc::new(AtomicBool::new(false));
        let started_op = Arc::clone(&started);
        let finished = Arc::new(AtomicBool::new(false));
        let finished_op = Arc::clone(&finished);
        let worker = std::thread::spawn(move || {
            let op = g_op.try_enter().expect("op must enter before free");
            started_op.store(true, Ordering::SeqCst);
            std::thread::sleep(Duration::from_millis(50));
            // Set strictly before the op is released, so a successful drain
            // implies this flag is visible.
            finished_op.store(true, Ordering::SeqCst);
            drop(op);
        });
        while !started.load(Ordering::SeqCst) {
            std::thread::yield_now();
        }
        let drained = g.begin_free(Duration::from_secs(2));
        // A flag is checked instead of wall-clock time so the result does not
        // depend on how the scheduler interleaves the two threads.
        let op_had_finished = finished.load(Ordering::SeqCst);
        worker.join().unwrap();
        assert!(drained, "begin_free must drain within deadline");
        assert!(
            op_had_finished,
            "begin_free returned before the in-flight op finished",
        );
    }

    /// A held op makes `begin_free` time out, and the guard stays closed.
    #[test]
    fn begin_free_times_out_when_op_outlasts_deadline() {
        let g = Arc::new(HandleGuard::new());
        let release = Arc::new(AtomicBool::new(false));
        let worker = spawn_held_op(&g, &release);

        let drained = g.begin_free(Duration::from_millis(50));
        let freeing = g.is_freeing();
        let late_enter_rejected = g.try_enter().is_none();

        // Release and join before asserting so a failure cannot leak the
        // worker thread.
        release.store(true, Ordering::SeqCst);
        worker.join().unwrap();

        assert!(!drained, "deadline expired with op still in flight");
        assert!(freeing);
        assert!(late_enter_rejected);
    }

    /// Two racing `begin_free` calls: exactly one may return `true`.
    #[test]
    fn begin_free_has_exactly_one_winner_under_concurrency() {
        const ROUNDS: usize = 32;
        for _ in 0..ROUNDS {
            let g = Arc::new(HandleGuard::new());
            let g1 = Arc::clone(&g);
            let g2 = Arc::clone(&g);
            let t1 = std::thread::spawn(move || g1.begin_free(Duration::from_millis(50)));
            let t2 = std::thread::spawn(move || g2.begin_free(Duration::from_millis(50)));
            let r1 = t1.join().unwrap();
            let r2 = t2.join().unwrap();
            assert!(
                r1 ^ r2,
                "exactly one caller must win begin_free; got r1={r1} r2={r2}",
            );
        }
    }

    /// A second sequential `begin_free` must lose to the first.
    #[test]
    fn begin_free_returns_false_on_second_sequential_call() {
        let g = HandleGuard::new();
        assert!(g.begin_free(Duration::from_millis(50)));
        assert!(
            !g.begin_free(Duration::from_millis(50)),
            "second begin_free must bail — only the first caller \
             owns the right to take the inner",
        );
    }

    /// A timed-out first call still consumes the right to free.
    #[test]
    fn begin_free_returns_false_after_timed_out_first_call() {
        let g = Arc::new(HandleGuard::new());
        let release = Arc::new(AtomicBool::new(false));
        let worker = spawn_held_op(&g, &release);

        let first = g.begin_free(Duration::from_millis(40));

        release.store(true, Ordering::SeqCst);
        worker.join().unwrap();

        assert!(!first, "first begin_free must time out while the op is held");
        assert!(
            !g.begin_free(Duration::from_millis(50)),
            "second begin_free after a timed-out first call must bail",
        );
    }
}