use crate::types::{TaskId, Time};
use std::collections::{HashMap, HashSet, VecDeque};
use std::sync::atomic::{AtomicU64, Ordering};
use std::time::Duration;
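/// Configuration for scheduler invariant verification and diagnostics.
///
/// A minimal sketch of overriding one threshold while keeping the other
/// defaults:
///
/// ```ignore
/// let config = InvariantConfig {
///     starvation_threshold_ms: 50,
///     ..InvariantConfig::default()
/// };
/// let monitor = SchedulerInvariantMonitor::new(config);
/// ```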
#[derive(Debug, Clone)]
pub struct InvariantConfig {
    /// Master switch: when false, all record/verify calls return immediately.
    pub enable_verification: bool,
    /// When true, each recorded violation also emits a diagnostic log entry.
    pub enable_diagnostics: bool,
    /// Maximum number of violations retained; the oldest are dropped first.
    pub max_tracked_violations: usize,
    /// Threshold for flagging priority inversions, in milliseconds.
    pub priority_inversion_threshold_ms: u64,
    /// Queue wait time beyond which a task counts as starved, in milliseconds.
    pub starvation_threshold_ms: u64,
    /// Max/min queue-depth ratio beyond which workers count as imbalanced.
    pub load_imbalance_threshold: f64,
    /// When true, capture a stack trace with each recorded violation.
    pub enable_stack_traces: bool,
}
impl Default for InvariantConfig {
fn default() -> Self {
Self {
enable_verification: true,
enable_diagnostics: false,
max_tracked_violations: 1000,
priority_inversion_threshold_ms: 10,
starvation_threshold_ms: 100,
load_imbalance_threshold: 2.0,
enable_stack_traces: false,
}
}
}
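/// Broad grouping of invariant violations, used for aggregate statistics and
/// filtered queries.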
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum InvariantCategory {
PriorityOrdering,
Fairness,
TaskLifecycle,
QueueConsistency,
WorkStealing,
Cancellation,
LoadBalancing,
}
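/// A specific invariant violation, carrying enough context to diagnose the
/// offending tasks, queues, or workers.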
#[derive(Debug, Clone, PartialEq)]
pub enum SchedulerInvariant {
TaskInMultipleQueues {
task_id: TaskId,
queue_count: usize,
},
PriorityOrderViolation {
high_priority_task: TaskId,
high_priority: u8,
low_priority_task: TaskId,
low_priority: u8,
},
TaskStarvation {
task_id: TaskId,
wait_time_ms: u64,
queue_position: usize,
},
LoadImbalance {
overloaded_worker: usize,
underloaded_worker: usize,
load_ratio: f64,
},
WorkStealingDoubleExecution {
task_id: TaskId,
original_worker: usize,
stealing_worker: usize,
},
CancelledTaskLeak {
task_id: TaskId,
queue_name: String,
time_since_cancel_ms: u64,
},
QueueDepthMismatch {
queue_name: String,
reported_depth: usize,
actual_depth: usize,
},
InvalidStateTransition {
task_id: TaskId,
from_state: String,
to_state: String,
},
}
impl SchedulerInvariant {
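    /// Returns the broad category this violation falls under.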
#[must_use]
pub fn category(&self) -> InvariantCategory {
match self {
Self::PriorityOrderViolation { .. } => InvariantCategory::PriorityOrdering,
Self::TaskStarvation { .. } | Self::LoadImbalance { .. } => InvariantCategory::Fairness,
Self::InvalidStateTransition { .. } => InvariantCategory::TaskLifecycle,
Self::TaskInMultipleQueues { .. } | Self::QueueDepthMismatch { .. } => {
InvariantCategory::QueueConsistency
}
Self::WorkStealingDoubleExecution { .. } => InvariantCategory::WorkStealing,
Self::CancelledTaskLeak { .. } => InvariantCategory::Cancellation,
}
}
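    /// Returns the severity on a 1-3 scale: 1 for queue accounting drift,
    /// 2 for fairness, priority, and cancellation issues, and 3 for
    /// correctness-threatening bugs such as double execution.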
#[must_use]
pub fn severity(&self) -> u8 {
match self {
            Self::QueueDepthMismatch { .. } => 1,
            Self::TaskStarvation { .. }
            | Self::LoadImbalance { .. }
            | Self::PriorityOrderViolation { .. }
            | Self::CancelledTaskLeak { .. } => 2,
            Self::TaskInMultipleQueues { .. }
            | Self::WorkStealingDoubleExecution { .. }
            | Self::InvalidStateTransition { .. } => 3,
        }
}
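    /// Renders a single-line, human-readable description of the violation.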
#[must_use]
pub fn description(&self) -> String {
match self {
Self::TaskInMultipleQueues {
task_id,
queue_count,
} => {
format!("Task {task_id:?} found in {queue_count} queues simultaneously")
}
Self::PriorityOrderViolation {
high_priority_task,
high_priority,
low_priority_task,
low_priority,
} => {
format!(
"Priority violation: task {low_priority_task:?} (priority {low_priority}) scheduled after task {high_priority_task:?} (priority {high_priority})"
)
}
Self::TaskStarvation {
task_id,
wait_time_ms,
queue_position,
} => {
format!(
"Task {task_id:?} starved for {wait_time_ms}ms at queue position {queue_position}"
)
}
Self::LoadImbalance {
overloaded_worker,
underloaded_worker,
load_ratio,
} => {
format!(
"Load imbalance: worker {overloaded_worker} has {load_ratio:.2}x load of worker {underloaded_worker}"
)
}
Self::WorkStealingDoubleExecution {
task_id,
original_worker,
stealing_worker,
} => {
format!(
"Task {task_id:?} executed on both worker {original_worker} and worker {stealing_worker} (double execution)"
)
}
Self::CancelledTaskLeak {
task_id,
queue_name,
time_since_cancel_ms,
} => {
format!(
"Cancelled task {task_id:?} still in {queue_name} after {time_since_cancel_ms}ms"
)
}
Self::QueueDepthMismatch {
queue_name,
reported_depth,
actual_depth,
} => {
format!(
"Queue {queue_name} reports depth {reported_depth} but contains {actual_depth} items"
)
}
Self::InvalidStateTransition {
task_id,
from_state,
to_state,
} => {
format!("Task {task_id:?} invalid transition from {from_state} to {to_state}")
}
}
}
}
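/// A recorded invariant violation, together with when and where it was
/// observed.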
#[derive(Debug, Clone)]
pub struct InvariantViolation {
pub invariant: SchedulerInvariant,
pub timestamp: Time,
pub worker_id: Option<usize>,
pub stack_trace: Option<String>,
pub context: HashMap<String, String>,
}
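/// Aggregate statistics over recorded violations and monitoring overhead.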
#[derive(Debug, Clone, Default)]
pub struct InvariantStats {
pub violations_by_category: HashMap<InvariantCategory, u64>,
    /// Violation counts indexed by severity; severities are 1-3, so index 0 is unused.
    pub violations_by_severity: [u64; 4],
    pub last_violation_time: Option<Time>,
pub monitored_workers: usize,
pub operations_monitored: u64,
pub avg_monitoring_overhead_ns: u64,
}
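/// Internal per-task bookkeeping used to detect queue-membership and
/// lifecycle violations.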
#[derive(Debug, Clone)]
struct TaskInvariantState {
queues: HashSet<String>,
priority: u8,
enqueue_time: Time,
last_update: Time,
lifecycle_state: String,
owner_worker: Option<usize>,
is_cancelled: bool,
}
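/// A point-in-time view of a queue as the scheduler reports it, used to
/// cross-check reported depth against actual contents.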
#[derive(Debug, Clone)]
pub struct QueueSnapshot {
pub name: String,
pub reported_depth: usize,
pub actual_tasks: Vec<TaskId>,
pub priority_range: Option<(u8, u8)>,
pub time_range: Option<(Time, Time)>,
}
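/// A read-only copy of the monitor's internal per-task state, exposed for
/// inspection and tests.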
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct TrackedTaskSnapshot {
pub task_id: TaskId,
pub queues: Vec<String>,
pub priority: u8,
pub enqueue_time: Time,
pub last_update: Time,
pub lifecycle_state: String,
pub owner_worker: Option<usize>,
pub is_cancelled: bool,
}
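/// A point-in-time view of one worker's load, used for load-balance checks.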
#[derive(Debug, Clone)]
pub struct WorkerLoadSnapshot {
pub worker_id: usize,
pub local_queue_depth: usize,
pub executing_count: usize,
pub recent_task_count: u64,
pub avg_execution_time_ms: f64,
}
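/// Observes scheduler events and verifies runtime invariants, retaining up to
/// `max_tracked_violations` recorded violations.
///
/// A minimal usage sketch, where `task_id` and `now` stand in for values the
/// scheduler already has:
///
/// ```ignore
/// let mut monitor = SchedulerInvariantMonitor::with_defaults();
/// monitor.record_task_enqueue(task_id, "ready_queue", 1, now);
/// monitor.record_task_dequeue(task_id, "ready_queue", now);
/// monitor.record_task_complete(task_id, 0, now);
/// assert!(monitor.violations().is_empty());
/// ```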
#[derive(Debug)]
pub struct SchedulerInvariantMonitor {
config: InvariantConfig,
violations: VecDeque<InvariantViolation>,
task_states: HashMap<TaskId, TaskInvariantState>,
stats: InvariantStats,
operations_count: AtomicU64,
total_overhead_ns: AtomicU64,
last_cleanup: Option<Time>,
}
impl SchedulerInvariantMonitor {
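    /// Creates a monitor with the given configuration.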
#[must_use]
pub fn new(config: InvariantConfig) -> Self {
Self {
config,
violations: VecDeque::new(),
task_states: HashMap::new(),
stats: InvariantStats::default(),
operations_count: AtomicU64::new(0),
total_overhead_ns: AtomicU64::new(0),
last_cleanup: None,
}
}
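    /// Creates a monitor with the default configuration.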
#[must_use]
pub fn with_defaults() -> Self {
Self::new(InvariantConfig::default())
}
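    /// Records that a task was placed on `queue_name`, flagging a
    /// `TaskInMultipleQueues` violation if the task is now tracked in more
    /// than one queue at once.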
pub fn record_task_enqueue(
&mut self,
task_id: TaskId,
queue_name: &str,
priority: u8,
timestamp: Time,
) {
if !self.config.enable_verification {
return;
}
let start = std::time::Instant::now();
let should_record_violation = {
let task_state =
self.task_states
.entry(task_id)
.or_insert_with(|| TaskInvariantState {
queues: HashSet::new(),
priority,
enqueue_time: timestamp,
last_update: timestamp,
lifecycle_state: "enqueued".to_string(),
owner_worker: None,
is_cancelled: false,
});
let already_queued = !task_state.queues.is_empty();
let inserted_new_queue = task_state.queues.insert(queue_name.to_string());
task_state.last_update = timestamp;
if already_queued && inserted_new_queue {
Some(task_state.queues.len())
} else {
None
}
};
if let Some(queue_count) = should_record_violation {
self.record_violation(
SchedulerInvariant::TaskInMultipleQueues {
task_id,
queue_count,
},
timestamp,
None,
);
}
self.update_monitoring_overhead(start.elapsed());
}
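    /// Records an intentional move onto `queue_name`, replacing any
    /// previously tracked queue membership so the move is not mistaken for
    /// multiple-queue corruption.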
pub fn record_task_requeue(
&mut self,
task_id: TaskId,
queue_name: &str,
priority: u8,
timestamp: Time,
) {
if !self.config.enable_verification {
return;
}
let start = std::time::Instant::now();
let task_state = self
.task_states
.entry(task_id)
.or_insert_with(|| TaskInvariantState {
queues: HashSet::new(),
priority,
enqueue_time: timestamp,
last_update: timestamp,
lifecycle_state: "enqueued".to_string(),
owner_worker: None,
is_cancelled: false,
});
task_state.queues.clear();
task_state.queues.insert(queue_name.to_string());
task_state.priority = priority;
task_state.enqueue_time = timestamp;
task_state.last_update = timestamp;
task_state.lifecycle_state = "enqueued".to_string();
self.update_monitoring_overhead(start.elapsed());
}
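    /// Records that a task left `queue_name`; tracking is dropped once the
    /// task is in no queue at all.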
pub fn record_task_dequeue(&mut self, task_id: TaskId, queue_name: &str, timestamp: Time) {
if !self.config.enable_verification {
return;
}
let start = std::time::Instant::now();
        if let Some(task_state) = self.task_states.get_mut(&task_id) {
            task_state.queues.remove(queue_name);
            task_state.last_update = timestamp;
            // Drop tracking entirely once the task is no longer in any queue.
            if task_state.queues.is_empty() {
                self.task_states.remove(&task_id);
            }
        }
self.update_monitoring_overhead(start.elapsed());
}
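    /// Records that a task was handed to a worker for execution, removing it
    /// from queue tracking entirely.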
pub fn record_task_dispatch(&mut self, task_id: TaskId, timestamp: Time) {
if !self.config.enable_verification {
return;
}
let start = std::time::Instant::now();
if let Some(task_state) = self.task_states.get_mut(&task_id) {
task_state.queues.clear();
task_state.last_update = timestamp;
}
self.task_states.remove(&task_id);
self.update_monitoring_overhead(start.elapsed());
}
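    /// Marks a task as cancelled so a later completion can detect cancelled
    /// tasks that lingered in queues.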
pub fn record_task_cancel(&mut self, task_id: TaskId, timestamp: Time) {
if !self.config.enable_verification {
return;
}
if let Some(task_state) = self.task_states.get_mut(&task_id) {
task_state.is_cancelled = true;
task_state.lifecycle_state = "cancelled".to_string();
task_state.last_update = timestamp;
}
}
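    /// Records completion on `worker_id`. If the task was cancelled but still
    /// sat in one or more queues, a `CancelledTaskLeak` violation is recorded
    /// per leaked queue.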
pub fn record_task_complete(&mut self, task_id: TaskId, worker_id: usize, timestamp: Time) {
if !self.config.enable_verification {
return;
}
let leaked_queues = {
if let Some(task_state) = self.task_states.get_mut(&task_id) {
let leaked = if !task_state.queues.is_empty() && task_state.is_cancelled {
let time_since_cancel = timestamp
.as_nanos()
.saturating_sub(task_state.last_update.as_nanos())
/ 1_000_000;
Some((task_state.queues.clone(), time_since_cancel))
} else {
None
};
task_state.lifecycle_state = "completed".to_string();
task_state.last_update = timestamp;
leaked
} else {
None
}
};
if let Some((queues, time_since_cancel_ms)) = leaked_queues {
for queue_name in queues {
self.record_violation(
SchedulerInvariant::CancelledTaskLeak {
task_id,
queue_name,
time_since_cancel_ms,
},
timestamp,
Some(worker_id),
);
}
}
}
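    /// Verifies that two consecutively scheduled tasks respect priority order
    /// (higher numeric value means higher priority); records a
    /// `PriorityOrderViolation` if the first task had the lower priority.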
pub fn verify_priority_ordering(
&mut self,
first_task: TaskId,
first_priority: u8,
second_task: TaskId,
second_priority: u8,
timestamp: Time,
) {
if !self.config.enable_verification {
return;
}
if first_priority < second_priority {
self.record_violation(
SchedulerInvariant::PriorityOrderViolation {
high_priority_task: second_task,
high_priority: second_priority,
low_priority_task: first_task,
low_priority: first_priority,
},
timestamp,
None,
);
}
}
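    /// Cross-checks a queue snapshot: the reported depth must match the
    /// actual contents, and the oldest queued task must not have waited
    /// beyond the starvation threshold.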
pub fn verify_queue_consistency(&mut self, snapshot: &QueueSnapshot, timestamp: Time) {
if !self.config.enable_verification {
return;
}
let start = std::time::Instant::now();
if snapshot.reported_depth != snapshot.actual_tasks.len() {
self.record_violation(
SchedulerInvariant::QueueDepthMismatch {
queue_name: snapshot.name.clone(),
reported_depth: snapshot.reported_depth,
actual_depth: snapshot.actual_tasks.len(),
},
timestamp,
None,
);
}
if let Some((oldest_time, _)) = snapshot.time_range {
let wait_time_ms =
timestamp.as_nanos().saturating_sub(oldest_time.as_nanos()) / 1_000_000;
if wait_time_ms > self.config.starvation_threshold_ms {
if let Some(&oldest_task) = snapshot.actual_tasks.first() {
self.record_violation(
SchedulerInvariant::TaskStarvation {
task_id: oldest_task,
wait_time_ms,
queue_position: 0,
},
timestamp,
None,
);
}
}
}
self.update_monitoring_overhead(start.elapsed());
}
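    /// Compares the busiest and idlest workers by local queue depth and
    /// records a `LoadImbalance` violation when their ratio exceeds
    /// `load_imbalance_threshold`. The check is skipped when the idlest
    /// queue is empty, since the ratio would be undefined.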
pub fn verify_load_balance(&mut self, worker_loads: &[WorkerLoadSnapshot], timestamp: Time) {
if !self.config.enable_verification || worker_loads.len() < 2 {
return;
}
let start = std::time::Instant::now();
let min_worker = worker_loads
.iter()
.min_by_key(|w| w.local_queue_depth)
.unwrap();
let max_worker = worker_loads
.iter()
.max_by_key(|w| w.local_queue_depth)
.unwrap();
if min_worker.local_queue_depth > 0 {
let load_ratio =
max_worker.local_queue_depth as f64 / min_worker.local_queue_depth as f64;
if load_ratio > self.config.load_imbalance_threshold {
self.record_violation(
SchedulerInvariant::LoadImbalance {
overloaded_worker: max_worker.worker_id,
underloaded_worker: min_worker.worker_id,
load_ratio,
},
timestamp,
Some(max_worker.worker_id),
);
}
}
self.update_monitoring_overhead(start.elapsed());
}
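    /// Records a violation: updates statistics, optionally captures a stack
    /// trace and emits a diagnostic log, and trims the retained violations to
    /// `max_tracked_violations`.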
#[allow(unused_variables)]
fn record_violation(
&mut self,
invariant: SchedulerInvariant,
timestamp: Time,
worker_id: Option<usize>,
) {
let severity = invariant.severity();
let category = invariant.category();
*self
.stats
.violations_by_category
.entry(category)
.or_insert(0) += 1;
self.stats.violations_by_severity[severity as usize] += 1;
self.stats.last_violation_time = Some(timestamp);
let violation = InvariantViolation {
invariant: invariant.clone(),
timestamp,
worker_id,
stack_trace: if self.config.enable_stack_traces {
Some(format!("{:?}", std::backtrace::Backtrace::capture()))
} else {
None
},
context: HashMap::new(),
};
if self.config.enable_diagnostics {
crate::tracing_compat::error!(
category = ?category,
severity = severity,
worker_id = ?worker_id,
timestamp = ?timestamp,
invariant = ?invariant,
description = %invariant.description(),
"scheduler invariant violation"
);
}
self.violations.push_back(violation);
while self.violations.len() > self.config.max_tracked_violations {
self.violations.pop_front();
}
}
fn update_monitoring_overhead(&self, elapsed: Duration) {
self.operations_count.fetch_add(1, Ordering::Relaxed);
self.total_overhead_ns
.fetch_add(elapsed.as_nanos() as u64, Ordering::Relaxed);
}
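    /// Returns a snapshot of violation statistics, including the average
    /// per-operation monitoring overhead in nanoseconds.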
pub fn stats(&self) -> InvariantStats {
let operations = self.operations_count.load(Ordering::Relaxed);
let total_overhead = self.total_overhead_ns.load(Ordering::Relaxed);
InvariantStats {
violations_by_category: self.stats.violations_by_category.clone(),
violations_by_severity: self.stats.violations_by_severity,
last_violation_time: self.stats.last_violation_time,
monitored_workers: self.stats.monitored_workers,
operations_monitored: operations,
avg_monitoring_overhead_ns: if operations > 0 {
total_overhead / operations
} else {
0
},
}
}
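    /// Returns all currently retained violations, oldest first.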
pub fn violations(&self) -> &VecDeque<InvariantViolation> {
&self.violations
}
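    /// Returns retained violations matching `category`.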
pub fn violations_by_category(&self, category: InvariantCategory) -> Vec<&InvariantViolation> {
self.violations
.iter()
.filter(|v| v.invariant.category() == category)
.collect()
}
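    /// Returns retained violations matching `severity`.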
pub fn violations_by_severity(&self, severity: u8) -> Vec<&InvariantViolation> {
self.violations
.iter()
.filter(|v| v.invariant.severity() == severity)
.collect()
}
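    /// Returns snapshots of all tracked tasks, sorted by task id for
    /// deterministic output.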
pub fn tracked_tasks(&self) -> Vec<TrackedTaskSnapshot> {
let mut tasks = self
.task_states
.iter()
.map(|(task_id, state)| {
let mut queues = state.queues.iter().cloned().collect::<Vec<_>>();
queues.sort();
TrackedTaskSnapshot {
task_id: *task_id,
queues,
priority: state.priority,
enqueue_time: state.enqueue_time,
last_update: state.last_update,
lifecycle_state: state.lifecycle_state.clone(),
owner_worker: state.owner_worker,
is_cancelled: state.is_cancelled,
}
})
.collect::<Vec<_>>();
tasks.sort_by_key(|task| task.task_id);
tasks
}
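    /// Drops tracked task state and violations older than `max_age` relative
    /// to `current_time`.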
pub fn cleanup_old_data(&mut self, current_time: Time, max_age: Duration) {
let cutoff_time = Time::from_nanos(
current_time
.as_nanos()
.saturating_sub(max_age.as_nanos() as u64),
);
self.task_states
.retain(|_, state| state.last_update.as_nanos() >= cutoff_time.as_nanos());
self.violations
.retain(|violation| violation.timestamp.as_nanos() >= cutoff_time.as_nanos());
self.last_cleanup = Some(current_time);
}
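    /// Returns whether invariant verification is enabled.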
pub fn is_enabled(&self) -> bool {
self.config.enable_verification
}
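    /// Returns the active configuration.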
pub fn config(&self) -> &InvariantConfig {
&self.config
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_invariant_monitor_basic_operations() {
let mut monitor = SchedulerInvariantMonitor::with_defaults();
let now = Time::from_nanos(1000);
let task_id = TaskId::new_for_test(42, 0);
monitor.record_task_enqueue(task_id, "ready_queue", 1, now);
assert_eq!(monitor.task_states.len(), 1);
assert!(monitor.violations.is_empty());
monitor.record_task_dequeue(task_id, "ready_queue", now);
assert!(monitor.violations.is_empty());
monitor.record_task_complete(task_id, 0, now);
assert!(monitor.violations.is_empty());
}
#[test]
fn test_invariant_violations_detected() {
let mut monitor = SchedulerInvariantMonitor::with_defaults();
let now = Time::from_nanos(1000);
let task_id = TaskId::new_for_test(42, 0);
monitor.record_task_enqueue(task_id, "ready_queue", 1, now);
monitor.record_task_enqueue(task_id, "timed_queue", 1, now);
assert_eq!(monitor.violations.len(), 1);
match &monitor.violations[0].invariant {
SchedulerInvariant::TaskInMultipleQueues {
task_id: tid,
queue_count,
} => {
assert_eq!(*tid, task_id);
assert_eq!(*queue_count, 2);
}
_ => panic!("Expected TaskInMultipleQueues violation"),
}
}
#[test]
fn test_priority_ordering_violations() {
let mut monitor = SchedulerInvariantMonitor::with_defaults();
let now = Time::from_nanos(1000);
monitor.verify_priority_ordering(
TaskId::new_for_test(1, 0),
3,
TaskId::new_for_test(2, 0),
5,
now,
);
assert_eq!(monitor.violations.len(), 1);
match &monitor.violations[0].invariant {
SchedulerInvariant::PriorityOrderViolation {
high_priority_task,
high_priority,
low_priority_task,
low_priority,
} => {
assert_eq!(*high_priority_task, TaskId::new_for_test(2, 0));
assert_eq!(*high_priority, 5);
assert_eq!(*low_priority_task, TaskId::new_for_test(1, 0));
assert_eq!(*low_priority, 3);
}
_ => panic!("Expected PriorityOrderViolation"),
}
}
#[test]
fn test_queue_consistency_verification() {
let mut monitor = SchedulerInvariantMonitor::with_defaults();
let now = Time::from_nanos(1000);
let snapshot = QueueSnapshot {
name: "test_queue".to_string(),
reported_depth: 5,
actual_tasks: vec![
TaskId::new_for_test(1, 0),
TaskId::new_for_test(2, 0),
TaskId::new_for_test(3, 0),
],
priority_range: Some((1, 3)),
time_range: Some((Time::from_nanos(500), now)),
};
monitor.verify_queue_consistency(&snapshot, now);
assert_eq!(monitor.violations.len(), 1);
match &monitor.violations[0].invariant {
SchedulerInvariant::QueueDepthMismatch {
queue_name,
reported_depth,
actual_depth,
} => {
assert_eq!(queue_name, "test_queue");
assert_eq!(*reported_depth, 5);
assert_eq!(*actual_depth, 3);
}
_ => panic!("Expected QueueDepthMismatch violation"),
}
}
#[test]
fn test_load_balance_verification() {
let mut monitor = SchedulerInvariantMonitor::with_defaults();
let now = Time::from_nanos(1000);
let worker_loads = vec![
WorkerLoadSnapshot {
worker_id: 0,
local_queue_depth: 10,
executing_count: 2,
recent_task_count: 100,
avg_execution_time_ms: 5.0,
},
WorkerLoadSnapshot {
worker_id: 1,
local_queue_depth: 2,
executing_count: 1,
recent_task_count: 20,
avg_execution_time_ms: 4.0,
},
];
monitor.verify_load_balance(&worker_loads, now);
assert_eq!(monitor.violations.len(), 1);
match &monitor.violations[0].invariant {
SchedulerInvariant::LoadImbalance {
overloaded_worker,
underloaded_worker,
load_ratio,
} => {
assert_eq!(*overloaded_worker, 0);
assert_eq!(*underloaded_worker, 1);
assert!((*load_ratio - 5.0).abs() < 0.1);
}
_ => panic!("Expected LoadImbalance violation"),
}
}
#[test]
fn test_statistics_tracking() {
let mut monitor = SchedulerInvariantMonitor::with_defaults();
let now = Time::from_nanos(1000);
monitor.verify_priority_ordering(
TaskId::new_for_test(1, 0),
3,
TaskId::new_for_test(2, 0),
5,
now,
);
monitor.verify_priority_ordering(
TaskId::new_for_test(3, 0),
2,
TaskId::new_for_test(4, 0),
7,
now,
);
let stats = monitor.stats();
assert_eq!(
stats.violations_by_category[&InvariantCategory::PriorityOrdering],
2
);
        assert_eq!(stats.violations_by_severity[2], 2);
        assert_eq!(stats.last_violation_time, Some(now));
}
#[test]
fn test_cleanup_old_data() {
let mut monitor = SchedulerInvariantMonitor::with_defaults();
let old_time = Time::from_nanos(1000);
let new_time = Time::from_nanos(5000);
monitor.record_task_enqueue(TaskId::new_for_test(1, 0), "queue", 1, old_time);
monitor.verify_priority_ordering(
TaskId::new_for_test(2, 0),
3,
TaskId::new_for_test(3, 0),
5,
old_time,
);
assert_eq!(monitor.task_states.len(), 1);
assert_eq!(monitor.violations.len(), 1);
monitor.cleanup_old_data(new_time, Duration::from_nanos(2000));
assert_eq!(monitor.task_states.len(), 0);
assert_eq!(monitor.violations.len(), 0);
}
#[test]
fn test_tracked_task_snapshots_expose_internal_state() {
let mut monitor = SchedulerInvariantMonitor::with_defaults();
let enqueue_time = Time::from_nanos(1000);
let task_id = TaskId::new_for_test(7, 0);
monitor.record_task_enqueue(task_id, "ready_queue", 3, enqueue_time);
if let Some(state) = monitor.task_states.get_mut(&task_id) {
state.owner_worker = Some(2);
}
monitor.record_task_cancel(task_id, Time::from_nanos(1500));
let snapshots = monitor.tracked_tasks();
assert_eq!(snapshots.len(), 1);
let snapshot = &snapshots[0];
assert_eq!(snapshot.task_id, task_id);
assert_eq!(snapshot.queues, vec!["ready_queue".to_string()]);
assert_eq!(snapshot.priority, 3);
assert_eq!(snapshot.enqueue_time, enqueue_time);
assert_eq!(snapshot.last_update, Time::from_nanos(1500));
assert_eq!(snapshot.lifecycle_state, "cancelled");
assert_eq!(snapshot.owner_worker, Some(2));
assert!(snapshot.is_cancelled);
}
#[test]
fn test_reenqueue_same_queue_does_not_trigger_multiple_queue_violation() {
let mut monitor = SchedulerInvariantMonitor::with_defaults();
let task_id = TaskId::new_for_test(9, 0);
monitor.record_task_enqueue(task_id, "ready_queue", 10, Time::from_nanos(1_000));
monitor.record_task_enqueue(task_id, "ready_queue", 10, Time::from_nanos(1_200));
assert!(
monitor.violations.is_empty(),
"re-observing the same queue must not look like a multiple-queue violation"
);
}
#[test]
fn test_requeue_replaces_previous_queue_without_multiple_queue_violation() {
let mut monitor = SchedulerInvariantMonitor::with_defaults();
let task_id = TaskId::new_for_test(11, 0);
monitor.record_task_enqueue(task_id, "ready_queue", 10, Time::from_nanos(1_000));
monitor.record_task_requeue(task_id, "cancel_queue", 50, Time::from_nanos(1_200));
assert!(
monitor.violations.is_empty(),
"intentional queue moves must not look like multiple-queue corruption"
);
let snapshots = monitor.tracked_tasks();
assert_eq!(snapshots.len(), 1);
assert_eq!(snapshots[0].queues, vec!["cancel_queue".to_string()]);
assert_eq!(snapshots[0].priority, 50);
}
#[test]
fn test_dispatch_removes_task_from_tracking() {
let mut monitor = SchedulerInvariantMonitor::with_defaults();
let task_id = TaskId::new_for_test(12, 0);
monitor.record_task_enqueue(task_id, "ready_queue", 10, Time::from_nanos(1_000));
monitor.record_task_dispatch(task_id, Time::from_nanos(1_500));
assert!(
monitor.tracked_tasks().is_empty(),
"dispatched task should no longer appear as queued"
);
assert!(monitor.violations.is_empty());
}
#[test]
fn test_cancelled_task_leak_requires_cancelled_task_state() {
let task_id = TaskId::new_for_test(10, 0);
let mut uncancelled = SchedulerInvariantMonitor::with_defaults();
uncancelled.record_task_enqueue(task_id, "ready_queue", 10, Time::from_nanos(1_000));
uncancelled.record_task_complete(task_id, 0, Time::from_nanos(2_000));
assert!(
uncancelled.violations.is_empty(),
"non-cancelled tasks must not be reported as cancelled leaks"
);
let mut cancelled = SchedulerInvariantMonitor::with_defaults();
cancelled.record_task_enqueue(task_id, "ready_queue", 10, Time::from_nanos(1_000));
cancelled.record_task_cancel(task_id, Time::from_nanos(1_500));
cancelled.record_task_complete(task_id, 0, Time::from_nanos(3_500));
assert_eq!(cancelled.violations.len(), 1);
match &cancelled.violations[0].invariant {
SchedulerInvariant::CancelledTaskLeak {
task_id: leaked_task,
queue_name,
time_since_cancel_ms,
} => {
assert_eq!(*leaked_task, task_id);
assert_eq!(queue_name, "ready_queue");
assert_eq!(*time_since_cancel_ms, 0);
}
            other => panic!("Expected CancelledTaskLeak, got {other:?}"),
        }
}
}