use crate::trace::distributed::DistTraceId;
use crate::util::stack_trace;
use std::collections::{HashMap, HashSet, VecDeque};
use std::time::{SystemTime, UNIX_EPOCH};
/// Returns the timestamp source used throughout the channel-atomicity oracle.
///
/// In test builds, or when the `deterministic-mode` feature is enabled, the
/// clock is frozen at [`UNIX_EPOCH`] so every recorded timestamp is
/// reproducible. Otherwise the wall clock is read via [`SystemTime::now`].
#[inline]
fn channel_atomicity_now() -> SystemTime {
    // `cfg!` is resolved at compile time, so only one branch survives codegen.
    if cfg!(any(test, feature = "deterministic-mode")) {
        UNIX_EPOCH
    } else {
        SystemTime::now()
    }
}
/// Identifier of a single channel reservation tracked by the oracle.
///
/// A thin `u64` newtype; it is used as the key of the oracle's reservation map.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub struct ReservationId(pub u64);
/// Identifier of a channel whose reservations and wakers are tracked.
///
/// A thin `u64` newtype; it keys the per-channel reservation and waker indexes.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub struct ChannelId(pub u64);
/// Identifier of a waker registered with the oracle.
///
/// A thin `u64` newtype; it is used as the key of the oracle's waker map.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub struct WakerId(pub u64);
/// Configuration for [`ChannelAtomicityOracle`].
#[derive(Debug, Clone)]
#[allow(clippy::struct_excessive_bools)]
pub struct ChannelAtomicityConfig {
/// When `false`, the reservation hooks (`on_reservation_created`,
/// `on_reservation_committed`, `on_reservation_aborted`) return immediately.
pub track_reservations: bool,
/// When `false`, the waker hooks (`on_waker_registered`, `on_waker_expected`,
/// `on_waker_wakeup`) return immediately.
pub track_wakers: bool,
/// What the oracle does, beyond storing it, each time a violation is recorded.
pub enforcement: EnforcementMode,
/// When `true`, every recorded violation is also emitted through
/// `ViolationRecord::emit_structured_log`.
pub structured_logging: bool,
/// When `true`, `ViolationRecord::new` captures a stack trace for the record.
pub include_stack_traces: bool,
/// When `true`, `ViolationRecord::new` builds a replay command for violations
/// that carry a trace id.
pub enable_replay_commands: bool,
/// Upper bound on retained `ViolationRecord`s; once exceeded, the oldest
/// record is dropped. The plain list of violations is not bounded by this.
pub max_violations_tracked: usize,
/// Age, in whole seconds, at or beyond which a still-active reservation is
/// reported as a leak by the leak check.
pub max_reservation_age_seconds: u64,
/// Upper bound on tracked reservations per channel; surplus entries are
/// evicted when a new reservation is created.
pub max_reservations_per_channel: usize,
}
impl Default for ChannelAtomicityConfig {
    /// Builds the default configuration: reservation and waker tracking
    /// enabled, violations reported as warnings, and all optional diagnostics
    /// (structured logging, stack traces, replay commands) switched off.
    fn default() -> Self {
        // Retention limits, named here so the struct literal stays readable.
        let max_violations_tracked = 1000;
        let max_reservation_age_seconds = 3600;
        let max_reservations_per_channel = 10000;

        Self {
            enforcement: EnforcementMode::Warn,
            track_reservations: true,
            track_wakers: true,
            structured_logging: false,
            include_stack_traces: false,
            enable_replay_commands: false,
            max_violations_tracked,
            max_reservation_age_seconds,
            max_reservations_per_channel,
        }
    }
}
/// Action taken by the oracle after a violation has been stored.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum EnforcementMode {
/// Only store the violation; take no further action.
Collect,
/// Store the violation and emit a warning via `crate::tracing_compat::warn!`.
Warn,
/// Store the violation and then panic with a message describing it.
Panic,
}
/// A single atomicity violation detected by [`ChannelAtomicityOracle`].
///
/// Every variant carries the optional distributed trace id that was supplied
/// when the affected reservation or waker was registered, or when the
/// violation was reported directly.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum ChannelAtomicityViolation {
/// A reservation was still active once its age reached the configured
/// `max_reservation_age_seconds`.
ReservationLeak {
reservation_id: ReservationId,
channel_id: ChannelId,
/// When the reservation was registered with the oracle.
created_at: SystemTime,
trace_id: Option<DistTraceId>,
},
/// A commit was reported for a reservation that was already committed.
DoubleCommit {
reservation_id: ReservationId,
channel_id: ChannelId,
first_commit_at: SystemTime,
second_commit_at: SystemTime,
trace_id: Option<DistTraceId>,
},
/// An abort was reported for a reservation that was already aborted.
DoubleAbort {
reservation_id: ReservationId,
channel_id: ChannelId,
first_abort_at: SystemTime,
second_abort_at: SystemTime,
trace_id: Option<DistTraceId>,
},
/// A reservation was used again after it had been committed.
UseAfterCommit {
reservation_id: ReservationId,
channel_id: ChannelId,
commit_at: SystemTime,
use_at: SystemTime,
/// Name of the offending operation (the oracle reports `"abort"`).
operation: String,
trace_id: Option<DistTraceId>,
},
/// A reservation was used again after it had been aborted.
UseAfterAbort {
reservation_id: ReservationId,
channel_id: ChannelId,
abort_at: SystemTime,
use_at: SystemTime,
/// Name of the offending operation (the oracle reports `"commit"`).
operation: String,
trace_id: Option<DistTraceId>,
},
/// A wakeup arrived more than 100 ms after the time it was expected.
LostWakeup {
waker_id: WakerId,
channel_id: ChannelId,
expected_at: SystemTime,
/// Time at which the late wakeup was actually observed.
detected_at: SystemTime,
trace_id: Option<DistTraceId>,
},
/// A wakeup was observed for a waker that had no expected wakeup time set.
SpuriousWakeup {
waker_id: WakerId,
channel_id: ChannelId,
wakeup_at: SystemTime,
trace_id: Option<DistTraceId>,
},
/// Data was lost when a channel operation was cancelled, as reported through
/// `ChannelAtomicityOracle::on_cancel_data_loss`.
DataLossOnCancel {
channel_id: ChannelId,
/// Amount of data lost; rendered as bytes by the `Display` impl.
data_size: usize,
cancel_at: SystemTime,
trace_id: Option<DistTraceId>,
},
}
impl std::fmt::Display for ChannelAtomicityViolation {
    /// Renders a one-line, human-readable description of the violation.
    ///
    /// The variant-specific message is written first; when the violation
    /// carries a trace id, a ` (trace: …)` suffix is appended.
    #[allow(clippy::too_many_lines)]
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // Each arm writes its own message and hands back the trace id so the
        // shared suffix is emitted in exactly one place below.
        let maybe_trace = match self {
            Self::ReservationLeak {
                reservation_id,
                channel_id,
                created_at,
                trace_id,
            } => {
                write!(
                    f,
                    "Reservation leak: {reservation_id:?} on channel {channel_id:?} created at {created_at:?}"
                )?;
                trace_id
            }
            Self::DoubleCommit {
                reservation_id,
                channel_id,
                first_commit_at,
                second_commit_at,
                trace_id,
            } => {
                write!(
                    f,
                    "Double commit: {reservation_id:?} on channel {channel_id:?} first at {first_commit_at:?}, second at {second_commit_at:?}"
                )?;
                trace_id
            }
            Self::DoubleAbort {
                reservation_id,
                channel_id,
                first_abort_at,
                second_abort_at,
                trace_id,
            } => {
                write!(
                    f,
                    "Double abort: {reservation_id:?} on channel {channel_id:?} first at {first_abort_at:?}, second at {second_abort_at:?}"
                )?;
                trace_id
            }
            Self::UseAfterCommit {
                reservation_id,
                channel_id,
                commit_at,
                use_at,
                operation,
                trace_id,
            } => {
                write!(
                    f,
                    "Use after commit: {reservation_id:?} on channel {channel_id:?} committed at {commit_at:?}, used at {use_at:?} for {operation}"
                )?;
                trace_id
            }
            Self::UseAfterAbort {
                reservation_id,
                channel_id,
                abort_at,
                use_at,
                operation,
                trace_id,
            } => {
                write!(
                    f,
                    "Use after abort: {reservation_id:?} on channel {channel_id:?} aborted at {abort_at:?}, used at {use_at:?} for {operation}"
                )?;
                trace_id
            }
            Self::LostWakeup {
                waker_id,
                channel_id,
                expected_at,
                detected_at,
                trace_id,
            } => {
                write!(
                    f,
                    "Lost wakeup: {waker_id:?} on channel {channel_id:?} expected at {expected_at:?}, detected at {detected_at:?}"
                )?;
                trace_id
            }
            Self::SpuriousWakeup {
                waker_id,
                channel_id,
                wakeup_at,
                trace_id,
            } => {
                write!(
                    f,
                    "Spurious wakeup: {waker_id:?} on channel {channel_id:?} at {wakeup_at:?}"
                )?;
                trace_id
            }
            Self::DataLossOnCancel {
                channel_id,
                data_size,
                cancel_at,
                trace_id,
            } => {
                write!(
                    f,
                    "Data loss on cancel: channel {channel_id:?} lost {data_size} bytes at {cancel_at:?}"
                )?;
                trace_id
            }
        };

