use std::{
collections::HashMap,
sync::{
Arc,
atomic::{AtomicU64, Ordering},
},
time::{Duration, Instant},
};
use crossbeam_epoch::{self as epoch, Guard};
use fsqlite_types::sync_primitives::Mutex;
use serde::Serialize;
/// Process-global EBR metrics shared by every guard, ticket, and retire
/// queue in this module; counters are atomic, so no locking is needed.
pub static GLOBAL_EBR_METRICS: EbrMetrics = EbrMetrics::new();
/// Lock-free counters describing epoch-based-reclamation (EBR) activity.
///
/// All fields are `AtomicU64`s updated with relaxed ordering, so recording is
/// cheap from any thread; use [`EbrMetrics::snapshot`] for a point-in-time copy.
pub struct EbrMetrics {
    /// Objects handed off for deferred retirement.
    pub retirements_deferred_total: AtomicU64,
    /// Explicit flush calls on pinned guards.
    pub flush_calls_total: AtomicU64,
    /// Guard pin / ticket registration events.
    pub guards_pinned_total: AtomicU64,
    /// Guard / ticket drop events.
    pub guards_unpinned_total: AtomicU64,
    /// Stale-reader warnings emitted by the registry.
    pub stale_reader_warnings_total: AtomicU64,
    /// Highest concurrently-active guard count observed.
    pub active_guards_high_water: AtomicU64,
    /// Longest version-chain length sampled.
    pub max_chain_length_observed: AtomicU64,
    /// Number of chain-length samples recorded.
    pub chain_length_samples_total: AtomicU64,
    /// Running sum of sampled chain lengths (for averaging).
    pub chain_length_sum_total: AtomicU64,
    /// Versions reclaimed by GC drains.
    pub gc_freed_count: AtomicU64,
    /// GC attempts that could not proceed.
    pub gc_blocked_count: AtomicU64,
}
impl EbrMetrics {
    /// Creates a metrics block with every counter at zero; `const` so it can
    /// back a `static`.
    #[must_use]
    pub const fn new() -> Self {
        Self {
            retirements_deferred_total: AtomicU64::new(0),
            flush_calls_total: AtomicU64::new(0),
            guards_pinned_total: AtomicU64::new(0),
            guards_unpinned_total: AtomicU64::new(0),
            stale_reader_warnings_total: AtomicU64::new(0),
            active_guards_high_water: AtomicU64::new(0),
            max_chain_length_observed: AtomicU64::new(0),
            chain_length_samples_total: AtomicU64::new(0),
            chain_length_sum_total: AtomicU64::new(0),
            gc_freed_count: AtomicU64::new(0),
            gc_blocked_count: AtomicU64::new(0),
        }
    }

    /// Counts one object handed to deferred retirement.
    pub fn record_retirement_deferred(&self) {
        self.retirements_deferred_total.fetch_add(1, Ordering::Relaxed);
    }

    /// Counts one explicit guard flush.
    pub fn record_flush(&self) {
        self.flush_calls_total.fetch_add(1, Ordering::Relaxed);
    }

    /// Counts one pin event and raises the high-water mark to
    /// `current_active` if it exceeds the previous maximum.
    pub fn record_guard_pinned(&self, current_active: u64) {
        self.guards_pinned_total.fetch_add(1, Ordering::Relaxed);
        self.active_guards_high_water
            .fetch_max(current_active, Ordering::Relaxed);
    }

    /// Counts one unpin event.
    pub fn record_guard_unpinned(&self) {
        self.guards_unpinned_total.fetch_add(1, Ordering::Relaxed);
    }

    /// Adds `count` stale-reader warnings in one shot.
    pub fn record_stale_warnings(&self, count: u64) {
        self.stale_reader_warnings_total
            .fetch_add(count, Ordering::Relaxed);
    }

    /// Folds one chain-length observation into the sample count, running
    /// sum, and observed maximum.
    pub fn record_chain_length_sample(&self, chain_len: u64) {
        self.chain_length_samples_total
            .fetch_add(1, Ordering::Relaxed);
        self.chain_length_sum_total
            .fetch_add(chain_len, Ordering::Relaxed);
        self.max_chain_length_observed
            .fetch_max(chain_len, Ordering::Relaxed);
    }

    /// Adds `count` reclaimed versions to the GC-freed counter.
    pub fn record_gc_freed(&self, count: u64) {
        self.gc_freed_count.fetch_add(count, Ordering::Relaxed);
    }

    /// Counts one GC attempt that could not proceed.
    pub fn record_gc_blocked(&self) {
        self.gc_blocked_count.fetch_add(1, Ordering::Relaxed);
    }

    /// Takes a relaxed, point-in-time copy of every counter. Loads are not
    /// mutually atomic, so the snapshot may straddle concurrent updates.
    #[must_use]
    pub fn snapshot(&self) -> EbrMetricsSnapshot {
        let read = |counter: &AtomicU64| counter.load(Ordering::Relaxed);
        EbrMetricsSnapshot {
            retirements_deferred_total: read(&self.retirements_deferred_total),
            flush_calls_total: read(&self.flush_calls_total),
            guards_pinned_total: read(&self.guards_pinned_total),
            guards_unpinned_total: read(&self.guards_unpinned_total),
            stale_reader_warnings_total: read(&self.stale_reader_warnings_total),
            active_guards_high_water: read(&self.active_guards_high_water),
            max_chain_length_observed: read(&self.max_chain_length_observed),
            chain_length_samples_total: read(&self.chain_length_samples_total),
            chain_length_sum_total: read(&self.chain_length_sum_total),
            gc_freed_count: read(&self.gc_freed_count),
            gc_blocked_count: read(&self.gc_blocked_count),
        }
    }

