use std::sync::atomic::{AtomicU64, Ordering};
use crate::sync_primitives::Mutex;
pub struct QsbrDomain {
epoch: AtomicU64,
retired: Mutex<Vec<RetiredEntry>>,
}
struct RetiredEntry {
epoch: u64,
dropper: Option<Box<dyn FnOnce() + Send>>,
}
impl Default for QsbrDomain {
fn default() -> Self {
Self::new()
}
}
impl QsbrDomain {
#[must_use]
pub fn new() -> Self {
Self {
epoch: AtomicU64::new(0),
retired: Mutex::new(Vec::new()),
}
}
#[must_use]
pub fn guard(&self) -> QsbrGuard<'_> {
QsbrGuard {
domain: self,
epoch_taken: self.epoch.load(Ordering::Acquire),
}
}
#[must_use]
pub fn current_epoch(&self) -> u64 {
self.epoch.load(Ordering::Acquire)
}
pub fn advance_epoch(&self) {
self.epoch.fetch_add(1, Ordering::AcqRel);
}
pub fn retire<T: Send + 'static>(&self, obj: T) {
let epoch = self.epoch.load(Ordering::Acquire);
let dropper: Box<dyn FnOnce() + Send> = Box::new(move || {
drop(obj);
});
let mut queue = self.retired.lock();
queue.push(RetiredEntry {
epoch,
dropper: Some(dropper),
});
}
pub fn try_reclaim(&self) -> usize {
let current = self.epoch.load(Ordering::Acquire);
let mut to_drop: Vec<RetiredEntry> = Vec::new();
{
let mut queue = self.retired.lock();
let mut i = 0;
while i < queue.len() {
if queue[i].epoch < current {
to_drop.push(queue.swap_remove(i));
} else {
i += 1;
}
}
}
let reclaimed = to_drop.len();
for mut entry in to_drop {
if let Some(dropper) = entry.dropper.take() {
dropper();
}
}
reclaimed
}
#[must_use]
pub fn pending_retired_count(&self) -> usize {
self.retired.lock().len()
}
}
pub struct QsbrGuard<'a> {
domain: &'a QsbrDomain,
epoch_taken: u64,
}
impl QsbrGuard<'_> {
#[must_use]
pub fn epoch(&self) -> u64 {
self.epoch_taken
}
#[must_use]
pub fn domain(&self) -> &QsbrDomain {
self.domain
}
}
impl Drop for QsbrGuard<'_> {
fn drop(&mut self) {
}
}
#[cfg(test)]
mod tests {
use std::sync::Arc;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::thread;
use std::time::Duration;
use super::*;
struct DropSpy {
live: Arc<AtomicUsize>,
}
impl DropSpy {
fn new(live: Arc<AtomicUsize>) -> Self {
live.fetch_add(1, Ordering::SeqCst);
Self { live }
}
}
impl Drop for DropSpy {
fn drop(&mut self) {
self.live.fetch_sub(1, Ordering::SeqCst);
}
}
#[test]
fn retire_without_advance_does_not_drop() {
let domain = QsbrDomain::new();
let live = Arc::new(AtomicUsize::new(0));
domain.retire(DropSpy::new(live.clone()));
assert_eq!(live.load(Ordering::SeqCst), 1);
assert_eq!(domain.pending_retired_count(), 1);
let reclaimed = domain.try_reclaim();
assert_eq!(reclaimed, 0);
assert_eq!(live.load(Ordering::SeqCst), 1);
assert_eq!(domain.pending_retired_count(), 1);
}
#[test]
fn advance_epoch_then_reclaim_drops() {
let domain = QsbrDomain::new();
let live = Arc::new(AtomicUsize::new(0));
domain.retire(DropSpy::new(live.clone()));
domain.retire(DropSpy::new(live.clone()));
assert_eq!(live.load(Ordering::SeqCst), 2);
domain.advance_epoch();
let reclaimed = domain.try_reclaim();
assert_eq!(reclaimed, 2);
assert_eq!(live.load(Ordering::SeqCst), 0);
assert_eq!(domain.pending_retired_count(), 0);
domain.retire(DropSpy::new(live.clone()));
assert_eq!(live.load(Ordering::SeqCst), 1);
assert_eq!(domain.try_reclaim(), 0);
assert_eq!(live.load(Ordering::SeqCst), 1);
domain.advance_epoch();
assert_eq!(domain.try_reclaim(), 1);
assert_eq!(live.load(Ordering::SeqCst), 0);
}
#[test]
fn guard_records_current_epoch() {
let domain = QsbrDomain::new();
let g0 = domain.guard();
assert_eq!(g0.epoch(), 0);
drop(g0);
domain.advance_epoch();
let g1 = domain.guard();
assert_eq!(g1.epoch(), 1);
assert_eq!(g1.domain().current_epoch(), 1);
domain.advance_epoch();
assert_eq!(g1.epoch(), 1);
assert_eq!(domain.current_epoch(), 2);
}
#[test]
fn multithreaded_readers_and_retirer() {
let domain = Arc::new(QsbrDomain::new());
let live = Arc::new(AtomicUsize::new(0));
const N_RETIRED: usize = 32;
const N_READERS: usize = 4;
let readers_started = Arc::new(AtomicUsize::new(0));
let allow_readers_finish = Arc::new(AtomicUsize::new(0));
thread::scope(|s| {
let mut reader_handles = Vec::new();
for _ in 0..N_READERS {
let domain = domain.clone();
let readers_started = readers_started.clone();
let allow_readers_finish = allow_readers_finish.clone();
reader_handles.push(s.spawn(move || {
let guard = domain.guard();
let epoch_when_taken = guard.epoch();
readers_started.fetch_add(1, Ordering::SeqCst);
while allow_readers_finish.load(Ordering::SeqCst) == 0 {
thread::sleep(Duration::from_millis(1));
}
epoch_when_taken
}));
}
let retirer = {
let domain = domain.clone();
let live = live.clone();
let readers_started = readers_started.clone();
let allow_readers_finish = allow_readers_finish.clone();
s.spawn(move || {
while readers_started.load(Ordering::SeqCst) < N_READERS {
thread::sleep(Duration::from_millis(1));
}
for _ in 0..N_RETIRED {
domain.retire(DropSpy::new(live.clone()));
}
assert_eq!(live.load(Ordering::SeqCst), N_RETIRED);
let pre = domain.try_reclaim();
assert_eq!(pre, 0, "nothing should reclaim pre-advance");
assert_eq!(live.load(Ordering::SeqCst), N_RETIRED);
domain.advance_epoch();
allow_readers_finish.store(1, Ordering::SeqCst);
})
};
let reader_epochs: Vec<u64> = reader_handles
.into_iter()
.map(|h| h.join().expect("reader thread panicked"))
.collect();
retirer.join().expect("retirer thread panicked");
for e in &reader_epochs {
assert_eq!(*e, 0, "reader saw unexpected epoch {e}");
}
let reclaimed = domain.try_reclaim();
assert_eq!(reclaimed, N_RETIRED);
assert_eq!(live.load(Ordering::SeqCst), 0);
assert_eq!(domain.pending_retired_count(), 0);
});
}
#[test]
fn mixed_epoch_retirement_is_partial() {
let domain = QsbrDomain::new();
let live = Arc::new(AtomicUsize::new(0));
domain.retire(DropSpy::new(live.clone()));
domain.retire(DropSpy::new(live.clone()));
assert_eq!(live.load(Ordering::SeqCst), 2);
domain.advance_epoch();
domain.retire(DropSpy::new(live.clone()));
assert_eq!(live.load(Ordering::SeqCst), 3);
let reclaimed = domain.try_reclaim();
assert_eq!(reclaimed, 2);
assert_eq!(live.load(Ordering::SeqCst), 1);
assert_eq!(domain.pending_retired_count(), 1);
domain.advance_epoch(); let reclaimed2 = domain.try_reclaim();
assert_eq!(reclaimed2, 1);
assert_eq!(live.load(Ordering::SeqCst), 0);
}
}