use crate::lab::util::stack_trace;
use crate::trace::distributed::DistTraceId;
use std::collections::{HashMap, HashSet, VecDeque};
use std::time::{SystemTime, UNIX_EPOCH};
#[inline]
fn waker_event_now() -> SystemTime {
#[cfg(any(test, feature = "deterministic-mode"))]
{
UNIX_EPOCH
}
#[cfg(not(any(test, feature = "deterministic-mode")))]
{
SystemTime::now()
}
}
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub struct WakerId(pub u64);
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub struct ChannelId(pub u64);
#[derive(Debug, Clone)]
pub struct WakerDedupConfig {
pub track_queued_state: bool,
pub track_wakeup_events: bool,
pub detect_registration_races: bool,
pub track_cleanup: bool,
pub enforcement: EnforcementMode,
pub structured_logging: bool,
pub include_stack_traces: bool,
pub enable_replay_commands: bool,
pub max_violations_tracked: usize,
pub max_wakers_per_channel: usize,
pub max_tracked_wakers: usize,
pub race_detection_window_ms: u64,
}
impl Default for WakerDedupConfig {
fn default() -> Self {
Self {
track_queued_state: true,
track_wakeup_events: true,
detect_registration_races: true,
track_cleanup: true,
enforcement: EnforcementMode::Warn,
structured_logging: false,
include_stack_traces: false,
enable_replay_commands: false,
max_violations_tracked: 1000,
max_wakers_per_channel: 1000,
max_tracked_wakers: 10000,
race_detection_window_ms: 10, }
}
}
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum EnforcementMode {
Collect,
Warn,
Panic,
}
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum WakerDedupViolation {
LostWakeup {
waker_id: WakerId,
channel_id: ChannelId,
registered_at: SystemTime,
expected_wake_at: SystemTime,
trace_id: Option<DistTraceId>,
},
SpuriousWakeup {
waker_id: WakerId,
channel_id: ChannelId,
woken_at: SystemTime,
reason: String,
trace_id: Option<DistTraceId>,
},
InconsistentQueuedState {
waker_id: WakerId,
channel_id: ChannelId,
expected_queued: bool,
actual_queued: bool,
detected_at: SystemTime,
trace_id: Option<DistTraceId>,
},
RegistrationRace {
waker_id: WakerId,
channel_id: ChannelId,
registration_time: SystemTime,
wakeup_time: SystemTime,
trace_id: Option<DistTraceId>,
},
DoubleWakeup {
waker_id: WakerId,
channel_id: ChannelId,
first_wake_at: SystemTime,
second_wake_at: SystemTime,
trace_id: Option<DistTraceId>,
},
WakerLeak {
waker_id: WakerId,
channel_id: ChannelId,
registered_at: SystemTime,
detected_at: SystemTime,
trace_id: Option<DistTraceId>,
},
UseAfterDrop {
waker_id: WakerId,
channel_id: ChannelId,
dropped_at: SystemTime,
operation_at: SystemTime,
operation: String,
trace_id: Option<DistTraceId>,
},
}
impl std::fmt::Display for WakerDedupViolation {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
Self::LostWakeup {
waker_id,
channel_id,
registered_at,
expected_wake_at,
trace_id,
} => {
write!(
f,
"Lost wakeup: waker {waker_id:?} on channel {channel_id:?} registered at {registered_at:?}, expected wake at {expected_wake_at:?}"
)?;
if let Some(trace) = trace_id {
write!(f, " (trace: {trace:?})")?;
}
Ok(())
}
Self::SpuriousWakeup {
waker_id,
channel_id,
woken_at,
reason,
trace_id,
} => {
write!(
f,
"Spurious wakeup: waker {waker_id:?} on channel {channel_id:?} woken at {woken_at:?}, reason: {reason}"
)?;
if let Some(trace) = trace_id {
write!(f, " (trace: {trace:?})")?;
}
Ok(())
}
Self::InconsistentQueuedState {
waker_id,
channel_id,
expected_queued,
actual_queued,
detected_at,
trace_id,
} => {
write!(
f,
"Inconsistent queued state: waker {waker_id:?} on channel {channel_id:?} expected queued={expected_queued}, actual queued={actual_queued}, detected at {detected_at:?}"
)?;
if let Some(trace) = trace_id {
write!(f, " (trace: {trace:?})")?;
}
Ok(())
}
Self::RegistrationRace {
waker_id,
channel_id,
registration_time,
wakeup_time,
trace_id,
} => {
write!(
f,
"Registration race: waker {waker_id:?} on channel {channel_id:?} registered at {registration_time:?}, woken at {wakeup_time:?}"
)?;
if let Some(trace) = trace_id {
write!(f, " (trace: {trace:?})")?;
}
Ok(())
}
Self::DoubleWakeup {
waker_id,
channel_id,
first_wake_at,
second_wake_at,
trace_id,
} => {
write!(
f,
"Double wakeup: waker {waker_id:?} on channel {channel_id:?} first woken at {first_wake_at:?}, second at {second_wake_at:?}"
)?;
if let Some(trace) = trace_id {
write!(f, " (trace: {trace:?})")?;
}
Ok(())
}
Self::WakerLeak {
waker_id,
channel_id,
registered_at,
detected_at,
trace_id,
} => {
write!(
f,
"Waker leak: waker {waker_id:?} on channel {channel_id:?} registered at {registered_at:?}, detected at {detected_at:?}"
)?;
if let Some(trace) = trace_id {
write!(f, " (trace: {trace:?})")?;
}
Ok(())
}
Self::UseAfterDrop {
waker_id,
channel_id,
dropped_at,
operation_at,
operation,
trace_id,
} => {
write!(
f,
"Use after drop: waker {waker_id:?} on channel {channel_id:?} dropped at {dropped_at:?}, operation '{operation}' at {operation_at:?}"
)?;
if let Some(trace) = trace_id {
write!(f, " (trace: {trace:?})")?;
}
Ok(())
}
}
}
}
#[derive(Debug, Clone)]
pub struct ViolationRecord {
pub violation: WakerDedupViolation,
pub timestamp: SystemTime,
pub trace_id: Option<DistTraceId>,
pub stack_trace: Option<String>,
pub replay_command: Option<String>,
}
impl ViolationRecord {
#[must_use]
pub fn new(violation: WakerDedupViolation, config: &WakerDedupConfig) -> Self {
let trace_id = match &violation {
WakerDedupViolation::LostWakeup { trace_id, .. } => *trace_id,
WakerDedupViolation::SpuriousWakeup { trace_id, .. } => *trace_id,
WakerDedupViolation::InconsistentQueuedState { trace_id, .. } => *trace_id,
WakerDedupViolation::RegistrationRace { trace_id, .. } => *trace_id,
WakerDedupViolation::DoubleWakeup { trace_id, .. } => *trace_id,
WakerDedupViolation::WakerLeak { trace_id, .. } => *trace_id,
WakerDedupViolation::UseAfterDrop { trace_id, .. } => *trace_id,
};
let stack_trace = if config.include_stack_traces {
Some(Self::capture_stack_trace())
} else {
None
};
let replay_command = if config.enable_replay_commands {
trace_id.map(|tid| format!("asupersync-replay --trace-id {tid:?}"))
} else {
None
};
Self {
violation,
timestamp: waker_event_now(),
trace_id,
stack_trace,
replay_command,
}
}
#[allow(unused_variables)]
pub fn emit_structured_log(&self) {
let timestamp_millis = self
.timestamp
.duration_since(UNIX_EPOCH)
.map_or(0, |d| d.as_millis());
crate::tracing_compat::error!(
violation_type = "waker_dedup_violation",
timestamp_millis = timestamp_millis,
violation = %self.violation,
trace_id = ?self.trace_id,
replay_command = ?self.replay_command,
stack_trace = ?self.stack_trace,
"waker deduplication violation"
);
}
fn capture_stack_trace() -> String {
stack_trace::capture_stack_trace_default()
}
}
#[derive(Debug, Clone)]
pub struct WakerState {
pub waker_id: WakerId,
pub channel_id: ChannelId,
pub registered_at: SystemTime,
pub trace_id: Option<DistTraceId>,
pub status: WakerStatus,
pub last_operation_at: SystemTime,
}
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum WakerStatus {
Queued,
Woken {
at: SystemTime,
},
Dropped {
at: SystemTime,
},
}
#[derive(Debug, Clone, Default)]
pub struct WakerDedupStatistics {
pub total_wakers_registered: u64,
pub total_wakers_woken: u64,
pub total_wakers_dropped: u64,
pub total_lost_wakeups: u64,
pub total_spurious_wakeups: u64,
pub total_double_wakeups: u64,
pub total_registration_races: u64,
pub total_state_inconsistencies: u64,
pub total_leaks: u64,
pub total_use_after_drop: u64,
pub total_violations: u64,
pub active_wakers: u64,
}
#[derive(Debug)]
pub struct WakerDedupOracle {
config: WakerDedupConfig,
wakers: HashMap<WakerId, WakerState>,
channel_wakers: HashMap<ChannelId, HashSet<WakerId>>,
recent_registrations: VecDeque<(WakerId, SystemTime)>,
recent_wakeups: VecDeque<(WakerId, SystemTime)>,
violations: Vec<WakerDedupViolation>,
violation_records: VecDeque<ViolationRecord>,
stats: WakerDedupStatistics,
}
impl Default for WakerDedupOracle {
fn default() -> Self {
Self::new(WakerDedupConfig::default())
}
}
impl WakerDedupOracle {
#[must_use]
pub fn new(config: WakerDedupConfig) -> Self {
Self {
config,
wakers: HashMap::new(),
channel_wakers: HashMap::new(),
recent_registrations: VecDeque::new(),
recent_wakeups: VecDeque::new(),
violations: Vec::new(),
violation_records: VecDeque::new(),
stats: WakerDedupStatistics::default(),
}
}
#[must_use]
pub fn with_defaults() -> Self {
Self::new(WakerDedupConfig::default())
}
#[must_use]
pub fn for_runtime() -> Self {
Self::new(WakerDedupConfig {
track_queued_state: true,
track_wakeup_events: true,
detect_registration_races: true,
track_cleanup: true,
enforcement: EnforcementMode::Panic,
structured_logging: true,
include_stack_traces: true,
enable_replay_commands: true,
max_violations_tracked: 100,
max_wakers_per_channel: 100,
max_tracked_wakers: 1000,
race_detection_window_ms: 5,
})
}
pub fn on_waker_registered(
&mut self,
waker_id: WakerId,
channel_id: ChannelId,
is_queued: bool,
trace_id: Option<DistTraceId>,
) {
if !self.config.track_queued_state {
return;
}
let now = waker_event_now();
if self.config.detect_registration_races {
self.check_registration_races(waker_id, now);
}
let state = WakerState {
waker_id,
channel_id,
registered_at: now,
trace_id,
status: if is_queued {
WakerStatus::Queued
} else {
WakerStatus::Woken { at: now }
},
last_operation_at: now,
};
self.wakers.insert(waker_id, state);
self.channel_wakers
.entry(channel_id)
.or_default()
.insert(waker_id);
self.recent_registrations.push_back((waker_id, now));
self.stats.total_wakers_registered += 1;
if is_queued {
self.stats.active_wakers += 1;
}
self.cleanup_tracking_data();
}
pub fn on_waker_wake_requested(
&mut self,
waker_id: WakerId,
_reason: String,
trace_id: Option<DistTraceId>,
) {
if !self.config.track_wakeup_events {
return;
}
let now = waker_event_now();
if let Some(state) = self.wakers.get(&waker_id) {
match &state.status {
WakerStatus::Queued => {
}
WakerStatus::Woken { at } => {
let violation = WakerDedupViolation::DoubleWakeup {
waker_id,
channel_id: state.channel_id,
first_wake_at: *at,
second_wake_at: now,
trace_id,
};
self.record_violation(violation);
self.stats.total_double_wakeups += 1;
}
WakerStatus::Dropped { at } => {
let violation = WakerDedupViolation::UseAfterDrop {
waker_id,
channel_id: state.channel_id,
dropped_at: *at,
operation_at: now,
operation: "wake_request".to_string(),
trace_id,
};
self.record_violation(violation);
self.stats.total_use_after_drop += 1;
}
}
} else {
}
}
pub fn on_waker_actually_woken(&mut self, waker_id: WakerId, trace_id: Option<DistTraceId>) {
if !self.config.track_wakeup_events {
return;
}
let now = waker_event_now();
if let Some(state) = self.wakers.get_mut(&waker_id) {
match &state.status {
WakerStatus::Queued => {
state.status = WakerStatus::Woken { at: now };
state.last_operation_at = now;
self.stats.total_wakers_woken += 1;
self.stats.active_wakers = self.stats.active_wakers.saturating_sub(1);
}
WakerStatus::Woken { at } => {
let violation = WakerDedupViolation::DoubleWakeup {
waker_id,
channel_id: state.channel_id,
first_wake_at: *at,
second_wake_at: now,
trace_id,
};
self.record_violation(violation);
self.stats.total_double_wakeups += 1;
}
WakerStatus::Dropped { at } => {
let violation = WakerDedupViolation::UseAfterDrop {
waker_id,
channel_id: state.channel_id,
dropped_at: *at,
operation_at: now,
operation: "wakeup".to_string(),
trace_id,
};
self.record_violation(violation);
self.stats.total_use_after_drop += 1;
}
}
} else {
let violation = WakerDedupViolation::SpuriousWakeup {
waker_id,
channel_id: ChannelId(0),
woken_at: now,
reason: "unknown waker".to_string(),
trace_id,
};
self.record_violation(violation);
self.stats.total_spurious_wakeups += 1;
}
self.recent_wakeups.push_back((waker_id, now));
}
pub fn on_waker_dropped(&mut self, waker_id: WakerId) {
if !self.config.track_cleanup {
return;
}
let now = waker_event_now();
if let Some(state) = self.wakers.get_mut(&waker_id) {
match &state.status {
WakerStatus::Queued => {
self.stats.active_wakers = self.stats.active_wakers.saturating_sub(1);
}
WakerStatus::Woken { .. } => {
}
WakerStatus::Dropped { .. } => {
}
}
state.status = WakerStatus::Dropped { at: now };
state.last_operation_at = now;
self.stats.total_wakers_dropped += 1;
}
}
pub fn verify_queued_state(
&mut self,
waker_id: WakerId,
actual_queued: bool,
trace_id: Option<DistTraceId>,
) {
if !self.config.track_queued_state {
return;
}
if let Some(state) = self.wakers.get(&waker_id) {
let expected_queued = matches!(state.status, WakerStatus::Queued);
if expected_queued != actual_queued {
let violation = WakerDedupViolation::InconsistentQueuedState {
waker_id,
channel_id: state.channel_id,
expected_queued,
actual_queued,
detected_at: waker_event_now(),
trace_id,
};
self.record_violation(violation);
self.stats.total_state_inconsistencies += 1;
}
}
}
pub fn check_for_violations(&mut self) -> Result<Vec<WakerDedupViolation>, String> {
self.check_for_leaked_wakers();
self.check_for_lost_wakeups();
Ok(self.violations.clone())
}
#[must_use]
pub fn statistics(&self) -> WakerDedupStatistics {
self.stats.clone()
}
pub fn reset(&mut self) {
self.wakers.clear();
self.channel_wakers.clear();
self.recent_registrations.clear();
self.recent_wakeups.clear();
self.violations.clear();
self.violation_records.clear();
self.stats = WakerDedupStatistics::default();
}
#[must_use]
pub fn violation_records(&self) -> Vec<ViolationRecord> {
self.violation_records.iter().cloned().collect()
}
fn record_violation(&mut self, violation: WakerDedupViolation) {
self.violations.push(violation.clone());
let record = ViolationRecord::new(violation.clone(), &self.config);
if self.config.structured_logging {
record.emit_structured_log();
}
self.violation_records.push_back(record);
if self.violation_records.len() > self.config.max_violations_tracked {
self.violation_records.pop_front();
}
self.stats.total_violations += 1;
match self.config.enforcement {
EnforcementMode::Panic => {
panic!("Waker deduplication violation detected: {violation}") }
EnforcementMode::Warn => {
crate::tracing_compat::warn!(
violation = %violation,
"waker deduplication violation"
);
}
EnforcementMode::Collect => {} }
}
fn check_registration_races(&mut self, waker_id: WakerId, registration_time: SystemTime) {
let window = std::time::Duration::from_millis(self.config.race_detection_window_ms);
let mut violations_to_record = Vec::new();
for (recent_waker_id, recent_wakeup_time) in &self.recent_wakeups {
if *recent_waker_id == waker_id {
if let Ok(time_diff) = registration_time.duration_since(*recent_wakeup_time) {
if time_diff <= window {
if let Some(state) = self.wakers.get(&waker_id) {
let violation = WakerDedupViolation::RegistrationRace {
waker_id,
channel_id: state.channel_id,
registration_time,
wakeup_time: *recent_wakeup_time,
trace_id: state.trace_id,
};
violations_to_record.push(violation);
}
}
}
}
}
for violation in violations_to_record {
self.record_violation(violation);
self.stats.total_registration_races += 1;
}
}
fn check_for_leaked_wakers(&mut self) {
let now = waker_event_now();
let leak_threshold = std::time::Duration::from_secs(60); let mut leaked_wakers = Vec::new();
let mut violations_to_record = Vec::new();
for (waker_id, state) in &self.wakers {
if matches!(state.status, WakerStatus::Queued) {
if let Ok(age) = now.duration_since(state.registered_at) {
if age > leak_threshold {
leaked_wakers.push(*waker_id);
let violation = WakerDedupViolation::WakerLeak {
waker_id: *waker_id,
channel_id: state.channel_id,
registered_at: state.registered_at,
detected_at: now,
trace_id: state.trace_id,
};
violations_to_record.push(violation);
}
}
}
}
for violation in violations_to_record {
self.record_violation(violation);
self.stats.total_leaks += 1;
}
for waker_id in leaked_wakers {
if let Some(state) = self.wakers.get_mut(&waker_id) {
state.status = WakerStatus::Dropped { at: now };
state.last_operation_at = now;
self.stats.active_wakers = self.stats.active_wakers.saturating_sub(1);
}
}
}
fn check_for_lost_wakeups(&mut self) {
let now = waker_event_now();
let lost_threshold = std::time::Duration::from_secs(30); let mut violations_to_record = Vec::new();
for (waker_id, state) in &self.wakers {
if matches!(state.status, WakerStatus::Queued) {
if let Ok(age) = now.duration_since(state.registered_at) {
if age > lost_threshold {
let violation = WakerDedupViolation::LostWakeup {
waker_id: *waker_id,
channel_id: state.channel_id,
registered_at: state.registered_at,
expected_wake_at: state.registered_at + lost_threshold,
trace_id: state.trace_id,
};
violations_to_record.push(violation);
}
}
}
}
for violation in violations_to_record {
self.record_violation(violation);
self.stats.total_lost_wakeups += 1;
}
}
fn cleanup_tracking_data(&mut self) {
let now = waker_event_now();
let cleanup_window = std::time::Duration::from_secs(300);
while let Some((_, time)) = self.recent_registrations.front() {
if now.duration_since(*time).unwrap_or_default() > cleanup_window {
self.recent_registrations.pop_front();
} else {
break;
}
}
while let Some((_, time)) = self.recent_wakeups.front() {
if now.duration_since(*time).unwrap_or_default() > cleanup_window {
self.recent_wakeups.pop_front();
} else {
break;
}
}
let dropped_cleanup_window = std::time::Duration::from_secs(600); self.wakers.retain(|_, state| {
if let WakerStatus::Dropped { at } = state.status {
now.duration_since(at).unwrap_or_default() <= dropped_cleanup_window
} else {
true
}
});
if self.wakers.len() > self.config.max_tracked_wakers {
let mut to_remove = Vec::new();
for (waker_id, state) in &self.wakers {
if matches!(state.status, WakerStatus::Dropped { .. }) {
to_remove.push(*waker_id);
}
}
to_remove.sort_by_key(|id| self.wakers[id].last_operation_at);
let remove_count =
(self.wakers.len() - self.config.max_tracked_wakers).min(to_remove.len());
for waker_id in &to_remove[..remove_count] {
if let Some(state) = self.wakers.remove(waker_id) {
if let Some(channel_set) = self.channel_wakers.get_mut(&state.channel_id) {
channel_set.remove(waker_id);
}
}
}
}
}
}
#[cfg(test)]
mod tests {
#![allow(
clippy::pedantic,
clippy::nursery,
clippy::expect_fun_call,
clippy::map_unwrap_or,
clippy::cast_possible_wrap,
clippy::future_not_send
)]
use super::*;
use std::time::Duration;
#[test]
fn test_normal_waker_lifecycle() {
let mut oracle = WakerDedupOracle::with_defaults();
let waker_id = WakerId(1);
let channel_id = ChannelId(1);
oracle.on_waker_registered(waker_id, channel_id, true, None);
oracle.on_waker_wake_requested(waker_id, "channel_send".to_string(), None);
oracle.on_waker_actually_woken(waker_id, None);
oracle.on_waker_dropped(waker_id);
let violations = oracle.check_for_violations().unwrap();
assert!(violations.is_empty());
let stats = oracle.statistics();
assert_eq!(stats.total_wakers_registered, 1);
assert_eq!(stats.total_wakers_woken, 1);
assert_eq!(stats.total_wakers_dropped, 1);
assert_eq!(stats.total_violations, 0);
}
#[test]
fn test_double_wakeup_detection() {
let mut oracle = WakerDedupOracle::new(WakerDedupConfig {
enforcement: EnforcementMode::Collect,
..Default::default()
});
let waker_id = WakerId(1);
let channel_id = ChannelId(1);
oracle.on_waker_registered(waker_id, channel_id, true, None);
oracle.on_waker_actually_woken(waker_id, None);
oracle.on_waker_actually_woken(waker_id, None);
let violations = oracle.check_for_violations().unwrap();
assert_eq!(violations.len(), 1);
assert!(matches!(
violations[0],
WakerDedupViolation::DoubleWakeup { .. }
));
}
#[test]
fn test_spurious_wakeup_detection() {
let mut oracle = WakerDedupOracle::new(WakerDedupConfig {
enforcement: EnforcementMode::Collect,
..Default::default()
});
let waker_id = WakerId(1);
oracle.on_waker_actually_woken(waker_id, None);
let violations = oracle.check_for_violations().unwrap();
assert_eq!(violations.len(), 1);
assert!(matches!(
violations[0],
WakerDedupViolation::SpuriousWakeup { .. }
));
}
#[test]
fn test_use_after_drop_detection() {
let mut oracle = WakerDedupOracle::new(WakerDedupConfig {
enforcement: EnforcementMode::Collect,
..Default::default()
});
let waker_id = WakerId(1);
let channel_id = ChannelId(1);
oracle.on_waker_registered(waker_id, channel_id, true, None);
oracle.on_waker_dropped(waker_id);
oracle.on_waker_actually_woken(waker_id, None);
let violations = oracle.check_for_violations().unwrap();
assert_eq!(violations.len(), 1);
assert!(matches!(
violations[0],
WakerDedupViolation::UseAfterDrop { .. }
));
}
#[test]
fn test_queued_state_verification() {
let mut oracle = WakerDedupOracle::new(WakerDedupConfig {
enforcement: EnforcementMode::Collect,
..Default::default()
});
let waker_id = WakerId(1);
let channel_id = ChannelId(1);
oracle.on_waker_registered(waker_id, channel_id, true, None);
oracle.verify_queued_state(waker_id, false, None);
let violations = oracle.check_for_violations().unwrap();
assert_eq!(violations.len(), 1);
assert!(matches!(
violations[0],
WakerDedupViolation::InconsistentQueuedState { .. }
));
}
#[test]
fn test_registration_race_detection() {
let mut oracle = WakerDedupOracle::new(WakerDedupConfig {
enforcement: EnforcementMode::Collect,
race_detection_window_ms: 100, ..Default::default()
});
let waker_id = WakerId(1);
let channel_id = ChannelId(1);
oracle.on_waker_registered(waker_id, channel_id, true, None);
oracle
.recent_wakeups
.push_back((waker_id, waker_event_now()));
oracle.on_waker_registered(waker_id, channel_id, true, None);
let _violations = oracle.check_for_violations().unwrap();
}
#[test]
fn test_waker_leak_detection() {
let mut oracle = WakerDedupOracle::new(WakerDedupConfig {
enforcement: EnforcementMode::Collect,
..Default::default()
});
let waker_id = WakerId(1);
let channel_id = ChannelId(1);
oracle.on_waker_registered(waker_id, channel_id, true, None);
if let Some(state) = oracle.wakers.get_mut(&waker_id) {
state.registered_at = waker_event_now() - Duration::from_secs(120); }
let violations = oracle.check_for_violations().unwrap();
assert_eq!(violations.len(), 1);
assert!(matches!(
violations[0],
WakerDedupViolation::WakerLeak { .. }
));
}
#[test]
fn test_violation_record_creation() {
let config = WakerDedupConfig {
include_stack_traces: true,
enable_replay_commands: true,
structured_logging: false,
..Default::default()
};
let violation = WakerDedupViolation::LostWakeup {
waker_id: WakerId(1),
channel_id: ChannelId(1),
registered_at: waker_event_now(),
expected_wake_at: waker_event_now(),
trace_id: Some(DistTraceId::new_for_test(1)),
};
let record = ViolationRecord::new(violation, &config);
assert!(record.trace_id.is_some());
assert!(record.stack_trace.is_some());
assert!(record.replay_command.is_some());
}
#[test]
fn test_oracle_reset() {
let mut oracle = WakerDedupOracle::with_defaults();
oracle.on_waker_registered(WakerId(1), ChannelId(1), true, None);
oracle.on_waker_actually_woken(WakerId(999), None);
assert!(!oracle.wakers.is_empty());
assert!(!oracle.violations.is_empty());
oracle.reset();
assert!(oracle.wakers.is_empty());
assert!(oracle.violations.is_empty());
assert_eq!(oracle.statistics().total_wakers_registered, 0);
}
}