    /// Zeroes every counter (intended for tests/diagnostics).
    pub fn reset(&self) {
        let counters: [&AtomicU64; 11] = [
            &self.retirements_deferred_total,
            &self.flush_calls_total,
            &self.guards_pinned_total,
            &self.guards_unpinned_total,
            &self.stale_reader_warnings_total,
            &self.active_guards_high_water,
            &self.max_chain_length_observed,
            &self.chain_length_samples_total,
            &self.chain_length_sum_total,
            &self.gc_freed_count,
            &self.gc_blocked_count,
        ];
        for counter in counters {
            counter.store(0, Ordering::Relaxed);
        }
    }
}
impl Default for EbrMetrics {
fn default() -> Self {
Self::new()
}
}
/// Plain-value copy of [`EbrMetrics`] taken at one instant; serializable for
/// diagnostics output.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize)]
pub struct EbrMetricsSnapshot {
    /// Objects handed off for deferred retirement.
    pub retirements_deferred_total: u64,
    /// Explicit flush calls on pinned guards.
    pub flush_calls_total: u64,
    /// Guard pin / ticket registration events.
    pub guards_pinned_total: u64,
    /// Guard / ticket drop events.
    pub guards_unpinned_total: u64,
    /// Stale-reader warnings emitted.
    pub stale_reader_warnings_total: u64,
    /// Highest concurrently-active guard count observed.
    pub active_guards_high_water: u64,
    /// Longest version-chain length sampled.
    pub max_chain_length_observed: u64,
    /// Number of chain-length samples recorded.
    pub chain_length_samples_total: u64,
    /// Running sum of sampled chain lengths.
    pub chain_length_sum_total: u64,
    /// Versions reclaimed by GC drains.
    pub gc_freed_count: u64,
    /// GC attempts that could not proceed.
    pub gc_blocked_count: u64,
}
impl EbrMetricsSnapshot {
    /// Mean observed version-chain length, or `0.0` when no samples exist.
    #[must_use]
    #[allow(clippy::cast_precision_loss)]
    pub fn avg_chain_length(self) -> f64 {
        match self.chain_length_samples_total {
            0 => 0.0,
            samples => self.chain_length_sum_total as f64 / samples as f64,
        }
    }
}
impl std::fmt::Display for EbrMetricsSnapshot {
    /// Renders a compact single-line summary for log output.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        write!(
            f,
            "ebr(retired={retired} flushed={flushed} pinned={pinned} unpinned={unpinned} stale_warn={stale_warn} hw={hw} chain_max={chain_max} chain_avg={chain_avg:.2} gc_freed={gc_freed} gc_blocked={gc_blocked})",
            retired = self.retirements_deferred_total,
            flushed = self.flush_calls_total,
            pinned = self.guards_pinned_total,
            unpinned = self.guards_unpinned_total,
            stale_warn = self.stale_reader_warnings_total,
            hw = self.active_guards_high_water,
            chain_max = self.max_chain_length_observed,
            chain_avg = self.avg_chain_length(),
            gc_freed = self.gc_freed_count,
            gc_blocked = self.gc_blocked_count,
        )
    }
}
/// Policy controlling when and how often long-lived reader pins are reported.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct StaleReaderConfig {
    /// Pin age after which a reader is considered stale.
    pub warn_after: Duration,
    /// Minimum interval between repeated warnings for the same reader.
    pub warn_every: Duration,
}
impl Default for StaleReaderConfig {
    /// Default policy: flag readers pinned longer than 30 s, then repeat the
    /// warning at most every 5 s.
    fn default() -> Self {
        let warn_after = Duration::from_secs(30);
        let warn_every = Duration::from_secs(5);
        Self {
            warn_after,
            warn_every,
        }
    }
}
/// Read-only view of one active reader pin, as reported by
/// `VersionGuardRegistry::stale_reader_snapshots`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct ReaderPinSnapshot {
    /// Registry-assigned id of the pinned guard.
    pub guard_id: u64,
    /// How long the guard had been pinned at snapshot time.
    pub pinned_for: Duration,
}
/// Internal per-guard bookkeeping kept in the registry's `active` map.
#[derive(Debug, Clone, Copy)]
struct ReaderPinState {
    // When the guard was pinned.
    pinned_at: Instant,
    // Last time a stale warning was emitted for this guard (rate limiting).
    last_warned_at: Option<Instant>,
}
/// Tracks every active reader pin so stale readers (which block epoch
/// advancement and thus reclamation) can be detected and reported.
#[derive(Debug)]
pub struct VersionGuardRegistry {
    // Stale-reader warning policy for this registry.
    stale_reader: StaleReaderConfig,
    // Monotonic id source; starts at 1 so 0 is never a valid guard id.
    next_guard_id: AtomicU64,
    // Active pins keyed by guard id.
    active: Mutex<HashMap<u64, ReaderPinState>>,
}
impl VersionGuardRegistry {
    /// Builds an empty registry with the given stale-reader policy.
    #[must_use]
    pub fn new(stale_reader: StaleReaderConfig) -> Self {
        Self {
            stale_reader,
            next_guard_id: AtomicU64::new(1),
            active: Mutex::new(HashMap::new()),
        }
    }