        match maybe_trace {
            Some(trace) => write!(f, " (trace: {trace:?})"),
            None => Ok(()),
        }
    }
}
/// A recorded violation together with the diagnostics captured for it.
#[derive(Debug, Clone)]
pub struct ViolationRecord {
/// The violation that was detected.
pub violation: ChannelAtomicityViolation,
/// Time at which the record was created (taken from the oracle's clock).
pub timestamp: SystemTime,
/// Trace id copied out of the violation, if it carried one.
pub trace_id: Option<DistTraceId>,
/// Captured stack trace; present only when `include_stack_traces` is set.
pub stack_trace: Option<String>,
/// Replay command; present only when `enable_replay_commands` is set and the
/// violation carries a trace id.
pub replay_command: Option<String>,
}
impl ViolationRecord {
#[must_use]
pub fn new(violation: ChannelAtomicityViolation, config: &ChannelAtomicityConfig) -> Self {
let trace_id = match &violation {
ChannelAtomicityViolation::ReservationLeak { trace_id, .. } => *trace_id,
ChannelAtomicityViolation::DoubleCommit { trace_id, .. } => *trace_id,
ChannelAtomicityViolation::DoubleAbort { trace_id, .. } => *trace_id,
ChannelAtomicityViolation::UseAfterCommit { trace_id, .. } => *trace_id,
ChannelAtomicityViolation::UseAfterAbort { trace_id, .. } => *trace_id,
ChannelAtomicityViolation::LostWakeup { trace_id, .. } => *trace_id,
ChannelAtomicityViolation::SpuriousWakeup { trace_id, .. } => *trace_id,
ChannelAtomicityViolation::DataLossOnCancel { trace_id, .. } => *trace_id,
};
let stack_trace = if config.include_stack_traces {
Some(Self::capture_stack_trace())
} else {
None
};
let replay_command = if config.enable_replay_commands {
trace_id.map(|tid| format!("asupersync-replay --trace-id {tid:?}"))
} else {
None
};
Self {
violation,
timestamp: channel_atomicity_now(),
trace_id,
stack_trace,
replay_command,
}
}
#[allow(unused_variables)]
pub fn emit_structured_log(&self) {
let timestamp_millis = self
.timestamp
.duration_since(UNIX_EPOCH)
.map_or(0, |d| d.as_millis());
crate::tracing_compat::error!(
violation_type = "channel_atomicity_violation",
timestamp_millis = timestamp_millis,
violation = %self.violation,
trace_id = ?self.trace_id,
replay_command = ?self.replay_command,
stack_trace = ?self.stack_trace,
"channel atomicity violation"
);
}
fn capture_stack_trace() -> String {
stack_trace::capture_stack_trace()
}
}
/// Oracle-side bookkeeping for one reservation.
#[derive(Debug, Clone)]
pub struct ReservationState {
/// Identifier of the reservation.
pub reservation_id: ReservationId,
/// Channel the reservation was registered on.
pub channel_id: ChannelId,
/// Time at which the reservation was registered with the oracle.
pub created_at: SystemTime,
/// Trace id supplied at registration, if any.
pub trace_id: Option<DistTraceId>,
/// Current lifecycle status.
pub status: ReservationStatus,
}
/// Lifecycle status of a tracked reservation.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum ReservationStatus {
/// Registered and neither committed nor aborted yet.
Active,
/// Committed; any further commit or abort is reported as a violation.
Committed {
/// Time of the commit.
at: SystemTime,
/// Data size passed to the commit hook.
data_size: usize,
},
/// Aborted; any further commit or abort is reported as a violation.
Aborted {
/// Time of the abort.
at: SystemTime,
/// Reason passed to the abort hook.
reason: String,
},
}
/// Oracle-side bookkeeping for one registered waker.
#[derive(Debug, Clone)]
pub struct WakerState {
/// Identifier of the waker.
pub waker_id: WakerId,
/// Channel the waker was registered on.
pub channel_id: ChannelId,
/// Time at which the waker was registered with the oracle.
pub registered_at: SystemTime,
/// Time a wakeup is expected; `None` until `on_waker_expected` is called.
pub expected_wakeup_at: Option<SystemTime>,
/// Time of the most recent observed wakeup; `None` until `on_waker_wakeup`
/// is called.
pub actual_wakeup_at: Option<SystemTime>,
/// Trace id supplied at registration, if any.
pub trace_id: Option<DistTraceId>,
}
/// Tracks channel reservations and wakers and records atomicity violations
/// (leaks, double commit/abort, use-after-finish, lost or spurious wakeups,
/// data loss on cancel).
#[derive(Debug)]
pub struct ChannelAtomicityOracle {
// Behavior switches and retention limits.
config: ChannelAtomicityConfig,
// State of every tracked reservation, keyed by its id.
reservations: HashMap<ReservationId, ReservationState>,
// Per-channel index of reservation ids.
channel_reservations: HashMap<ChannelId, HashSet<ReservationId>>,
// State of every registered waker, keyed by its id.
wakers: HashMap<WakerId, WakerState>,
// Per-channel index of waker ids.
channel_wakers: HashMap<ChannelId, HashSet<WakerId>>,
// Every violation recorded since the last reset, in recording order.
violations: Vec<ChannelAtomicityViolation>,
// Detailed records, capped at `config.max_violations_tracked` (oldest dropped).
violation_records: VecDeque<ViolationRecord>,
// Running counters exposed through `statistics()`.
stats: ChannelAtomicityStatistics,
}
/// Running counters maintained by [`ChannelAtomicityOracle`]; all start at zero
/// and are cleared by `reset`.
#[derive(Debug, Clone, Default)]
pub struct ChannelAtomicityStatistics {
/// Reservations registered while reservation tracking was enabled.
pub total_reservations_created: u64,
/// Reservations that moved from active to committed.
pub total_reservations_committed: u64,
/// Reservations that moved from active to aborted.
pub total_reservations_aborted: u64,
/// Reservations reported as leaked by the leak check.
pub total_reservations_leaked: u64,
/// Wakers registered while waker tracking was enabled.
pub total_wakers_registered: u64,
/// Expected-wakeup notifications received for known wakers.
pub total_wakeups_expected: u64,
/// Actual wakeups observed for known wakers.
pub total_wakeups_actual: u64,
/// Wakeups reported as lost (arrived too late).
pub total_lost_wakeups: u64,
/// Wakeups reported as spurious (no expectation was set).
pub total_spurious_wakeups: u64,
/// Violations of any kind that were recorded.
pub total_violations: u64,
}
impl Default for ChannelAtomicityOracle {
fn default() -> Self {
Self::new(ChannelAtomicityConfig::default())
}
}
impl ChannelAtomicityOracle {
#[must_use]
pub fn new(config: ChannelAtomicityConfig) -> Self {
Self {
config,
reservations: HashMap::new(),
channel_reservations: HashMap::new(),
wakers: HashMap::new(),
channel_wakers: HashMap::new(),
violations: Vec::new(),
violation_records: VecDeque::new(),
stats: ChannelAtomicityStatistics::default(),
}
}
#[must_use]
pub fn with_defaults() -> Self {
Self::new(ChannelAtomicityConfig::default())
}
#[must_use]
pub fn for_runtime() -> Self {
Self::new(ChannelAtomicityConfig {
track_reservations: true,
track_wakers: true,
enforcement: EnforcementMode::Panic,
structured_logging: true,
include_stack_traces: true,
enable_replay_commands: true,
max_violations_tracked: 100,
max_reservation_age_seconds: 300, max_reservations_per_channel: 1000,
})
}
pub fn on_reservation_created(
&mut self,
reservation_id: ReservationId,
channel_id: ChannelId,
trace_id: Option<DistTraceId>,
) {
if !self.config.track_reservations {
return;
}
let state = ReservationState {
reservation_id,
channel_id,
created_at: channel_atomicity_now(),
trace_id,
status: ReservationStatus::Active,
};
self.reservations.insert(reservation_id, state);
self.channel_reservations
.entry(channel_id)
.or_default()
.insert(reservation_id);
self.stats.total_reservations_created += 1;
self.cleanup_old_reservations();
}
pub fn on_reservation_committed(&mut self, reservation_id: ReservationId, data_size: usize) {
if !self.config.track_reservations {
return;
}
if let Some(state) = self.reservations.get_mut(&reservation_id) {
let commit_time = channel_atomicity_now();
match &state.status {
ReservationStatus::Active => {
state.status = ReservationStatus::Committed {
at: commit_time,
data_size,
};
self.stats.total_reservations_committed += 1;
}
ReservationStatus::Committed { at, .. } => {
let violation = ChannelAtomicityViolation::DoubleCommit {
reservation_id,
channel_id: state.channel_id,
first_commit_at: *at,
second_commit_at: commit_time,
trace_id: state.trace_id,
};
self.record_violation(violation);
}
ReservationStatus::Aborted { .. } => {
let violation = ChannelAtomicityViolation::UseAfterAbort {
reservation_id,
channel_id: state.channel_id,
abort_at: match state.status {
ReservationStatus::Aborted { at, .. } => at,
_ => unreachable!(),
},
use_at: commit_time,
operation: "commit".to_string(),
trace_id: state.trace_id,
};
self.record_violation(violation);
}
}
}
}
pub fn on_reservation_aborted(&mut self, reservation_id: ReservationId, reason: String) {
if !self.config.track_reservations {
return;
}
if let Some(state) = self.reservations.get_mut(&reservation_id) {
let abort_time = channel_atomicity_now();
match &state.status {
ReservationStatus::Active => {
state.status = ReservationStatus::Aborted {
at: abort_time,
reason,
};
self.stats.total_reservations_aborted += 1;
}
ReservationStatus::Aborted { at, .. } => {
let violation = ChannelAtomicityViolation::DoubleAbort {
reservation_id,
channel_id: state.channel_id,
first_abort_at: *at,
second_abort_at: abort_time,
trace_id: state.trace_id,
};
self.record_violation(violation);
}
ReservationStatus::Committed { at, .. } => {
let violation = ChannelAtomicityViolation::UseAfterCommit {
reservation_id,
channel_id: state.channel_id,
commit_at: *at,
use_at: abort_time,
operation: "abort".to_string(),
trace_id: state.trace_id,
};
self.record_violation(violation);
}
}
}
}
pub fn on_waker_registered(
&mut self,
waker_id: WakerId,
channel_id: ChannelId,
trace_id: Option<DistTraceId>,
) {
if !self.config.track_wakers {
return;
}
let state = WakerState {
waker_id,
channel_id,
registered_at: channel_atomicity_now(),
expected_wakeup_at: None,
actual_wakeup_at: None,
trace_id,
};
self.wakers.insert(waker_id, state);
self.channel_wakers
.entry(channel_id)
.or_default()
.insert(waker_id);
self.stats.total_wakers_registered += 1;
}
pub fn on_waker_expected(&mut self, waker_id: WakerId, expected_at: SystemTime) {
if !self.config.track_wakers {
return;
}
if let Some(state) = self.wakers.get_mut(&waker_id) {
state.expected_wakeup_at = Some(expected_at);
self.stats.total_wakeups_expected += 1;
}
}
pub fn on_waker_wakeup(&mut self, waker_id: WakerId, actual_at: SystemTime) {
if !self.config.track_wakers {
return;
}
if let Some(state) = self.wakers.get_mut(&waker_id) {
state.actual_wakeup_at = Some(actual_at);
self.stats.total_wakeups_actual += 1;
if let Some(expected_at) = state.expected_wakeup_at {
if let Ok(delay) = actual_at.duration_since(expected_at) {
if delay.as_millis() > 100 {
let violation = ChannelAtomicityViolation::LostWakeup {
waker_id,
channel_id: state.channel_id,
expected_at,
detected_at: actual_at,
trace_id: state.trace_id,
};
self.record_violation(violation);
self.stats.total_lost_wakeups += 1;
}
}
} else {
let violation = ChannelAtomicityViolation::SpuriousWakeup {
waker_id,
channel_id: state.channel_id,
wakeup_at: actual_at,
trace_id: state.trace_id,
};
self.record_violation(violation);
self.stats.total_spurious_wakeups += 1;
}
}
}
pub fn on_cancel_data_loss(
&mut self,
channel_id: ChannelId,
data_size: usize,
trace_id: Option<DistTraceId>,
) {
let violation = ChannelAtomicityViolation::DataLossOnCancel {
channel_id,
data_size,
cancel_at: channel_atomicity_now(),
trace_id,
};
self.record_violation(violation);
}
pub fn check_for_violations(&mut self) -> Result<Vec<ChannelAtomicityViolation>, String> {
self.check_for_reservation_leaks();
Ok(self.violations.clone())
}
#[must_use]
pub fn statistics(&self) -> ChannelAtomicityStatistics {
self.stats.clone()
}
pub fn reset(&mut self) {
self.reservations.clear();
self.channel_reservations.clear();
self.wakers.clear();
self.channel_wakers.clear();
self.violations.clear();
self.violation_records.clear();
self.stats = ChannelAtomicityStatistics::default();
}
#[must_use]
pub fn violation_records(&self) -> Vec<ViolationRecord> {
self.violation_records.iter().cloned().collect()
}
fn record_violation(&mut self, violation: ChannelAtomicityViolation) {
self.violations.push(violation.clone());
let record = ViolationRecord::new(violation.clone(), &self.config);
if self.config.structured_logging {
record.emit_structured_log();
}
self.violation_records.push_back(record);
if self.violation_records.len() > self.config.max_violations_tracked {
self.violation_records.pop_front();
}
self.stats.total_violations += 1;
match self.config.enforcement {
EnforcementMode::Panic => panic!("Channel atomicity violation detected: {violation}"), EnforcementMode::Warn => {
crate::tracing_compat::warn!(
violation = %violation,
"channel atomicity violation"
);
}
EnforcementMode::Collect => {} }
}
fn check_for_reservation_leaks(&mut self) {
let now = channel_atomicity_now();
let mut leaked_reservations = Vec::new();
let mut violations_to_record = Vec::new();
for (reservation_id, state) in &self.reservations {
if matches!(state.status, ReservationStatus::Active) {
if let Ok(age) = now.duration_since(state.created_at) {
if age.as_secs() >= self.config.max_reservation_age_seconds {
leaked_reservations.push(*reservation_id);
let violation = ChannelAtomicityViolation::ReservationLeak {
reservation_id: *reservation_id,
channel_id: state.channel_id,
created_at: state.created_at,
trace_id: state.trace_id,
};
violations_to_record.push(violation);
}
}
}
}
for violation in violations_to_record {
self.record_violation(violation);
self.stats.total_reservations_leaked += 1;
}
for reservation_id in leaked_reservations {
if let Some(state) = self.reservations.remove(&reservation_id) {
if let Some(channel_set) = self.channel_reservations.get_mut(&state.channel_id) {
channel_set.remove(&reservation_id);
}
}
}
}
fn cleanup_old_reservations(&mut self) {
for reservation_set in self.channel_reservations.values_mut() {
if reservation_set.len() > self.config.max_reservations_per_channel {
let to_remove: Vec<ReservationId> = reservation_set
.iter()
.take(reservation_set.len() - self.config.max_reservations_per_channel)
.copied()
.collect();
for reservation_id in to_remove {
reservation_set.remove(&reservation_id);
self.reservations.remove(&reservation_id);
}
}
}
}
}
#[cfg(test)]
mod tests {
    //! Unit tests for the channel-atomicity oracle.
    //!
    //! In test builds the oracle's internal clock is frozen at `UNIX_EPOCH`,
    //! so timestamps taken by the oracle itself never advance.
    #![allow(
        clippy::pedantic,
        clippy::nursery,
        clippy::expect_fun_call,
        clippy::map_unwrap_or,
        clippy::cast_possible_wrap,
        clippy::future_not_send
    )]
    use super::*;
    use std::time::Duration;

    /// Oracle that only collects violations (never warns or panics).
    fn collecting_oracle() -> ChannelAtomicityOracle {
        ChannelAtomicityOracle::new(ChannelAtomicityConfig {
            enforcement: EnforcementMode::Collect,
            ..Default::default()
        })
    }

    #[test]
    fn test_reservation_lifecycle_happy_path() {
        let mut oracle = ChannelAtomicityOracle::with_defaults();
        let reservation_id = ReservationId(1);
        let channel_id = ChannelId(1);
        oracle.on_reservation_created(reservation_id, channel_id, None);
        oracle.on_reservation_committed(reservation_id, 100);
        let violations = oracle.check_for_violations().unwrap();
        assert!(violations.is_empty());
        let stats = oracle.statistics();
        assert_eq!(stats.total_reservations_created, 1);
        assert_eq!(stats.total_reservations_committed, 1);
        assert_eq!(stats.total_violations, 0);
    }

    #[test]
    fn test_double_commit_detection() {
        let mut oracle = collecting_oracle();
        let reservation_id = ReservationId(1);
        let channel_id = ChannelId(1);
        oracle.on_reservation_created(reservation_id, channel_id, None);
        oracle.on_reservation_committed(reservation_id, 100);
        oracle.on_reservation_committed(reservation_id, 200);
        let violations = oracle.check_for_violations().unwrap();
        assert_eq!(violations.len(), 1);
        assert!(matches!(
            violations[0],
            ChannelAtomicityViolation::DoubleCommit { .. }
        ));
    }

    #[test]
    fn test_double_abort_detection() {
        let mut oracle = collecting_oracle();
        let reservation_id = ReservationId(1);
        let channel_id = ChannelId(1);
        oracle.on_reservation_created(reservation_id, channel_id, None);
        oracle.on_reservation_aborted(reservation_id, "first".to_string());
        oracle.on_reservation_aborted(reservation_id, "second".to_string());
        let violations = oracle.check_for_violations().unwrap();
        assert_eq!(violations.len(), 1);
        assert!(matches!(
            violations[0],
            ChannelAtomicityViolation::DoubleAbort { .. }
        ));
        assert_eq!(oracle.statistics().total_reservations_aborted, 1);
    }

    #[test]
    fn test_use_after_commit_detection() {
        let mut oracle = collecting_oracle();
        let reservation_id = ReservationId(1);
        let channel_id = ChannelId(1);
        oracle.on_reservation_created(reservation_id, channel_id, None);
        oracle.on_reservation_committed(reservation_id, 100);
        oracle.on_reservation_aborted(reservation_id, "late".to_string());
        let violations = oracle.check_for_violations().unwrap();
        assert_eq!(violations.len(), 1);
        match &violations[0] {
            ChannelAtomicityViolation::UseAfterCommit { operation, .. } => {
                assert_eq!(operation, "abort");
            }
            other => panic!("unexpected violation: {other}"),
        }
    }

    #[test]
    fn test_use_after_abort_detection() {
        let mut oracle = collecting_oracle();
        let reservation_id = ReservationId(1);
        let channel_id = ChannelId(1);
        oracle.on_reservation_created(reservation_id, channel_id, None);
        oracle.on_reservation_aborted(reservation_id, "cancelled".to_string());
        oracle.on_reservation_committed(reservation_id, 100);
        let violations = oracle.check_for_violations().unwrap();
        assert_eq!(violations.len(), 1);
        assert!(matches!(
            violations[0],
            ChannelAtomicityViolation::UseAfterAbort { .. }
        ));
    }

    #[test]
    fn test_reservation_leak_detection() {
        let mut oracle = ChannelAtomicityOracle::new(ChannelAtomicityConfig {
            enforcement: EnforcementMode::Collect,
            max_reservation_age_seconds: 0,
            ..Default::default()
        });
        let reservation_id = ReservationId(1);
        let channel_id = ChannelId(1);
        oracle.on_reservation_created(reservation_id, channel_id, None);
        // No sleep needed: the test clock is frozen, so the reservation's age
        // is exactly zero, which already meets the zero-second threshold.
        let violations = oracle.check_for_violations().unwrap();
        assert_eq!(violations.len(), 1);
        assert!(matches!(
            violations[0],
            ChannelAtomicityViolation::ReservationLeak { .. }
        ));
        assert_eq!(oracle.statistics().total_reservations_leaked, 1);
        assert!(oracle.reservations.is_empty());
    }

    #[test]
    fn test_spurious_wakeup_detection() {
        let mut oracle = collecting_oracle();
        let waker_id = WakerId(1);
        let channel_id = ChannelId(1);
        oracle.on_waker_registered(waker_id, channel_id, None);
        oracle.on_waker_wakeup(waker_id, SystemTime::now());
        let violations = oracle.check_for_violations().unwrap();
        assert_eq!(violations.len(), 1);
        assert!(matches!(
            violations[0],
            ChannelAtomicityViolation::SpuriousWakeup { .. }
        ));
    }

    #[test]
    fn test_lost_wakeup_detection() {
        let mut oracle = collecting_oracle();
        let waker_id = WakerId(1);
        let channel_id = ChannelId(1);
        let now = SystemTime::now();
        oracle.on_waker_registered(waker_id, channel_id, None);
        oracle.on_waker_expected(waker_id, now);
        let delayed_wakeup = now + Duration::from_millis(200);
        oracle.on_waker_wakeup(waker_id, delayed_wakeup);
        let violations = oracle.check_for_violations().unwrap();
        assert_eq!(violations.len(), 1);
        assert!(matches!(
            violations[0],
            ChannelAtomicityViolation::LostWakeup { .. }
        ));
    }

    #[test]
    fn test_data_loss_on_cancel() {
        let mut oracle = collecting_oracle();
        let channel_id = ChannelId(1);
        oracle.on_cancel_data_loss(channel_id, 1024, None);
        let violations = oracle.check_for_violations().unwrap();
        assert_eq!(violations.len(), 1);
        assert!(matches!(
            violations[0],
            ChannelAtomicityViolation::DataLossOnCancel { .. }
        ));
    }

    #[test]
    fn test_violation_record_creation() {
        let config = ChannelAtomicityConfig {
            include_stack_traces: true,
            enable_replay_commands: true,
            structured_logging: false,
            ..Default::default()
        };
        let violation = ChannelAtomicityViolation::ReservationLeak {
            reservation_id: ReservationId(1),
            channel_id: ChannelId(1),
            created_at: SystemTime::now(),
            trace_id: Some(DistTraceId::new_for_test(1)),
        };
        let record = ViolationRecord::new(violation, &config);
        assert!(record.trace_id.is_some());
        assert!(record.stack_trace.is_some());
        assert!(record.replay_command.is_some());
    }

    #[test]
    fn test_violation_records_are_bounded() {
        let mut oracle = ChannelAtomicityOracle::new(ChannelAtomicityConfig {
            enforcement: EnforcementMode::Collect,
            max_violations_tracked: 2,
            ..Default::default()
        });
        for size in 0..3 {
            oracle.on_cancel_data_loss(ChannelId(1), size, None);
        }
        // Detailed records are capped; the plain violation list is not.
        assert_eq!(oracle.violation_records().len(), 2);
        assert_eq!(oracle.check_for_violations().unwrap().len(), 3);
        assert_eq!(oracle.statistics().total_violations, 3);
    }

    #[test]
    fn test_tracking_disabled_ignores_events() {
        let mut oracle = ChannelAtomicityOracle::new(ChannelAtomicityConfig {
            enforcement: EnforcementMode::Collect,
            track_reservations: false,
            track_wakers: false,
            ..Default::default()
        });
        oracle.on_reservation_created(ReservationId(1), ChannelId(1), None);
        oracle.on_reservation_committed(ReservationId(1), 1);
        oracle.on_waker_registered(WakerId(1), ChannelId(1), None);
        oracle.on_waker_wakeup(WakerId(1), SystemTime::now());
        assert!(oracle.reservations.is_empty());
        assert!(oracle.wakers.is_empty());
        assert!(oracle.check_for_violations().unwrap().is_empty());
        let stats = oracle.statistics();
        assert_eq!(stats.total_reservations_created, 0);
        assert_eq!(stats.total_wakers_registered, 0);
    }

    #[test]
    fn test_oracle_reset() {
        let mut oracle = ChannelAtomicityOracle::with_defaults();
        oracle.on_reservation_created(ReservationId(1), ChannelId(1), None);
        oracle.on_waker_registered(WakerId(1), ChannelId(1), None);
        oracle.on_cancel_data_loss(ChannelId(1), 100, None);
        assert!(!oracle.reservations.is_empty());
        assert!(!oracle.wakers.is_empty());
        assert!(!oracle.violations.is_empty());
        oracle.reset();
        assert!(oracle.reservations.is_empty());
        assert!(oracle.wakers.is_empty());
        assert!(oracle.violations.is_empty());
        assert_eq!(oracle.statistics().total_reservations_created, 0);
    }
}