use crate::epoch::EpochId;
use crate::types::Time;
use parking_lot::RwLock;
use std::backtrace::Backtrace;
use std::collections::{HashMap, VecDeque};
use std::fmt;
use std::sync::Arc;
use std::sync::atomic::{AtomicU64, Ordering};
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum RuntimeModule {
Scheduler,
RegionTable,
TaskTable,
ObligationTable,
TimerWheel,
IoReactor,
CancelProtocol,
}
impl RuntimeModule {
#[must_use]
pub fn name(self) -> &'static str {
match self {
Self::Scheduler => "scheduler",
Self::RegionTable => "region_table",
Self::TaskTable => "task_table",
Self::ObligationTable => "obligation_table",
Self::TimerWheel => "timer_wheel",
Self::IoReactor => "io_reactor",
Self::CancelProtocol => "cancel_protocol",
}
}
#[must_use]
pub fn all_modules() -> &'static [Self] {
&[
Self::Scheduler,
Self::RegionTable,
Self::TaskTable,
Self::ObligationTable,
Self::TimerWheel,
Self::IoReactor,
Self::CancelProtocol,
]
}
}
#[derive(Debug, Clone)]
pub struct RuntimeEpochConfig {
pub max_epoch_skew: u64,
pub max_transition_duration_ns: u64,
pub sync_window_ns: u64,
pub max_violations: usize,
pub panic_on_violation: bool,
pub capture_stack_traces: bool,
pub max_stack_trace_depth: usize,
pub consistency_level: ConsistencyLevel,
}
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ConsistencyLevel {
Relaxed,
Strict,
Development,
}
impl Default for RuntimeEpochConfig {
fn default() -> Self {
Self {
max_epoch_skew: 2, max_transition_duration_ns: 1_000_000_000, sync_window_ns: 10_000_000, max_violations: 1000,
panic_on_violation: false,
capture_stack_traces: true,
max_stack_trace_depth: 32,
consistency_level: ConsistencyLevel::Strict,
}
}
}
#[derive(Debug, Clone)]
pub enum RuntimeEpochViolation {
EpochSkew {
module_a: RuntimeModule,
epoch_a: EpochId,
module_b: RuntimeModule,
epoch_b: EpochId,
skew_amount: u64,
detected_at: Time,
stack_trace: Option<Arc<Backtrace>>,
},
SlowTransition {
module: RuntimeModule,
from_epoch: EpochId,
to_epoch: EpochId,
transition_duration_ns: u64,
detected_at: Time,
stack_trace: Option<Arc<Backtrace>>,
},
MissedTransition {
module: RuntimeModule,
expected_epoch: EpochId,
actual_epoch: EpochId,
detected_at: Time,
stack_trace: Option<Arc<Backtrace>>,
},
StaleEpochUpdate {
module: RuntimeModule,
update_epoch: EpochId,
current_epoch: EpochId,
staleness_amount: u64,
detected_at: Time,
stack_trace: Option<Arc<Backtrace>>,
},
OrderViolation {
first_module: RuntimeModule,
first_epoch: EpochId,
second_module: RuntimeModule,
second_epoch: EpochId,
expected_order: String,
detected_at: Time,
stack_trace: Option<Arc<Backtrace>>,
},
ConsistencyFailure {
affected_modules: Vec<(RuntimeModule, EpochId)>,
consistency_level: ConsistencyLevel,
detected_at: Time,
stack_trace: Option<Arc<Backtrace>>,
},
}
impl fmt::Display for RuntimeEpochViolation {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self {
Self::EpochSkew {
module_a,
epoch_a,
module_b,
epoch_b,
skew_amount,
detected_at,
..
} => {
write!(
f,
"Epoch skew: {} at epoch {} and {} at epoch {} (skew: {}) at {}",
module_a.name(),
epoch_a.as_u64(),
module_b.name(),
epoch_b.as_u64(),
skew_amount,
detected_at.as_nanos()
)
}
Self::SlowTransition {
module,
from_epoch,
to_epoch,
transition_duration_ns,
detected_at,
..
} => {
write!(
f,
"Slow transition: {} from epoch {} to {} took {}ns at {}",
module.name(),
from_epoch.as_u64(),
to_epoch.as_u64(),
transition_duration_ns,
detected_at.as_nanos()
)
}
Self::MissedTransition {
module,
expected_epoch,
actual_epoch,
detected_at,
..
} => {
write!(
f,
"Missed transition: {} expected epoch {} but at {} (detected at {})",
module.name(),
expected_epoch.as_u64(),
actual_epoch.as_u64(),
detected_at.as_nanos()
)
}
Self::StaleEpochUpdate {
module,
update_epoch,
current_epoch,
staleness_amount,
detected_at,
..
} => {
write!(
f,
"Stale update: {} used epoch {} when current is {} (stale by {}) at {}",
module.name(),
update_epoch.as_u64(),
current_epoch.as_u64(),
staleness_amount,
detected_at.as_nanos()
)
}
Self::OrderViolation {
first_module,
first_epoch,
second_module,
second_epoch,
expected_order,
detected_at,
..
} => {
write!(
f,
"Order violation: {} at {} and {} at {} violates expected order '{}' at {}",
first_module.name(),
first_epoch.as_u64(),
second_module.name(),
second_epoch.as_u64(),
expected_order,
detected_at.as_nanos()
)
}
Self::ConsistencyFailure {
affected_modules,
consistency_level,
detected_at,
..
} => {
let modules_str = affected_modules
.iter()
.map(|(module, epoch)| format!("{}:{}", module.name(), epoch.as_u64()))
.collect::<Vec<_>>()
.join(", ");
write!(
f,
"Consistency failure ({:?}): modules [{}] at {}",
consistency_level,
modules_str,
detected_at.as_nanos()
)
}
}
}
}
#[derive(Debug, Clone)]
struct ModuleEpochState {
module: RuntimeModule,
current_epoch: EpochId,
last_transition_time: Time,
transition_start_time: Option<Time>,
transition_history: VecDeque<(EpochId, Time, u64)>, }
impl ModuleEpochState {
fn new(module: RuntimeModule, initial_epoch: EpochId, now: Time) -> Self {
Self {
module,
current_epoch: initial_epoch,
last_transition_time: now,
transition_start_time: None,
transition_history: VecDeque::new(),
}
}
fn start_transition(&mut self, now: Time) {
self.transition_start_time = Some(
self.transition_start_time
.map_or(now, |existing_start| existing_start.min(now)),
);
}
fn complete_transition(&mut self, new_epoch: EpochId, now: Time) -> (EpochId, u64) {
let previous_epoch = self.current_epoch;
let duration_ns = if let Some(start_time) = self.transition_start_time.take() {
now.as_nanos().saturating_sub(start_time.as_nanos())
} else {
0
};
self.transition_history
.push_back((new_epoch, now, duration_ns));
while self.transition_history.len() > 100 {
self.transition_history.pop_front();
}
self.current_epoch = new_epoch;
self.last_transition_time = now;
(previous_epoch, duration_ns)
}
fn is_transitioning(&self) -> bool {
self.transition_start_time.is_some()
}
fn transition_duration_so_far(&self, now: Time) -> Option<u64> {
self.transition_start_time
.map(|start| now.as_nanos().saturating_sub(start.as_nanos()))
}
}
#[derive(Debug)]
pub struct RuntimeEpochOracle {
config: RuntimeEpochConfig,
module_states: RwLock<HashMap<RuntimeModule, ModuleEpochState>>,
global_epoch: AtomicU64,
violations: RwLock<VecDeque<RuntimeEpochViolation>>,
transitions_tracked: AtomicU64,
violations_detected: AtomicU64,
consistency_checks_performed: AtomicU64,
}
impl Default for RuntimeEpochOracle {
fn default() -> Self {
Self::with_default_config()
}
}
impl RuntimeEpochOracle {
#[must_use]
pub fn new(config: RuntimeEpochConfig) -> Self {
let oracle = Self {
config,
module_states: RwLock::new(HashMap::new()),
global_epoch: AtomicU64::new(1),
violations: RwLock::new(VecDeque::new()),
transitions_tracked: AtomicU64::new(0),
violations_detected: AtomicU64::new(0),
consistency_checks_performed: AtomicU64::new(0),
};
let initial_epoch = EpochId::new(1);
let now = Time::ZERO;
{
let mut states = oracle.module_states.write();
for &module in RuntimeModule::all_modules() {
states.insert(module, ModuleEpochState::new(module, initial_epoch, now));
}
}
oracle
}
#[must_use]
pub fn with_default_config() -> Self {
Self::new(RuntimeEpochConfig::default())
}
pub fn notify_epoch_transition_start(
&self,
module: RuntimeModule,
from_epoch: EpochId,
now: Time,
) {
let violation = {
let mut states = self.module_states.write();
states.get_mut(&module).and_then(|state| {
if state.current_epoch != from_epoch {
Some(RuntimeEpochViolation::MissedTransition {
module,
expected_epoch: state.current_epoch,
actual_epoch: from_epoch,
detected_at: now,
stack_trace: self.capture_stack_trace(),
})
} else {
state.start_transition(now);
None
}
})
};
if let Some(violation) = violation {
self.record_violation(violation);
}
}
pub fn notify_epoch_transition_complete(
&self,
module: RuntimeModule,
to_epoch: EpochId,
now: Time,
) {
enum TransitionOutcome {
Ignored,
Accepted {
from_epoch: EpochId,
duration_ns: u64,
},
}
let expected_violation;
let outcome = {
let mut states = self.module_states.write();
if let Some(state) = states.get_mut(&module) {
let current_epoch = state.current_epoch;
let expected_epoch = current_epoch.saturating_next();
if to_epoch == current_epoch {
expected_violation = None;
TransitionOutcome::Ignored
} else if to_epoch.is_before(current_epoch) {
expected_violation = Some(RuntimeEpochViolation::OrderViolation {
first_module: module,
first_epoch: current_epoch,
second_module: module,
second_epoch: to_epoch,
expected_order: "epoch transitions must be monotonically non-decreasing"
.to_string(),
detected_at: now,
stack_trace: self.capture_stack_trace(),
});
TransitionOutcome::Ignored
} else {
expected_violation = if to_epoch.is_after(expected_epoch) {
Some(RuntimeEpochViolation::MissedTransition {
module,
expected_epoch,
actual_epoch: to_epoch,
detected_at: now,
stack_trace: self.capture_stack_trace(),
})
} else {
None
};
let (from_epoch, duration_ns) = state.complete_transition(to_epoch, now);
TransitionOutcome::Accepted {
from_epoch,
duration_ns,
}
}
} else {
expected_violation = None;
TransitionOutcome::Ignored
}
};
if let Some(violation) = expected_violation {
self.record_violation(violation);
}
let TransitionOutcome::Accepted {
from_epoch,
duration_ns: transition_duration,
} = outcome
else {
return;
};
self.transitions_tracked.fetch_add(1, Ordering::Relaxed);
if transition_duration > self.config.max_transition_duration_ns {
let violation = RuntimeEpochViolation::SlowTransition {
module,
from_epoch,
to_epoch,
transition_duration_ns: transition_duration,
detected_at: now,
stack_trace: self.capture_stack_trace(),
};
self.record_violation(violation);
}
let max_epoch = {
let states = self.module_states.read();
states
.values()
.map(|s| s.current_epoch.as_u64())
.max()
.unwrap_or(1)
};
self.global_epoch.store(max_epoch, Ordering::Relaxed);
}
pub fn notify_epoch_update(&self, module: RuntimeModule, update_epoch: EpochId, now: Time) {
let current_epoch = {
let states = self.module_states.read();
states
.get(&module)
.map_or(EpochId::new(1), |s| s.current_epoch)
};
let staleness = current_epoch.as_u64().saturating_sub(update_epoch.as_u64());
if staleness > 0 {
let violation = RuntimeEpochViolation::StaleEpochUpdate {
module,
update_epoch,
current_epoch,
staleness_amount: staleness,
detected_at: now,
stack_trace: self.capture_stack_trace(),
};
self.record_violation(violation);
}
}
pub fn check_epoch_consistency(&self, now: Time) {
self.consistency_checks_performed
.fetch_add(1, Ordering::Relaxed);
let states = self.module_states.read();
let modules: Vec<_> = states.keys().copied().collect();
for i in 0..modules.len() {
for j in (i + 1)..modules.len() {
let module_a = modules[i];
let module_b = modules[j];
if let (Some(state_a), Some(state_b)) =
(states.get(&module_a), states.get(&module_b))
{
let epoch_a = state_a.current_epoch;
let epoch_b = state_b.current_epoch;
let skew = epoch_a.as_u64().abs_diff(epoch_b.as_u64());
if skew > self.config.max_epoch_skew {
let violation = RuntimeEpochViolation::EpochSkew {
module_a,
epoch_a,
module_b,
epoch_b,
skew_amount: skew,
detected_at: now,
stack_trace: self.capture_stack_trace(),
};
self.record_violation(violation);
}
}
}
}
for state in states.values() {
if let Some(transition_duration) = state.transition_duration_so_far(now) {
if transition_duration > self.config.max_transition_duration_ns {
let violation = RuntimeEpochViolation::SlowTransition {
module: state.module,
from_epoch: state.current_epoch,
to_epoch: EpochId::new(state.current_epoch.as_u64() + 1),
transition_duration_ns: transition_duration,
detected_at: now,
stack_trace: self.capture_stack_trace(),
};
self.record_violation(violation);
}
}
}
}
pub fn check(&self, now: Time) -> Result<(), RuntimeEpochViolation> {
self.check_epoch_consistency(now);
let violations = self.violations.read();
if let Some(violation) = violations.front() {
return Err(violation.clone());
}
Ok(())
}
pub fn reset(&self) {
let initial_epoch = EpochId::new(1);
let now = Time::ZERO;
{
let mut states = self.module_states.write();
states.clear();
for &module in RuntimeModule::all_modules() {
states.insert(module, ModuleEpochState::new(module, initial_epoch, now));
}
}
self.global_epoch.store(1, Ordering::Relaxed);
self.violations.write().clear();
self.transitions_tracked.store(0, Ordering::Relaxed);
self.violations_detected.store(0, Ordering::Relaxed);
self.consistency_checks_performed
.store(0, Ordering::Relaxed);
}
pub fn global_epoch(&self) -> EpochId {
EpochId::new(self.global_epoch.load(Ordering::Relaxed))
}
pub fn module_epoch(&self, module: RuntimeModule) -> Option<EpochId> {
let states = self.module_states.read();
states.get(&module).map(|s| s.current_epoch)
}
pub fn get_statistics(&self) -> RuntimeEpochStatistics {
let states = self.module_states.read();
let violations = self.violations.read();
let epochs: Vec<u64> = states.values().map(|s| s.current_epoch.as_u64()).collect();
let min_epoch = epochs.iter().copied().min().unwrap_or(1);
let max_epoch = epochs.iter().copied().max().unwrap_or(1);
let epoch_variance = max_epoch - min_epoch;
RuntimeEpochStatistics {
transitions_tracked: self.transitions_tracked.load(Ordering::Relaxed),
violations_detected: self.violations_detected.load(Ordering::Relaxed),
consistency_checks_performed: self.consistency_checks_performed.load(Ordering::Relaxed),
global_epoch: self.global_epoch(),
tracked_modules: states.len(),
min_module_epoch: EpochId::new(min_epoch),
max_module_epoch: EpochId::new(max_epoch),
epoch_variance,
total_violations: violations.len(),
}
}
pub fn get_recent_violations(&self, limit: usize) -> Vec<RuntimeEpochViolation> {
let violations = self.violations.read();
violations.iter().rev().take(limit).cloned().collect()
}
pub fn get_module_states(&self) -> Vec<(RuntimeModule, EpochId, bool)> {
let states = self.module_states.read();
states
.values()
.map(|s| (s.module, s.current_epoch, s.is_transitioning()))
.collect()
}
fn record_violation(&self, violation: RuntimeEpochViolation) {
self.violations_detected.fetch_add(1, Ordering::Relaxed);
assert!(
!self.config.panic_on_violation,
"Runtime epoch violation detected: {violation}"
);
let mut violations = self.violations.write();
violations.push_back(violation);
while violations.len() > self.config.max_violations {
violations.pop_front();
}
}
fn capture_stack_trace(&self) -> Option<Arc<Backtrace>> {
if self.config.capture_stack_traces {
Some(Arc::new(Backtrace::capture()))
} else {
None
}
}
}
#[derive(Debug, Clone)]
pub struct RuntimeEpochStatistics {
pub transitions_tracked: u64,
pub violations_detected: u64,
pub consistency_checks_performed: u64,
pub global_epoch: EpochId,
pub tracked_modules: usize,
pub min_module_epoch: EpochId,
pub max_module_epoch: EpochId,
pub epoch_variance: u64,
pub total_violations: usize,
}
impl fmt::Display for RuntimeEpochStatistics {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(
f,
"RuntimeEpochStats {{ transitions: {}, violations: {}, checks: {}, global_epoch: {}, modules: {}, epoch_range: {}-{}, variance: {}, total_violations: {} }}",
self.transitions_tracked,
self.violations_detected,
self.consistency_checks_performed,
self.global_epoch.as_u64(),
self.tracked_modules,
self.min_module_epoch.as_u64(),
self.max_module_epoch.as_u64(),
self.epoch_variance,
self.total_violations
)
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::test_utils::init_test_logging;
#[test]
fn test_normal_epoch_transitions() {
init_test_logging();
let oracle = RuntimeEpochOracle::with_default_config();
let now = Time::ZERO;
assert_eq!(
oracle.module_epoch(RuntimeModule::Scheduler),
Some(EpochId::new(1))
);
oracle.notify_epoch_transition_start(RuntimeModule::Scheduler, EpochId::new(1), now);
oracle.notify_epoch_transition_complete(
RuntimeModule::Scheduler,
EpochId::new(2),
Time::from_nanos(1000),
);
let stats = oracle.get_statistics();
assert_eq!(stats.violations_detected, 0);
assert_eq!(stats.transitions_tracked, 1);
assert_eq!(
oracle.module_epoch(RuntimeModule::Scheduler),
Some(EpochId::new(2))
);
}
#[test]
fn test_epoch_skew_detection() {
init_test_logging();
let config = RuntimeEpochConfig {
max_epoch_skew: 1, ..Default::default()
};
let oracle = RuntimeEpochOracle::new(config);
let now = Time::ZERO;
oracle.notify_epoch_transition_complete(RuntimeModule::Scheduler, EpochId::new(3), now);
oracle.check_epoch_consistency(now);
let stats = oracle.get_statistics();
assert!(stats.violations_detected > 0);
assert_eq!(stats.epoch_variance, 2);
let violations = oracle.get_recent_violations(1);
assert!(!violations.is_empty());
assert!(matches!(
violations[0],
RuntimeEpochViolation::EpochSkew { .. }
));
}
#[test]
fn test_slow_transition_detection() {
init_test_logging();
let config = RuntimeEpochConfig {
max_transition_duration_ns: 100_000, ..Default::default()
};
let oracle = RuntimeEpochOracle::new(config);
let now = Time::ZERO;
oracle.notify_epoch_transition_start(RuntimeModule::Scheduler, EpochId::new(1), now);
oracle.notify_epoch_transition_complete(
RuntimeModule::Scheduler,
EpochId::new(2),
Time::from_nanos(1_000_000), );
let stats = oracle.get_statistics();
assert_eq!(stats.violations_detected, 1);
let violations = oracle.get_recent_violations(1);
assert!(matches!(
violations[0],
RuntimeEpochViolation::SlowTransition { .. }
));
}
#[test]
fn test_stale_epoch_update_detection() {
init_test_logging();
let oracle = RuntimeEpochOracle::with_default_config();
let now = Time::ZERO;
oracle.notify_epoch_transition_complete(RuntimeModule::Scheduler, EpochId::new(2), now);
oracle.notify_epoch_update(RuntimeModule::Scheduler, EpochId::new(1), now);
let stats = oracle.get_statistics();
assert_eq!(stats.violations_detected, 1);
let violations = oracle.get_recent_violations(1);
assert!(matches!(
violations[0],
RuntimeEpochViolation::StaleEpochUpdate { .. }
));
}
#[test]
fn test_oracle_check_method() {
init_test_logging();
let oracle = RuntimeEpochOracle::with_default_config();
let result = oracle.check(Time::ZERO);
assert!(result.is_ok());
oracle.notify_epoch_transition_complete(
RuntimeModule::Scheduler,
EpochId::new(10),
Time::ZERO,
);
let result = oracle.check(Time::ZERO);
assert!(result.is_err());
}
#[test]
fn test_oracle_reset() {
init_test_logging();
let oracle = RuntimeEpochOracle::with_default_config();
oracle.notify_epoch_transition_complete(
RuntimeModule::Scheduler,
EpochId::new(5),
Time::ZERO,
);
oracle.check_epoch_consistency(Time::ZERO);
let stats_before = oracle.get_statistics();
assert!(stats_before.violations_detected > 0);
assert!(stats_before.global_epoch.as_u64() > 1);
oracle.reset();
let stats_after = oracle.get_statistics();
assert_eq!(stats_after.violations_detected, 0);
assert_eq!(stats_after.transitions_tracked, 0);
assert_eq!(stats_after.global_epoch.as_u64(), 1);
assert_eq!(stats_after.epoch_variance, 0);
for &module in RuntimeModule::all_modules() {
assert_eq!(oracle.module_epoch(module), Some(EpochId::new(1)));
}
}
#[test]
fn test_global_epoch_tracking() {
init_test_logging();
let oracle = RuntimeEpochOracle::with_default_config();
assert_eq!(oracle.global_epoch().as_u64(), 1);
oracle.notify_epoch_transition_complete(
RuntimeModule::Scheduler,
EpochId::new(3),
Time::ZERO,
);
oracle.notify_epoch_transition_complete(
RuntimeModule::TaskTable,
EpochId::new(2),
Time::ZERO,
);
assert_eq!(oracle.global_epoch().as_u64(), 3);
}
#[test]
fn test_all_modules_tracked() {
init_test_logging();
let oracle = RuntimeEpochOracle::with_default_config();
let module_states = oracle.get_module_states();
assert_eq!(module_states.len(), RuntimeModule::all_modules().len());
for &expected_module in RuntimeModule::all_modules() {
assert!(
module_states
.iter()
.any(|(module, _, _)| *module == expected_module)
);
}
}
#[test]
fn test_stale_transition_start_is_ignored_and_recorded() {
init_test_logging();
let config = RuntimeEpochConfig {
max_transition_duration_ns: 100,
..Default::default()
};
let oracle = RuntimeEpochOracle::new(config);
oracle.notify_epoch_transition_complete(
RuntimeModule::Scheduler,
EpochId::new(2),
Time::from_nanos(10),
);
oracle.notify_epoch_transition_start(
RuntimeModule::Scheduler,
EpochId::new(1),
Time::from_nanos(20),
);
oracle.check_epoch_consistency(Time::from_nanos(1_000));
assert!(
!oracle
.get_recent_violations(4)
.iter()
.any(|violation| matches!(violation, RuntimeEpochViolation::SlowTransition { .. })),
"stale start witness must not manufacture an in-flight slow transition"
);
assert!(matches!(
oracle.get_recent_violations(1).first(),
Some(RuntimeEpochViolation::MissedTransition {
module: RuntimeModule::Scheduler,
expected_epoch,
actual_epoch,
..
}) if *expected_epoch == EpochId::new(2) && *actual_epoch == EpochId::new(1)
));
}
#[test]
fn test_backward_transition_completion_does_not_rewind_epoch() {
init_test_logging();
let oracle = RuntimeEpochOracle::with_default_config();
oracle.notify_epoch_transition_complete(
RuntimeModule::Scheduler,
EpochId::new(2),
Time::from_nanos(10),
);
oracle.notify_epoch_transition_complete(
RuntimeModule::Scheduler,
EpochId::new(1),
Time::from_nanos(20),
);
let stats = oracle.get_statistics();
assert_eq!(stats.transitions_tracked, 1);
assert_eq!(
oracle.module_epoch(RuntimeModule::Scheduler),
Some(EpochId::new(2))
);
assert!(matches!(
oracle.get_recent_violations(1).first(),
Some(RuntimeEpochViolation::OrderViolation {
first_module: RuntimeModule::Scheduler,
first_epoch,
second_module: RuntimeModule::Scheduler,
second_epoch,
..
}) if *first_epoch == EpochId::new(2) && *second_epoch == EpochId::new(1)
));
}
#[test]
fn test_skipped_forward_transition_records_missing_transition_and_advances() {
init_test_logging();
let oracle = RuntimeEpochOracle::with_default_config();
oracle.notify_epoch_transition_complete(
RuntimeModule::Scheduler,
EpochId::new(4),
Time::from_nanos(10),
);
let stats = oracle.get_statistics();
assert_eq!(stats.transitions_tracked, 1);
assert_eq!(
oracle.module_epoch(RuntimeModule::Scheduler),
Some(EpochId::new(4))
);
assert!(matches!(
oracle.get_recent_violations(1).first(),
Some(RuntimeEpochViolation::MissedTransition {
module: RuntimeModule::Scheduler,
expected_epoch,
actual_epoch,
..
}) if *expected_epoch == EpochId::new(2) && *actual_epoch == EpochId::new(4)
));
}
#[test]
fn test_slow_transition_reports_actual_from_epoch() {
init_test_logging();
let config = RuntimeEpochConfig {
max_transition_duration_ns: 50,
..Default::default()
};
let oracle = RuntimeEpochOracle::new(config);
oracle.notify_epoch_transition_complete(
RuntimeModule::Scheduler,
EpochId::new(5),
Time::from_nanos(10),
);
oracle.notify_epoch_transition_start(
RuntimeModule::Scheduler,
EpochId::new(5),
Time::from_nanos(20),
);
oracle.notify_epoch_transition_complete(
RuntimeModule::Scheduler,
EpochId::new(8),
Time::from_nanos(200),
);
let violations = oracle.get_recent_violations(2);
assert!(violations.iter().any(|violation| {
matches!(
violation,
RuntimeEpochViolation::SlowTransition {
module: RuntimeModule::Scheduler,
from_epoch,
to_epoch,
transition_duration_ns: 180,
..
} if *from_epoch == EpochId::new(5) && *to_epoch == EpochId::new(8)
)
}));
assert!(violations.iter().any(|violation| {
matches!(
violation,
RuntimeEpochViolation::MissedTransition {
module: RuntimeModule::Scheduler,
expected_epoch,
actual_epoch,
..
} if *expected_epoch == EpochId::new(6) && *actual_epoch == EpochId::new(8)
)
}));
}
}