    /// The stale-reader policy this registry was built with.
    #[must_use]
    pub const fn stale_reader_config(&self) -> StaleReaderConfig {
        self.stale_reader
    }

    /// Number of guards currently registered.
    #[must_use]
    pub fn active_guard_count(&self) -> usize {
        self.active.lock().len()
    }

    /// Lists every guard whose pin age at `now` has reached `warn_after`.
    #[must_use]
    pub fn stale_reader_snapshots(&self, now: Instant) -> Vec<ReaderPinSnapshot> {
        let threshold = self.stale_reader.warn_after;
        let active = self.active.lock();
        let mut stale = Vec::new();
        for (&guard_id, state) in active.iter() {
            let pinned_for = now.saturating_duration_since(state.pinned_at);
            if pinned_for >= threshold {
                stale.push(ReaderPinSnapshot {
                    guard_id,
                    pinned_for,
                });
            }
        }
        stale
    }

    /// Emits a rate-limited warning for each stale reader and returns how
    /// many warnings were produced on this call.
    pub fn warn_on_stale_readers(&self, now: Instant) -> usize {
        let mut emitted = 0_usize;
        {
            let mut active = self.active.lock();
            for (&guard_id, state) in active.iter_mut() {
                let pinned_for = now.saturating_duration_since(state.pinned_at);
                if pinned_for < self.stale_reader.warn_after {
                    continue;
                }
                // Rate limit: warn at most once per `warn_every` per guard.
                let due = match state.last_warned_at {
                    None => true,
                    Some(last) => {
                        now.saturating_duration_since(last) >= self.stale_reader.warn_every
                    }
                };
                if due {
                    tracing::warn!(
                        guard_id,
                        pinned_for_ms = pinned_for.as_millis(),
                        stale_warn_after_ms = self.stale_reader.warn_after.as_millis(),
                        "stale MVCC reader pin is blocking epoch advancement"
                    );
                    state.last_warned_at = Some(now);
                    emitted += 1;
                }
            }
        }
        if emitted > 0 {
            GLOBAL_EBR_METRICS.record_stale_warnings(emitted as u64);
        }
        emitted
    }

    /// Allocates a fresh guard id and records the pin start time.
    fn register_guard(&self, pinned_at: Instant) -> u64 {
        let guard_id = self.next_guard_id.fetch_add(1, Ordering::Relaxed);
        let state = ReaderPinState {
            pinned_at,
            last_warned_at: None,
        };
        self.active.lock().insert(guard_id, state);
        guard_id
    }

    /// Removes a guard and returns how long it was pinned, if it was known.
    fn unregister_guard(&self, guard_id: u64) -> Option<Duration> {
        let removed = self.active.lock().remove(&guard_id);
        removed.map(|state| state.pinned_at.elapsed())
    }
}
impl Default for VersionGuardRegistry {
fn default() -> Self {
Self::new(StaleReaderConfig::default())
}
}
/// RAII guard that pins the current thread's epoch for the duration of an
/// MVCC read and registers itself for stale-reader tracking; unpins and
/// unregisters on drop.
#[derive(Debug)]
pub struct VersionGuard {
    // Registry this guard reported to; used again on drop.
    registry: Arc<VersionGuardRegistry>,
    // Registry-assigned unique id.
    guard_id: u64,
    // When the pin began, for duration reporting.
    pinned_at: Instant,
    // The underlying crossbeam-epoch pin.
    guard: Guard,
}
impl VersionGuard {
    /// Pins the calling thread's epoch and registers the pin with `registry`.
    #[must_use]
    pub fn pin(registry: Arc<VersionGuardRegistry>) -> Self {
        let pinned_at = Instant::now();
        let guard_id = registry.register_guard(pinned_at);
        let epoch_guard = epoch::pin();
        let active_guards = registry.active_guard_count() as u64;
        GLOBAL_EBR_METRICS.record_guard_pinned(active_guards);
        tracing::trace!(
            target: "fsqlite_mvcc::ebr",
            guard_id,
            active_guards,
            "epoch guard pinned"
        );
        Self {
            registry,
            guard_id,
            pinned_at,
            guard: epoch_guard,
        }
    }

    /// Registry-assigned id of this guard.
    #[must_use]
    pub const fn guard_id(&self) -> u64 {
        self.guard_id
    }

    /// How long this guard has been pinned.
    #[must_use]
    pub fn pinned_for(&self) -> Duration {
        self.pinned_at.elapsed()
    }

    /// Schedules `retired` to be dropped once no pinned guard can still
    /// observe it.
    pub fn defer_retire<T: Send + 'static>(&self, retired: T) {
        GLOBAL_EBR_METRICS.record_retirement_deferred();
        self.guard.defer(move || drop(retired));
    }

    /// Schedules an arbitrary reclamation closure to run once no pinned
    /// guard can still observe the retired data.
    pub fn defer_retire_with<F, R>(&self, retire: F)
    where
        F: FnOnce() -> R + Send + 'static,
    {
        GLOBAL_EBR_METRICS.record_retirement_deferred();
        self.guard.defer(retire);
    }

    /// Pushes this guard's deferred functions toward global execution.
    pub fn flush(&self) {
        GLOBAL_EBR_METRICS.record_flush();
        self.guard.flush();
    }
}
impl Drop for VersionGuard {
    /// Unregisters the pin, records the unpin, and warns if the pin outlived
    /// the stale-reader threshold.
    fn drop(&mut self) {
        GLOBAL_EBR_METRICS.record_guard_unpinned();
        // Fall back to our own clock if the registry no longer knows this id.
        let pinned_for = match self.registry.unregister_guard(self.guard_id) {
            Some(duration) => duration,
            None => self.pinned_at.elapsed(),
        };
        tracing::trace!(
            target: "fsqlite_mvcc::ebr",
            guard_id = self.guard_id,
            pinned_for_us = pinned_for.as_micros(),
            "epoch guard unpinned"
        );
        let threshold = self.registry.stale_reader_config().warn_after;
        if pinned_for >= threshold {
            tracing::warn!(
                guard_id = self.guard_id,
                pinned_for_ms = pinned_for.as_millis(),
                stale_warn_after_ms = threshold.as_millis(),
                "MVCC reader pin ended after stale threshold"
            );
        }
    }
}
/// Registry-tracked handle like [`VersionGuard`] but without holding a
/// long-lived epoch pin; each deferred retirement pins briefly instead.
#[derive(Debug)]
pub struct VersionGuardTicket {
    // Registry this ticket reported to; used again on drop.
    registry: Arc<VersionGuardRegistry>,
    // Registry-assigned unique id.
    guard_id: u64,
    // When the ticket was registered, for duration reporting.
    pinned_at: Instant,
}
impl VersionGuardTicket {
    /// Registers a reader with `registry` without pinning an epoch guard for
    /// the ticket's whole lifetime.
    #[must_use]
    pub fn register(registry: Arc<VersionGuardRegistry>) -> Self {
        let pinned_at = Instant::now();
        let guard_id = registry.register_guard(pinned_at);
        let active_guards = registry.active_guard_count() as u64;
        GLOBAL_EBR_METRICS.record_guard_pinned(active_guards);
        tracing::trace!(
            target: "fsqlite_mvcc::ebr",
            guard_id,
            active_guards,
            "epoch ticket registered"
        );
        Self {
            registry,
            guard_id,
            pinned_at,
        }
    }

    /// Registry-assigned id of this ticket.
    #[must_use]
    pub const fn guard_id(&self) -> u64 {
        self.guard_id
    }

    /// How long this ticket has been registered.
    #[must_use]
    pub fn registered_for(&self) -> Duration {
        self.pinned_at.elapsed()
    }

    /// The registry this ticket is registered with.
    #[must_use]
    pub fn registry(&self) -> &Arc<VersionGuardRegistry> {
        &self.registry
    }

    /// Schedules `retired` for drop via a short-lived pin, flushing so the
    /// deferral is published even if this thread exits soon after.
    pub fn defer_retire<T: Send + 'static>(&self, retired: T) {
        GLOBAL_EBR_METRICS.record_retirement_deferred();
        let pin = epoch::pin();
        pin.defer(move || drop(retired));
        pin.flush();
    }

    /// Schedules an arbitrary reclamation closure via a short-lived pin,
    /// flushing so the deferral is published even on thread exit.
    pub fn defer_retire_with<F, R>(&self, retire: F)
    where
        F: FnOnce() -> R + Send + 'static,
    {
        GLOBAL_EBR_METRICS.record_retirement_deferred();
        let pin = epoch::pin();
        pin.defer(retire);
        pin.flush();
    }
}
impl Drop for VersionGuardTicket {
    /// Unregisters the ticket, records the unpin, and warns if it outlived
    /// the stale-reader threshold.
    fn drop(&mut self) {
        GLOBAL_EBR_METRICS.record_guard_unpinned();
        // Fall back to our own clock if the registry no longer knows this id.
        let pinned_for = match self.registry.unregister_guard(self.guard_id) {
            Some(duration) => duration,
            None => self.pinned_at.elapsed(),
        };
        tracing::trace!(
            target: "fsqlite_mvcc::ebr",
            guard_id = self.guard_id,
            registered_for_us = pinned_for.as_micros(),
            "epoch ticket unregistered"
        );
        let threshold = self.registry.stale_reader_config().warn_after;
        if pinned_for >= threshold {
            tracing::warn!(
                guard_id = self.guard_id,
                pinned_for_ms = pinned_for.as_millis(),
                stale_warn_after_ms = threshold.as_millis(),
                "MVCC reader registration ended after stale threshold"
            );
        }
    }
}
use crate::core_types::VersionIdx;
/// Epoch-stamped queue of retired version slots awaiting recycling; slots are
/// only drained once enough epochs have elapsed since the oldest retirement.
#[derive(Debug)]
pub struct EbrRetireQueue {
    // Slots retired but not yet recycled.
    pending: Mutex<Vec<VersionIdx>>,
    // Epoch at which the oldest entry in `pending` was retired.
    oldest_retire_epoch: AtomicU64,
    // Lifetime count of retired slots.
    total_retired: AtomicU64,
    // Lifetime count of recycled (drained) slots.
    total_recycled: AtomicU64,
}
impl EbrRetireQueue {
    /// Creates an empty queue with all counters zeroed.
    #[must_use]
    pub fn new() -> Self {
        Self {
            pending: Mutex::new(Vec::new()),
            oldest_retire_epoch: AtomicU64::new(0),
            total_retired: AtomicU64::new(0),
            total_recycled: AtomicU64::new(0),
        }
    }

    /// Retires one slot at `current_epoch`.
    ///
    /// If the queue was empty, `current_epoch` becomes the oldest-retirement
    /// watermark that `drain_if_safe` ages against.
    pub fn retire(&self, idx: VersionIdx, current_epoch: u64) {
        let mut pending = self.pending.lock();
        if pending.is_empty() {
            self.oldest_retire_epoch
                .store(current_epoch, Ordering::Relaxed);
        }
        pending.push(idx);
        drop(pending);
        self.total_retired.fetch_add(1, Ordering::Relaxed);
        GLOBAL_EBR_METRICS.record_retirement_deferred();
    }

    /// Retires a batch of slots at `current_epoch` under a single lock
    /// acquisition.
    pub fn retire_batch(&self, indices: impl IntoIterator<Item = VersionIdx>, current_epoch: u64) {
        let mut pending = self.pending.lock();
        let was_empty = pending.is_empty();
        let mut count = 0_u64;
        for idx in indices {
            pending.push(idx);
            count += 1;
        }
        if was_empty && count > 0 {
            self.oldest_retire_epoch
                .store(current_epoch, Ordering::Relaxed);
        }
        drop(pending);
        self.total_retired.fetch_add(count, Ordering::Relaxed);
        // Single batched counter bump instead of one `fetch_add(1)` per item.
        GLOBAL_EBR_METRICS
            .retirements_deferred_total
            .fetch_add(count, Ordering::Relaxed);
    }

    /// Drains and returns every pending slot if at least `min_epoch_gap`
    /// epochs have passed since the oldest retirement; otherwise returns an
    /// empty vec and records the blocked GC attempt.
    #[must_use]
    pub fn drain_if_safe(&self, current_epoch: u64, min_epoch_gap: u64) -> Vec<VersionIdx> {
        let mut pending = self.pending.lock();
        if pending.is_empty() {
            return Vec::new();
        }
        // Read the watermark *while holding the lock*. Loading it before
        // acquiring `pending` (as this method previously did) raced with
        // `retire`: a retirement landing between the load and the lock could
        // be drained immediately, before aging `min_epoch_gap` epochs.
        let oldest = self.oldest_retire_epoch.load(Ordering::Relaxed);
        if current_epoch.saturating_sub(oldest) < min_epoch_gap {
            GLOBAL_EBR_METRICS.record_gc_blocked();
            return Vec::new();
        }
        let drained = std::mem::take(&mut *pending);
        drop(pending);
        // `drained` is non-empty here because `pending` was non-empty above.
        let count = drained.len() as u64;
        self.total_recycled.fetch_add(count, Ordering::Relaxed);
        GLOBAL_EBR_METRICS.record_gc_freed(count);
        drained
    }

    /// Drains every pending slot unconditionally (e.g. at shutdown).
    #[must_use]
    pub fn force_drain(&self) -> Vec<VersionIdx> {
        let mut pending = self.pending.lock();
        let drained = std::mem::take(&mut *pending);
        drop(pending);
        let count = drained.len() as u64;
        if count > 0 {
            self.oldest_retire_epoch.store(0, Ordering::Relaxed);
            self.total_recycled.fetch_add(count, Ordering::Relaxed);
            GLOBAL_EBR_METRICS.record_gc_freed(count);
        }
        drained
    }

    /// Number of slots currently awaiting recycling.
    #[must_use]
    pub fn pending_count(&self) -> usize {
        self.pending.lock().len()
    }

    /// Lifetime count of slots ever retired into this queue.
    #[must_use]
    pub fn total_retired(&self) -> u64 {
        self.total_retired.load(Ordering::Relaxed)
    }

    /// Lifetime count of slots drained back out for reuse.
    #[must_use]
    pub fn total_recycled(&self) -> u64 {
        self.total_recycled.load(Ordering::Relaxed)
    }
}
impl Default for EbrRetireQueue {
fn default() -> Self {
Self::new()
}
}
#[cfg(test)]
mod tests {
    // NOTE: every `Arc::clone(&registry)` below was mojibake'd to
    // `Arc::clone(®istry)` (HTML entity `&reg` -> "registered" sign) in the
    // previous revision, which did not compile; restored here.
    use std::{
        sync::{
            Arc,
            atomic::{AtomicUsize, Ordering},
        },
        thread,
        time::{Duration, Instant},
    };
    use crossbeam_epoch as epoch;
    use proptest::{prelude::*, test_runner::Config as ProptestConfig};
    use super::{
        EbrMetrics, EbrRetireQueue, GLOBAL_EBR_METRICS, StaleReaderConfig, VersionGuard,
        VersionGuardRegistry, VersionGuardTicket, VersionIdx,
    };

    #[test]
    fn version_guard_registers_and_unregisters() {
        let registry = Arc::new(VersionGuardRegistry::new(StaleReaderConfig {
            warn_after: Duration::from_secs(60),
            warn_every: Duration::from_secs(10),
        }));
        assert_eq!(registry.active_guard_count(), 0);
        {
            let guard = VersionGuard::pin(Arc::clone(&registry));
            assert_eq!(registry.active_guard_count(), 1);
            assert!(guard.pinned_for() < Duration::from_secs(1));
        }
        assert_eq!(registry.active_guard_count(), 0);
    }

    #[test]
    fn nested_version_guards_pin_and_unpin_independently() {
        let registry = Arc::new(VersionGuardRegistry::default());
        let before = GLOBAL_EBR_METRICS.snapshot();
        {
            let outer = VersionGuard::pin(Arc::clone(&registry));
            assert_eq!(registry.active_guard_count(), 1);
            assert!(outer.pinned_for() < Duration::from_secs(1));
            {
                let inner = VersionGuard::pin(Arc::clone(&registry));
                assert_eq!(registry.active_guard_count(), 2);
                assert!(inner.pinned_for() < Duration::from_secs(1));
            }
            assert_eq!(
                registry.active_guard_count(),
                1,
                "dropping inner guard must keep outer guard active"
            );
        }
        assert_eq!(registry.active_guard_count(), 0);
        let after = GLOBAL_EBR_METRICS.snapshot();
        // Metrics are process-global, so use >= rather than exact deltas.
        assert!(
            after.guards_pinned_total >= before.guards_pinned_total + 2,
            "nested guards should register two pin events"
        );
        assert!(
            after.guards_unpinned_total >= before.guards_unpinned_total + 2,
            "nested guards should register two unpin events"
        );
    }

    #[test]
    fn stale_reader_snapshots_report_long_pins() {
        let registry = Arc::new(VersionGuardRegistry::new(StaleReaderConfig {
            warn_after: Duration::from_millis(5),
            warn_every: Duration::from_millis(5),
        }));
        let _guard = VersionGuard::pin(Arc::clone(&registry));
        thread::sleep(Duration::from_millis(10));
        let stale = registry.stale_reader_snapshots(Instant::now());
        assert_eq!(stale.len(), 1);
        assert!(stale[0].pinned_for >= Duration::from_millis(5));
    }

    #[test]
    fn stale_reader_warning_is_rate_limited() {
        let registry = Arc::new(VersionGuardRegistry::new(StaleReaderConfig {
            warn_after: Duration::ZERO,
            warn_every: Duration::from_millis(5),
        }));
        let _guard = VersionGuard::pin(Arc::clone(&registry));
        let base = Instant::now();
        assert_eq!(registry.warn_on_stale_readers(base), 1);
        // Within `warn_every` of the first warning: suppressed.
        assert_eq!(
            registry.warn_on_stale_readers(base + Duration::from_millis(1)),
            0
        );
        // Past `warn_every`: warned again.
        assert_eq!(
            registry.warn_on_stale_readers(base + Duration::from_millis(6)),
            1
        );
    }

    /// Counts drops so deferred reclamation can be observed.
    #[derive(Clone)]
    struct DropCounter(Arc<AtomicUsize>);
    impl Drop for DropCounter {
        fn drop(&mut self) {
            self.0.fetch_add(1, Ordering::SeqCst);
        }
    }

    #[test]
    fn deferred_retirement_executes_after_unpin() {
        let registry = Arc::new(VersionGuardRegistry::default());
        let dropped = Arc::new(AtomicUsize::new(0));
        {
            let guard = VersionGuard::pin(Arc::clone(&registry));
            guard.defer_retire(DropCounter(Arc::clone(&dropped)));
            guard.flush();
            // Must not be reclaimed while the guard is still pinned.
            assert_eq!(dropped.load(Ordering::SeqCst), 0);
        }
        // Reclamation is asynchronous; pin+flush repeatedly until it lands.
        let deadline = Instant::now() + Duration::from_secs(2);
        while dropped.load(Ordering::SeqCst) < 1 && Instant::now() < deadline {
            let flush_guard = epoch::pin();
            flush_guard.flush();
            thread::yield_now();
            thread::sleep(Duration::from_micros(50));
        }
        assert_eq!(
            dropped.load(Ordering::SeqCst),
            1,
            "deferred retirement should reclaim after guard drop"
        );
    }

    proptest! {
        #![proptest_config(ProptestConfig {
            cases: 2_500,
            .. ProptestConfig::default()
        })]
        #[test]
        fn prop_deferred_retire_respects_pin_lifetime_and_eventually_reclaims(
            deferred_count in 1_u8..33,
        ) {
            let registry = Arc::new(VersionGuardRegistry::default());
            let dropped = Arc::new(AtomicUsize::new(0));
            let expected = usize::from(deferred_count);
            {
                let guard = VersionGuard::pin(Arc::clone(&registry));
                for _ in 0..expected {
                    guard.defer_retire(DropCounter(Arc::clone(&dropped)));
                }
                guard.flush();
                prop_assert_eq!(dropped.load(Ordering::SeqCst), 0);
            }
            let deadline = Instant::now() + Duration::from_secs(2);
            while dropped.load(Ordering::SeqCst) < expected && Instant::now() < deadline {
                let flush_guard = epoch::pin();
                flush_guard.flush();
                thread::yield_now();
                thread::sleep(Duration::from_micros(50));
            }
            prop_assert_eq!(dropped.load(Ordering::SeqCst), expected);
        }
    }

    proptest! {
        #![proptest_config(ProptestConfig {
            cases: 1_000,
            .. ProptestConfig::default()
        })]
        #[test]
        fn prop_thread_termination_does_not_lose_deferred_retirements(
            deferred_count in 1_u8..17,
        ) {
            let registry = Arc::new(VersionGuardRegistry::default());
            let dropped = Arc::new(AtomicUsize::new(0));
            let expected = usize::from(deferred_count);
            let worker_registry = Arc::clone(&registry);
            let worker_dropped = Arc::clone(&dropped);
            let worker = thread::spawn(move || {
                let ticket = VersionGuardTicket::register(worker_registry);
                for _ in 0..expected {
                    ticket.defer_retire(DropCounter(Arc::clone(&worker_dropped)));
                }
            });
            worker.join().expect("worker thread must not panic");
            prop_assert_eq!(registry.active_guard_count(), 0);
            let deadline = Instant::now() + Duration::from_secs(2);
            while dropped.load(Ordering::SeqCst) < expected && Instant::now() < deadline {
                let flush_guard = epoch::pin();
                flush_guard.flush();
                thread::yield_now();
                thread::sleep(Duration::from_micros(50));
            }
            prop_assert_eq!(dropped.load(Ordering::SeqCst), expected);
        }
    }

    #[test]
    fn ebr_metrics_basic_recording() {
        let m = EbrMetrics::new();
        m.record_retirement_deferred();
        m.record_retirement_deferred();
        m.record_flush();
        m.record_guard_pinned(1);
        m.record_guard_unpinned();
        m.record_stale_warnings(2);
        m.record_chain_length_sample(5);
        m.record_chain_length_sample(9);
        m.record_gc_freed(7);
        m.record_gc_blocked();
        let snap = m.snapshot();
        assert_eq!(snap.retirements_deferred_total, 2);
        assert_eq!(snap.flush_calls_total, 1);
        assert_eq!(snap.guards_pinned_total, 1);
        assert_eq!(snap.guards_unpinned_total, 1);
        assert_eq!(snap.stale_reader_warnings_total, 2);
        assert_eq!(snap.active_guards_high_water, 1);
        assert_eq!(snap.max_chain_length_observed, 9);
        assert_eq!(snap.chain_length_samples_total, 2);
        assert_eq!(snap.chain_length_sum_total, 14);
        assert_eq!(snap.gc_freed_count, 7);
        assert_eq!(snap.gc_blocked_count, 1);
        assert!((snap.avg_chain_length() - 7.0).abs() < f64::EPSILON);
    }

    #[test]
    fn ebr_metrics_reset() {
        let m = EbrMetrics::new();
        m.record_retirement_deferred();
        m.record_guard_pinned(5);
        m.record_chain_length_sample(12);
        m.record_gc_freed(3);
        m.record_gc_blocked();
        assert!(m.retirements_deferred_total.load(Ordering::Relaxed) > 0);
        m.reset();
        let snap = m.snapshot();
        assert_eq!(snap.retirements_deferred_total, 0);
        assert_eq!(snap.guards_pinned_total, 0);
        assert_eq!(snap.active_guards_high_water, 0);
        assert_eq!(snap.max_chain_length_observed, 0);
        assert_eq!(snap.chain_length_samples_total, 0);
        assert_eq!(snap.chain_length_sum_total, 0);
        assert_eq!(snap.gc_freed_count, 0);
        assert_eq!(snap.gc_blocked_count, 0);
    }

    #[test]
    fn ebr_metrics_high_water_mark_monotonic() {
        let m = EbrMetrics::new();
        m.record_guard_pinned(3);
        assert_eq!(m.snapshot().active_guards_high_water, 3);
        // A lower sample must not lower the high-water mark.
        m.record_guard_pinned(1);
        assert_eq!(m.snapshot().active_guards_high_water, 3);
        m.record_guard_pinned(7);
        assert_eq!(m.snapshot().active_guards_high_water, 7);
    }

    #[test]
    fn ebr_metrics_display() {
        let m = EbrMetrics::new();
        m.record_retirement_deferred();
        m.record_flush();
        m.record_guard_pinned(1);
        m.record_chain_length_sample(8);
        let display = format!("{}", m.snapshot());
        assert!(display.contains("retired=1"));
        assert!(display.contains("flushed=1"));
        assert!(display.contains("pinned=1"));
        assert!(display.contains("chain_max=8"));
    }

    #[test]
    fn ebr_metrics_snapshot_serializable() {
        let m = EbrMetrics::new();
        m.record_retirement_deferred();
        m.record_guard_pinned(2);
        m.record_chain_length_sample(4);
        let snap = m.snapshot();
        let json = serde_json::to_string(&snap).unwrap();
        assert!(json.contains("\"retirements_deferred_total\":1"));
        assert!(json.contains("\"active_guards_high_water\":2"));
        assert!(json.contains("\"max_chain_length_observed\":4"));
    }

    #[test]
    fn ebr_metrics_guard_lifecycle_records() {
        let registry = Arc::new(VersionGuardRegistry::default());
        let before = GLOBAL_EBR_METRICS.snapshot();
        {
            let guard = VersionGuard::pin(Arc::clone(&registry));
            let after_pin = GLOBAL_EBR_METRICS.snapshot();
            assert!(
                after_pin.guards_pinned_total - before.guards_pinned_total >= 1,
                "expected at least 1 pin"
            );
            guard.defer_retire(42_u64);
            let after_retire = GLOBAL_EBR_METRICS.snapshot();
            assert!(
                after_retire.retirements_deferred_total - before.retirements_deferred_total >= 1,
                "expected at least 1 retirement"
            );
            guard.flush();
            let after_flush = GLOBAL_EBR_METRICS.snapshot();
            assert!(
                after_flush.flush_calls_total - before.flush_calls_total >= 1,
                "expected at least 1 flush"
            );
        }
        let after_drop = GLOBAL_EBR_METRICS.snapshot();
        assert!(
            after_drop.guards_unpinned_total - before.guards_unpinned_total >= 1,
            "expected at least 1 unpin"
        );
    }

    #[test]
    fn ebr_metrics_ticket_lifecycle_records() {
        let registry = Arc::new(VersionGuardRegistry::default());
        let before = GLOBAL_EBR_METRICS.snapshot();
        {
            let ticket = VersionGuardTicket::register(Arc::clone(&registry));
            let after_reg = GLOBAL_EBR_METRICS.snapshot();
            assert!(
                after_reg.guards_pinned_total > before.guards_pinned_total,
                "ticket registration should record at least one pin event"
            );
            ticket.defer_retire(99_u32);
            let after_retire = GLOBAL_EBR_METRICS.snapshot();
            assert!(
                after_retire.retirements_deferred_total > before.retirements_deferred_total,
                "ticket defer_retire should record at least one retirement"
            );
        }
        let after_drop = GLOBAL_EBR_METRICS.snapshot();
        assert!(
            after_drop.guards_unpinned_total > before.guards_unpinned_total,
            "ticket drop should record at least one unpin event"
        );
    }

    #[test]
    fn ebr_metrics_stale_warning_records() {
        let before = GLOBAL_EBR_METRICS.snapshot();
        let registry = Arc::new(VersionGuardRegistry::new(StaleReaderConfig {
            warn_after: Duration::ZERO,
            warn_every: Duration::ZERO,
        }));
        let _guard = VersionGuard::pin(Arc::clone(&registry));
        let warned = registry.warn_on_stale_readers(Instant::now());
        assert!(warned > 0);
        let after = GLOBAL_EBR_METRICS.snapshot();
        assert!(
            after.stale_reader_warnings_total > before.stale_reader_warnings_total,
            "stale warnings should have been recorded"
        );
    }

    #[test]
    fn ebr_retire_queue_retire_and_drain() {
        let queue = EbrRetireQueue::new();
        assert_eq!(queue.pending_count(), 0);
        assert_eq!(queue.total_retired(), 0);
        assert_eq!(queue.total_recycled(), 0);
        let idx1 = VersionIdx::new(0, 1, 0);
        let idx2 = VersionIdx::new(0, 2, 0);
        let idx3 = VersionIdx::new(0, 3, 0);
        queue.retire(idx1, 0);
        queue.retire(idx2, 0);
        queue.retire(idx3, 0);
        assert_eq!(queue.pending_count(), 3);
        assert_eq!(queue.total_retired(), 3);
        let drained = queue.drain_if_safe(1, 2);
        assert!(drained.is_empty(), "should not drain at epoch 1");
        assert_eq!(queue.pending_count(), 3);
        let drained = queue.drain_if_safe(2, 2);
        assert_eq!(drained.len(), 3);
        assert_eq!(queue.pending_count(), 0);
        assert_eq!(queue.total_recycled(), 3);
        let drained = queue.drain_if_safe(10, 2);
        assert!(drained.is_empty());
    }

    #[test]
    fn ebr_retire_queue_batch_retire() {
        let queue = EbrRetireQueue::new();
        let indices: Vec<VersionIdx> = (0..10).map(|i| VersionIdx::new(0, i, 0)).collect();
        queue.retire_batch(indices.iter().copied(), 5);
        assert_eq!(queue.pending_count(), 10);
        assert_eq!(queue.total_retired(), 10);
        let drained = queue.drain_if_safe(7, 2);
        assert_eq!(drained.len(), 10);
        assert_eq!(queue.total_recycled(), 10);
    }

    #[test]
    fn ebr_retire_queue_force_drain() {
        let queue = EbrRetireQueue::new();
        let idx = VersionIdx::new(1, 5, 99);
        queue.retire(idx, 0);
        assert_eq!(queue.pending_count(), 1);
        let drained = queue.force_drain();
        assert_eq!(drained.len(), 1);
        assert_eq!(drained[0], idx);
        assert_eq!(queue.pending_count(), 0);
        assert_eq!(queue.total_recycled(), 1);
    }

    #[test]
    fn ebr_retire_queue_multiple_epochs() {
        let queue = EbrRetireQueue::new();
        queue.retire(VersionIdx::new(0, 0, 0), 0);
        queue.retire(VersionIdx::new(0, 1, 0), 0);
        queue.retire(VersionIdx::new(0, 2, 0), 3);
        assert_eq!(queue.pending_count(), 3);
        let drained = queue.drain_if_safe(1, 2);
        assert!(drained.is_empty(), "epoch 1: gap of 1 is not sufficient");
        let drained = queue.drain_if_safe(2, 2);
        assert_eq!(drained.len(), 3, "epoch 2: gap of 2 is sufficient");
    }

    #[test]
    fn ebr_retire_queue_empty_drain() {
        let queue = EbrRetireQueue::new();
        let drained = queue.drain_if_safe(100, 2);
        assert!(drained.is_empty());
        let drained = queue.force_drain();
        assert!(drained.is_empty());
    }
}