use super::config::LabConfig;
use super::oracle::OracleSuite;
use crate::lab::chaos::{ChaosRng, ChaosStats};
use crate::record::ObligationKind;
use crate::record::task::TaskState;
use crate::runtime::RuntimeState;
use crate::runtime::config::ObligationLeakResponse;
use crate::runtime::deadline_monitor::{
DeadlineMonitor, DeadlineWarning, MonitorConfig, default_warning_handler,
};
use crate::runtime::reactor::LabReactor;
use crate::runtime::scheduler::{DispatchLane, ScheduleCertificate};
use crate::time::VirtualClock;
use crate::trace::TraceBufferHandle;
use crate::trace::crashpack::{
CrashPack, CrashPackConfig, FailureInfo, FailureOutcome, ReplayCommand, artifact_filename,
};
use crate::trace::event::TraceEventKind;
use crate::trace::recorder::TraceRecorder;
use crate::trace::replay::{ReplayEvent, ReplayTrace, TraceMetadata};
use crate::trace::scoring::seed_fingerprint;
use crate::trace::{TraceData, TraceEvent, check_refinement_firewall};
use crate::trace::{canonicalize::trace_fingerprint, certificate::TraceCertificate};
use crate::types::Time;
use crate::types::{ObligationId, RegionId, TaskId};
use crate::util::det_hash::{DetHashMap, DetHashSet};
use crate::util::{DetEntropy, DetRng};
use parking_lot::Mutex;
use std::fmt;
use std::sync::Arc;
use std::task::{Context, Poll, Waker};
use std::time::Duration;
/// Compact summary of the trace certificate computed for a lab run report.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct LabTraceCertificateSummary {
/// Hash accumulated over the trace events folded into the certificate.
pub event_hash: u64,
/// Number of trace events folded into the certificate.
pub event_count: u64,
/// Hash of the schedule certificate associated with the run.
pub schedule_hash: u64,
}
/// Reason an auto-advancing run loop stopped.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum AutoAdvanceTermination {
    /// The runtime reported quiescence with no pending deadline.
    Quiescent,
    /// The configured step limit was reached.
    StepLimitReached,
    /// The loop gave up after repeated iterations without progress.
    StuckBailout,
}

impl fmt::Display for AutoAdvanceTermination {
    /// Writes the stable, kebab-case label for the termination reason.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let label = match self {
            Self::Quiescent => "quiescent",
            Self::StepLimitReached => "step-limit-reached",
            Self::StuckBailout => "stuck-bailout",
        };
        f.write_str(label)
    }
}
/// Statistics describing one auto-advancing run of the lab runtime.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct VirtualTimeReport {
    /// Scheduler steps executed during the run.
    pub steps: u64,
    /// Number of times virtual time was jumped forward to a pending deadline.
    pub auto_advances: u64,
    /// Sum of the wakeup counts reported while processing timers.
    pub total_wakeups: u64,
    /// Virtual time when the run started.
    pub time_start: Time,
    /// Virtual time when the run stopped.
    pub time_end: Time,
    /// Difference between `time_end` and `time_start`, in nanoseconds.
    pub virtual_elapsed_nanos: u64,
    /// Why the run stopped.
    pub termination: AutoAdvanceTermination,
}

impl VirtualTimeReport {
    /// Elapsed virtual time in whole milliseconds (fractional part truncated).
    #[must_use]
    pub const fn virtual_elapsed_ms(&self) -> u64 {
        const NANOS_PER_MILLI: u64 = 1_000_000;
        self.virtual_elapsed_nanos / NANOS_PER_MILLI
    }

    /// Elapsed virtual time in whole seconds (fractional part truncated).
    #[must_use]
    pub const fn virtual_elapsed_secs(&self) -> u64 {
        const NANOS_PER_SEC: u64 = 1_000_000_000;
        self.virtual_elapsed_nanos / NANOS_PER_SEC
    }
}
/// Snapshot of a lab run: progress counters, trace fingerprints, oracle
/// results, and invariant / refinement-firewall findings.
#[derive(Debug, Clone)]
pub struct LabRunReport {
/// Seed taken from the lab configuration.
pub seed: u64,
/// Steps attributed to the call that produced this report (0 for a plain report).
pub steps_delta: u64,
/// Total steps executed by the runtime so far.
pub steps_total: u64,
/// Whether the runtime was quiescent when the report was built.
pub quiescent: bool,
/// Virtual time at report creation, in nanoseconds.
pub now_nanos: u64,
/// Number of events in the trace snapshot.
pub trace_len: usize,
/// Fingerprint of the trace events, or a seed-derived fingerprint when the trace is empty.
pub trace_fingerprint: u64,
/// Certificate summary (event hash/count and schedule hash).
pub trace_certificate: LabTraceCertificateSummary,
/// Full oracle report for the run.
pub oracle_report: crate::lab::oracle::OracleReport,
/// Sorted, de-duplicated violation descriptions, including `temporal:` and
/// `refinement_firewall:` entries.
pub invariant_violations: Vec<String>,
/// Names of failed oracle invariants that are classified as temporal.
pub temporal_invariant_failures: Vec<String>,
/// Length of the divergent replay prefix when temporal failures exist and the prefix is non-empty.
pub temporal_counterexample_prefix_len: Option<usize>,
/// Rule id of the first refinement-firewall violation, if any.
pub refinement_firewall_rule_id: Option<String>,
/// Trace index of the first refinement-firewall violation, if any.
pub refinement_firewall_event_index: Option<usize>,
/// Event sequence number of the first refinement-firewall violation, if any.
pub refinement_firewall_event_seq: Option<u64>,
/// `event_index + 1` for the first refinement-firewall violation, if any.
pub refinement_counterexample_prefix_len: Option<usize>,
/// True when the firewall check was skipped because the trace buffer had
/// more events pushed than the snapshot retained.
pub refinement_firewall_skipped_due_to_trace_truncation: bool,
}
impl LabRunReport {
    /// Serializes the report into a JSON object with nested `trace`,
    /// `temporal`, and `refinement_firewall` sections.
    #[must_use]
    pub fn to_json(&self) -> serde_json::Value {
        use serde_json::json;

        let cert = &self.trace_certificate;
        let trace = json!({
            "len": self.trace_len,
            "fingerprint": self.trace_fingerprint,
            "certificate": {
                "event_hash": cert.event_hash,
                "event_count": cert.event_count,
                "schedule_hash": cert.schedule_hash,
            }
        });
        let oracles = self.oracle_report.to_json();
        let temporal = json!({
            "failed_invariants": self.temporal_invariant_failures,
            "counterexample_prefix_len": self.temporal_counterexample_prefix_len,
        });
        let refinement_firewall = json!({
            "rule_id": self.refinement_firewall_rule_id,
            "event_index": self.refinement_firewall_event_index,
            "event_seq": self.refinement_firewall_event_seq,
            "counterexample_prefix_len": self.refinement_counterexample_prefix_len,
            "skipped_due_to_trace_truncation": self.refinement_firewall_skipped_due_to_trace_truncation,
        });

        json!({
            "seed": self.seed,
            "steps_delta": self.steps_delta,
            "steps_total": self.steps_total,
            "quiescent": self.quiescent,
            "now_nanos": self.now_nanos,
            "trace": trace,
            "oracles": oracles,
            "invariants": self.invariant_violations,
            "temporal": temporal,
            "refinement_firewall": refinement_firewall,
        })
    }

    /// Exports the given trace events as a TLA+ behavior module.
    ///
    /// Returns `None` when `trace_events` is empty or the exporter produced
    /// no snapshots from them.
    #[must_use]
    pub fn export_tla(
        &self,
        trace_events: &[crate::trace::TraceEvent],
        module_name: &str,
    ) -> Option<crate::trace::tla_export::TlaModule> {
        if trace_events.is_empty() {
            return None;
        }
        let exporter = crate::trace::tla_export::TlaExporter::from_trace(trace_events);
        match exporter.snapshot_count() {
            0 => None,
            _ => Some(exporter.export_behavior(module_name)),
        }
    }
}
/// Oracle invariant names that are reported as temporal failures.
///
/// Failed oracle entries whose invariant name appears here are surfaced in
/// the run report's temporal section. The `fabric_*` names are only included
/// when the `messaging-fabric` feature is enabled.
const TEMPORAL_ORACLE_INVARIANTS: &[&str] = &[
"task_leak",
"obligation_leak",
"quiescence",
"cancellation_protocol",
"loser_drain",
"region_tree",
"deadline_monotone",
#[cfg(feature = "messaging-fabric")]
"fabric_publish",
#[cfg(feature = "messaging-fabric")]
"fabric_reply",
#[cfg(feature = "messaging-fabric")]
"fabric_quiescence",
#[cfg(feature = "messaging-fabric")]
"fabric_redelivery",
];
/// Flattened copy of the lab configuration values that are reported and hashed.
#[derive(Debug, Clone, PartialEq)]
pub struct LabConfigSummary {
/// Copied from `LabConfig::seed`.
pub seed: u64,
/// Copied from `LabConfig::entropy_seed`.
pub entropy_seed: u64,
/// Copied from `LabConfig::worker_count`.
pub worker_count: usize,
/// Copied from `LabConfig::panic_on_obligation_leak`.
pub panic_on_obligation_leak: bool,
/// Copied from `LabConfig::trace_capacity`.
pub trace_capacity: usize,
/// Copied from `LabConfig::futurelock_max_idle_steps`.
pub futurelock_max_idle_steps: u64,
/// Copied from `LabConfig::panic_on_futurelock`.
pub panic_on_futurelock: bool,
/// Copied from `LabConfig::max_steps`.
pub max_steps: Option<u64>,
/// Summary of the chaos configuration, when one is set.
pub chaos: Option<ChaosConfigSummary>,
/// True when the configuration carries replay-recording settings.
pub replay_recording_enabled: bool,
}
impl LabConfigSummary {
    /// Builds a summary by copying the reported fields out of `config`.
    #[must_use]
    pub fn from_config(config: &LabConfig) -> Self {
        let chaos = config.chaos.as_ref().map(ChaosConfigSummary::from_config);
        let replay_recording_enabled = config.replay_recording.is_some();
        Self {
            seed: config.seed,
            entropy_seed: config.entropy_seed,
            worker_count: config.worker_count,
            panic_on_obligation_leak: config.panic_on_obligation_leak,
            trace_capacity: config.trace_capacity,
            futurelock_max_idle_steps: config.futurelock_max_idle_steps,
            panic_on_futurelock: config.panic_on_futurelock,
            max_steps: config.max_steps,
            chaos,
            replay_recording_enabled,
        }
    }

    /// Hashes every summarized field with the crate's deterministic hasher.
    ///
    /// The feed order is part of the hash value: scalar fields first, then a
    /// `1u8`/`0u8` tag for chaos presence followed by the chaos fields
    /// (probabilities hashed via their bit patterns).
    #[must_use]
    pub fn config_hash(&self) -> u64 {
        use std::hash::{Hash, Hasher};

        let mut hasher = crate::util::DetHasher::default();
        self.seed.hash(&mut hasher);
        self.entropy_seed.hash(&mut hasher);
        self.worker_count.hash(&mut hasher);
        self.panic_on_obligation_leak.hash(&mut hasher);
        self.trace_capacity.hash(&mut hasher);
        self.futurelock_max_idle_steps.hash(&mut hasher);
        self.panic_on_futurelock.hash(&mut hasher);
        self.max_steps.hash(&mut hasher);
        self.replay_recording_enabled.hash(&mut hasher);
        match &self.chaos {
            Some(chaos) => {
                1u8.hash(&mut hasher);
                chaos.seed.hash(&mut hasher);
                chaos.cancel_probability.to_bits().hash(&mut hasher);
                chaos.delay_probability.to_bits().hash(&mut hasher);
                chaos.io_error_probability.to_bits().hash(&mut hasher);
                chaos.wakeup_storm_probability.to_bits().hash(&mut hasher);
                chaos.budget_exhaust_probability.to_bits().hash(&mut hasher);
            }
            None => 0u8.hash(&mut hasher),
        }
        hasher.finish()
    }

    /// Serializes the summary into a flat JSON object (`chaos` is nested or null).
    #[must_use]
    pub fn to_json(&self) -> serde_json::Value {
        use serde_json::json;

        let chaos = self.chaos.as_ref().map(ChaosConfigSummary::to_json);
        json!({
            "seed": self.seed,
            "entropy_seed": self.entropy_seed,
            "worker_count": self.worker_count,
            "panic_on_obligation_leak": self.panic_on_obligation_leak,
            "trace_capacity": self.trace_capacity,
            "futurelock_max_idle_steps": self.futurelock_max_idle_steps,
            "panic_on_futurelock": self.panic_on_futurelock,
            "max_steps": self.max_steps,
            "chaos": chaos,
            "replay_recording_enabled": self.replay_recording_enabled,
        })
    }
}
/// Reported subset of the chaos configuration: seed plus injection probabilities.
#[derive(Debug, Clone, Copy, PartialEq)]
pub struct ChaosConfigSummary {
    /// Chaos seed.
    pub seed: u64,
    /// Copied from `ChaosConfig::cancel_probability`.
    pub cancel_probability: f64,
    /// Copied from `ChaosConfig::delay_probability`.
    pub delay_probability: f64,
    /// Copied from `ChaosConfig::io_error_probability`.
    pub io_error_probability: f64,
    /// Copied from `ChaosConfig::wakeup_storm_probability`.
    pub wakeup_storm_probability: f64,
    /// Copied from `ChaosConfig::budget_exhaust_probability`.
    pub budget_exhaust_probability: f64,
}

impl ChaosConfigSummary {
    /// Copies the reported fields out of a chaos configuration.
    #[must_use]
    pub fn from_config(config: &crate::lab::chaos::ChaosConfig) -> Self {
        let seed = config.seed;
        let cancel_probability = config.cancel_probability;
        let delay_probability = config.delay_probability;
        let io_error_probability = config.io_error_probability;
        let wakeup_storm_probability = config.wakeup_storm_probability;
        let budget_exhaust_probability = config.budget_exhaust_probability;
        Self {
            seed,
            cancel_probability,
            delay_probability,
            io_error_probability,
            wakeup_storm_probability,
            budget_exhaust_probability,
        }
    }

    /// Serializes the summary into a flat JSON object.
    #[must_use]
    pub fn to_json(&self) -> serde_json::Value {
        use serde_json::json;

        let Self {
            seed,
            cancel_probability,
            delay_probability,
            io_error_probability,
            wakeup_storm_probability,
            budget_exhaust_probability,
        } = *self;
        json!({
            "seed": seed,
            "cancel_probability": cancel_probability,
            "delay_probability": delay_probability,
            "io_error_probability": io_error_probability,
            "wakeup_storm_probability": wakeup_storm_probability,
            "budget_exhaust_probability": budget_exhaust_probability,
        })
    }
}
/// Category of an artifact attached to a harness report.
///
/// Variant order is significant: the derived `Ord` is used to sort
/// attachments when a report is serialized.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, PartialOrd, Ord)]
pub enum HarnessAttachmentKind {
    /// A crash pack artifact.
    CrashPack,
    /// A replay trace artifact.
    ReplayTrace,
    /// A trace artifact.
    Trace,
    /// Any other artifact.
    Other,
}

impl fmt::Display for HarnessAttachmentKind {
    /// Writes the snake_case label used in serialized reports.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let label = match self {
            Self::CrashPack => "crashpack",
            Self::ReplayTrace => "replay_trace",
            Self::Trace => "trace",
            Self::Other => "other",
        };
        f.write_str(label)
    }
}
/// Reference to an artifact attached to a harness report: a kind plus a path.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct HarnessAttachmentRef {
    /// Category of the attachment.
    pub kind: HarnessAttachmentKind,
    /// Path of the attachment, stored as given by the caller.
    pub path: String,
}

impl HarnessAttachmentRef {
    /// Creates a reference tagged as a crash pack.
    #[must_use]
    pub fn crashpack(path: impl Into<String>) -> Self {
        let kind = HarnessAttachmentKind::CrashPack;
        let path = path.into();
        Self { kind, path }
    }

    /// Creates a reference tagged as a replay trace.
    #[must_use]
    pub fn replay_trace(path: impl Into<String>) -> Self {
        let kind = HarnessAttachmentKind::ReplayTrace;
        let path = path.into();
        Self { kind, path }
    }

    /// Creates a reference tagged as a trace.
    #[must_use]
    pub fn trace(path: impl Into<String>) -> Self {
        let kind = HarnessAttachmentKind::Trace;
        let path = path.into();
        Self { kind, path }
    }

    /// Serializes the reference as `{ "kind": <label>, "path": <path> }`.
    #[must_use]
    pub fn to_json(&self) -> serde_json::Value {
        use serde_json::json;

        let kind_label = self.kind.to_string();
        json!({
            "kind": kind_label,
            "path": self.path,
        })
    }
}
/// Link from a harness report to its crash pack, including a replay command.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct CrashpackLink {
    /// Path of the crash pack attachment.
    pub path: String,
    /// Identifier string for the crash pack.
    pub id: String,
    /// Trace fingerprint associated with the crash pack.
    pub fingerprint: u64,
    /// Command describing how to replay the run.
    pub replay: ReplayCommand,
}

impl CrashpackLink {
    /// Serializes the link into a JSON object with `path`, `id`,
    /// `fingerprint`, and `replay` keys.
    #[must_use]
    pub fn to_json(&self) -> serde_json::Value {
        use serde_json::json;

        let Self {
            path,
            id,
            fingerprint,
            replay,
        } = self;
        json!({
            "path": path,
            "id": id,
            "fingerprint": fingerprint,
            "replay": replay,
        })
    }
}
/// Harness-level report bundling the application name, lab configuration
/// summary, run report, and attachment references.
#[derive(Debug, Clone)]
pub struct SporkHarnessReport {
/// Schema version stamped on the report.
pub schema_version: u32,
/// Application name supplied by the caller.
pub app: String,
/// Summary of the lab configuration used for the run.
pub config: LabConfigSummary,
/// The underlying lab run report.
pub run: LabRunReport,
/// Artifacts attached to the report, in caller-supplied order.
pub attachments: Vec<HarnessAttachmentRef>,
}
impl SporkHarnessReport {
pub const SCHEMA_VERSION: u32 = 3;
#[must_use]
pub fn new(
app: impl Into<String>,
config: &LabConfig,
run: LabRunReport,
attachments: Vec<HarnessAttachmentRef>,
) -> Self {
Self {
schema_version: Self::SCHEMA_VERSION,
app: app.into(),
config: LabConfigSummary::from_config(config),
run,
attachments,
}
}
#[must_use]
pub fn passed(&self) -> bool {
self.run.oracle_report.all_passed() && self.run.invariant_violations.is_empty()
}
#[must_use]
pub fn trace_fingerprint(&self) -> u64 {
self.run.trace_fingerprint
}
#[must_use]
pub fn seed(&self) -> u64 {
self.run.seed
}
#[must_use]
pub fn config_hash(&self) -> u64 {
self.config.config_hash()
}
#[must_use]
pub fn crashpack_path(&self) -> Option<&str> {
self.attachments
.iter()
.find(|a| a.kind == HarnessAttachmentKind::CrashPack)
.map(|a| a.path.as_str())
}
#[must_use]
pub fn crashpack_link(&self) -> Option<CrashpackLink> {
let path = self.crashpack_path()?.to_string();
let crash_config = CrashPackConfig {
seed: self.seed(),
config_hash: self.config_hash(),
worker_count: self.config.worker_count,
max_steps: self.config.max_steps,
commit_hash: None,
};
let replay = ReplayCommand::from_config(&crash_config, Some(path.as_str()));
Some(CrashpackLink {
id: format!(
"crashpack-{seed:016x}-{fingerprint:016x}",
seed = self.seed(),
fingerprint = self.trace_fingerprint()
),
fingerprint: self.trace_fingerprint(),
path,
replay,
})
}
#[must_use]
pub fn oracle_failures(&self) -> Vec<String> {
self.run
.oracle_report
.failures()
.iter()
.map(|e| {
let desc = e
.violation
.as_ref()
.map_or_else(String::new, |v| format!(": {v}"));
format!("{}{desc}", e.invariant)
})
.collect()
}
#[must_use]
pub fn summary_line(&self) -> String {
let verdict = if self.passed() { "PASS" } else { "FAIL" };
let oracle = &self.run.oracle_report;
format!(
"[{verdict}] app=\"{}\" seed={} fingerprint={} oracles={}/{} invariant_violations={}",
self.app,
self.run.seed,
self.run.trace_fingerprint,
oracle.passed,
oracle.total,
self.run.invariant_violations.len(),
)
}
#[must_use]
pub fn to_json(&self) -> serde_json::Value {
use serde_json::json;
let mut attachments = self.attachments.clone();
attachments.sort_by(|a, b| (a.kind, &a.path).cmp(&(b.kind, &b.path)));
json!({
"schema_version": self.schema_version,
"verdict": if self.passed() { "pass" } else { "fail" },
"app": { "name": self.app },
"lab": {
"config": self.config.to_json(),
"config_hash": self.config.config_hash(),
},
"fingerprints": {
"trace": self.run.trace_fingerprint,
"event_hash": self.run.trace_certificate.event_hash,
"event_count": self.run.trace_certificate.event_count,
"schedule_hash": self.run.trace_certificate.schedule_hash,
},
"run": self.run.to_json(),
"crashpack": self.crashpack_link().map(|link| link.to_json()),
"attachments": attachments.iter().map(HarnessAttachmentRef::to_json).collect::<Vec<_>>(),
})
}
}
/// Deterministic lab runtime: drives `RuntimeState` with a seeded scheduler,
/// a virtual clock, and a lab reactor.
#[derive(Debug)]
pub struct LabRuntime {
/// Runtime state driven by this lab runtime.
pub state: RuntimeState,
/// Lab reactor; the same instance is handed to `state` at construction.
lab_reactor: Arc<LabReactor>,
// NOTE(review): presumably the I/O tokens already observed by I/O polling — usage is not visible in this chunk.
seen_io_tokens: DetHashSet<usize>,
/// Shared scheduler queue; wakers hold clones of this handle.
pub scheduler: Arc<Mutex<LabScheduler>>,
/// Configuration the runtime was created with.
config: LabConfig,
/// Deterministic RNG obtained from the configuration; sampled once per step.
rng: DetRng,
/// Current virtual time as tracked by this runtime.
virtual_time: Time,
/// Virtual clock shared with the timer driver installed on `state`.
virtual_clock: Arc<VirtualClock>,
/// Number of steps executed so far.
steps: u64,
/// Chaos RNG, present only when the configuration carries chaos settings.
chaos_rng: Option<ChaosRng>,
/// Chaos statistics exposed through `chaos_stats()`.
chaos_stats: ChaosStats,
// NOTE(review): presumably the last reactor-side chaos stats already merged — usage is not visible in this chunk.
seen_reactor_chaos_stats: ChaosStats,
/// Replay recorder; disabled unless replay recording is configured.
replay_recorder: TraceRecorder,
/// Deadline monitor; `None` until deadline monitoring is enabled.
deadline_monitor: Option<DeadlineMonitor>,
/// Oracle suite used when building reports.
pub oracles: OracleSuite,
/// Schedule certificate that records each dispatched task.
certificate: ScheduleCertificate,
}
impl LabRuntime {
#[must_use]
pub fn new(config: LabConfig) -> Self {
let rng = config.rng();
let chaos_rng = config.chaos.as_ref().map(ChaosRng::from_config);
let lab_reactor = config.chaos.as_ref().map_or_else(
|| Arc::new(LabReactor::new()),
|chaos| Arc::new(LabReactor::with_chaos(chaos.clone())),
);
let mut state = RuntimeState::with_reactor(lab_reactor.clone());
state.set_logical_clock_mode(crate::trace::distributed::LogicalClockMode::Lamport);
state.set_obligation_leak_response(if config.panic_on_obligation_leak {
ObligationLeakResponse::Panic
} else {
ObligationLeakResponse::Log
});
let virtual_clock = Arc::new(VirtualClock::starting_at(Time::ZERO));
state.set_timer_driver(crate::time::TimerDriverHandle::with_virtual_clock(
virtual_clock.clone(),
));
state.set_entropy_source(Arc::new(DetEntropy::new(config.entropy_seed)));
state.trace = TraceBufferHandle::new(config.trace_capacity);
let mut replay_recorder = if let Some(ref rec_config) = config.replay_recording {
TraceRecorder::with_config(TraceMetadata::new(config.seed), rec_config.clone())
} else {
TraceRecorder::disabled()
};
replay_recorder.record_rng_seed(config.seed);
crate::tracing_compat::info!("virtual clock initialized: start_time_ms=0");
Self {
state,
lab_reactor,
seen_io_tokens: DetHashSet::default(),
scheduler: Arc::new(Mutex::new(LabScheduler::new(config.worker_count))),
config,
rng,
virtual_time: Time::ZERO,
virtual_clock,
steps: 0,
chaos_rng,
chaos_stats: ChaosStats::new(),
seen_reactor_chaos_stats: ChaosStats::new(),
replay_recorder,
deadline_monitor: None,
oracles: OracleSuite::new(),
certificate: ScheduleCertificate::new(),
}
}
/// Creates a lab runtime from the default configuration for `seed`.
#[must_use]
pub fn with_seed(seed: u64) -> Self {
    let config = LabConfig::new(seed);
    Self::new(config)
}
/// Returns the current virtual time tracked by this runtime.
#[must_use]
pub const fn now(&self) -> Time {
self.virtual_time
}
/// Returns the total number of steps executed so far.
#[must_use]
pub const fn steps(&self) -> u64 {
self.steps
}
/// Returns the configuration this runtime was created with.
#[must_use]
pub const fn config(&self) -> &LabConfig {
&self.config
}
/// Returns the shared handle to the lab reactor.
#[must_use]
pub fn lab_reactor(&self) -> &Arc<LabReactor> {
&self.lab_reactor
}
/// Returns the trace buffer handle owned by the runtime state.
#[must_use]
pub fn trace(&self) -> &TraceBufferHandle {
&self.state.trace
}
/// Runs happens-before race detection over a snapshot of the current trace.
#[must_use]
pub fn detected_races(&self) -> crate::trace::dpor::RaceReport {
    let events = self.state.trace.snapshot();
    crate::trace::dpor::detect_hb_races(&events)
}
/// Returns the chaos statistics tracked by this runtime.
#[must_use]
pub fn chaos_stats(&self) -> &ChaosStats {
&self.chaos_stats
}
/// Returns the schedule certificate accumulated by this runtime.
#[must_use]
pub fn certificate(&self) -> &ScheduleCertificate {
&self.certificate
}
/// Returns whether the replay recorder is enabled.
#[must_use]
pub fn has_replay_recording(&self) -> bool {
self.replay_recorder.is_enabled()
}
/// Returns a shared reference to the replay recorder.
#[must_use]
pub fn replay_recorder(&self) -> &TraceRecorder {
&self.replay_recorder
}
/// Takes the replay trace out of the recorder (delegates to the recorder's `take`).
///
/// The recorder itself stays installed on the runtime.
pub fn take_replay_trace(&mut self) -> Option<ReplayTrace> {
self.replay_recorder.take()
}
/// Finishes replay recording and returns the recorded trace, if any.
///
/// The runtime's recorder is swapped for a disabled one before the old
/// recorder is finished.
pub fn finish_replay_trace(&mut self) -> Option<ReplayTrace> {
    std::mem::replace(&mut self.replay_recorder, TraceRecorder::disabled()).finish()
}
/// Returns true when a chaos RNG is installed and the configuration reports chaos as active.
#[must_use]
pub fn has_chaos(&self) -> bool {
    if self.chaos_rng.is_none() {
        return false;
    }
    self.config.has_chaos()
}
/// Returns whether the runtime state reports quiescence.
#[must_use]
pub fn is_quiescent(&self) -> bool {
self.state.is_quiescent()
}
/// Advances virtual time by `nanos` nanoseconds.
///
/// The runtime's own time uses a saturating add; the virtual clock and the
/// lab reactor are each advanced by `nanos`, and the transition is recorded
/// in the replay recorder.
pub fn advance_time(&mut self, nanos: u64) {
    let previous = self.virtual_time;
    let updated = previous.saturating_add_nanos(nanos);
    self.virtual_time = updated;
    self.state.now = updated;

    self.virtual_clock.advance(nanos);
    self.lab_reactor.advance_time(Duration::from_nanos(nanos));
    self.replay_recorder
        .record_time_advanced(previous, self.virtual_time);

    crate::tracing_compat::debug!(
        "virtual clock advanced: delta_ms={}, new_time_ms={}",
        nanos / 1_000_000,
        self.virtual_time.as_nanos() / 1_000_000
    );
}
/// Advances virtual time to the absolute instant `time`.
///
/// Does nothing when `time` equals the current time. A request to move
/// backward is rejected with an error log and leaves all clocks untouched.
pub fn advance_time_to(&mut self, time: Time) {
    if time <= self.virtual_time {
        if time < self.virtual_time {
            crate::tracing_compat::error!(
                "virtual clock attempt to go backward: current_ms={}, requested_ms={}",
                self.virtual_time.as_nanos() / 1_000_000,
                time.as_nanos() / 1_000_000
            );
        }
        return;
    }

    let previous = self.virtual_time;
    self.virtual_time = time;
    self.state.now = self.virtual_time;
    self.virtual_clock.advance_to(time);
    self.lab_reactor.advance_time_to(time);
    self.replay_recorder
        .record_time_advanced(previous, self.virtual_time);
    crate::tracing_compat::debug!(
        "virtual clock advanced: delta_ms={}, new_time_ms={}",
        (time.as_nanos() - previous.as_nanos()) / 1_000_000,
        time.as_nanos() / 1_000_000
    );
}
/// Jumps virtual time to the next timer deadline and processes due timers.
///
/// Returns the wakeup count reported by the timer driver, or 0 when there
/// is no pending deadline. If the next deadline is already due, timers are
/// processed without moving the clock.
pub fn advance_to_next_timer(&mut self) -> usize {
    let Some(deadline) = self.next_timer_deadline() else {
        return 0;
    };

    if deadline > self.virtual_time {
        let delta_nanos = deadline.as_nanos() - self.virtual_time.as_nanos();
        self.advance_time(delta_nanos);
        let wakeups = self
            .state
            .timer_driver_handle()
            .map_or(0, |h| h.process_timers());
        crate::tracing_compat::debug!(
            "virtual clock auto-advance: reason=all_tasks_blocked, \
             next_wakeup_ms={}, delta_ms={}, wakeup_count={}",
            deadline.as_nanos() / 1_000_000,
            delta_nanos / 1_000_000,
            wakeups
        );
        return wakeups;
    }

    // Deadline is already due: fire it without touching the clock.
    self.state
        .timer_driver_handle()
        .map_or(0, |h| h.process_timers())
}
/// Returns the next timer deadline, or `None` when no timer driver is
/// installed or it has no pending deadline.
#[must_use]
pub fn next_timer_deadline(&self) -> Option<Time> {
    let handle = self.state.timer_driver_handle()?;
    handle.next_deadline()
}
/// Returns the lab reactor's next event time, but only when the runtime
/// state has an I/O driver installed.
fn next_reactor_deadline(&self) -> Option<Time> {
    match self.state.io_driver_handle() {
        Some(_) => self.lab_reactor.next_event_time(),
        None => None,
    }
}
/// Returns the earlier of the next timer deadline and the next reactor
/// deadline, or whichever one exists, or `None` when neither does.
fn next_auto_advance_deadline(&self) -> Option<Time> {
    let timer = self.next_timer_deadline();
    let reactor = self.next_reactor_deadline();
    match (timer, reactor) {
        (Some(timer), Some(reactor)) => Some(timer.min(reactor)),
        (timer, reactor) => timer.or(reactor),
    }
}
/// Processes everything that is due at the current virtual time: timers,
/// I/O polling, async finalizers, and the deadline monitor.
///
/// Returns the wakeup count reported by timer processing only.
fn pump_due_system_events(&mut self) -> usize {
    let timer_wakeups = match self.state.timer_driver_handle() {
        Some(handle) => handle.process_timers(),
        None => 0,
    };
    self.poll_io();
    self.schedule_async_finalizers();
    self.check_deadline_monitor();
    timer_wakeups
}
/// Returns the number of pending timers, or 0 when no timer driver is installed.
#[must_use]
pub fn pending_timer_count(&self) -> usize {
    match self.state.timer_driver_handle() {
        Some(handle) => handle.pending_count(),
        None => 0,
    }
}
/// Runs the runtime, jumping virtual time forward whenever nothing is
/// runnable but a timer or reactor deadline is pending.
///
/// Each loop iteration does exactly one of the following:
/// 1. stops with `StepLimitReached` once the configured `max_steps` is hit;
/// 2. executes one step when the scheduler has runnable work;
/// 3. advances the clock to the next deadline and processes timers;
/// 4. pumps due system events when the next deadline is already due;
/// 5. stops with `Quiescent` when the runtime is quiescent;
/// 6. otherwise counts a stuck iteration and executes a step.
///
/// `StuckBailout` is returned after more than 1000 consecutive stuck
/// iterations (case 6), or after more than 1000 consecutive pumps (case 4)
/// that neither produced runnable work nor allowed time to advance. The
/// latter bound exists because pumping increments no step counter, so a
/// deadline that never gets consumed would otherwise spin forever.
pub fn run_with_auto_advance(&mut self) -> VirtualTimeReport {
    // Consecutive no-progress iterations tolerated before giving up.
    const MAX_STUCK_ITERATIONS: u32 = 1000;

    let start_steps = self.steps;
    let start_time = self.virtual_time;
    let mut auto_advances: u64 = 0;
    let mut total_wakeups: u64 = 0;
    let mut stuck_counter: u32 = 0;
    // Pumps performed back-to-back without runnable work or a clock advance.
    let mut fruitless_pumps: u32 = 0;

    let termination = loop {
        if let Some(max) = self.config.max_steps {
            if self.steps >= max {
                break AutoAdvanceTermination::StepLimitReached;
            }
        }

        let is_empty = self.scheduler.lock().is_empty();
        if !is_empty {
            stuck_counter = 0;
            fruitless_pumps = 0;
            self.step();
            continue;
        }

        if let Some(deadline) = self.next_auto_advance_deadline() {
            if deadline > self.virtual_time {
                // Moving the clock forward counts as progress for the pump guard.
                fruitless_pumps = 0;
                self.advance_time_to(deadline);
                let wakeups = self
                    .state
                    .timer_driver_handle()
                    .map_or(0, |h| h.process_timers());
                auto_advances += 1;
                total_wakeups += wakeups as u64;
                continue;
            }

            // Deadline already due: deliver it. Bound the retries so an
            // unconsumed deadline cannot spin this loop indefinitely.
            total_wakeups += self.pump_due_system_events() as u64;
            fruitless_pumps += 1;
            if fruitless_pumps > MAX_STUCK_ITERATIONS {
                break AutoAdvanceTermination::StuckBailout;
            }
            continue;
        }

        if self.is_quiescent() {
            break AutoAdvanceTermination::Quiescent;
        }

        stuck_counter += 1;
        if stuck_counter > MAX_STUCK_ITERATIONS {
            break AutoAdvanceTermination::StuckBailout;
        }
        self.step();
    };

    VirtualTimeReport {
        steps: self.steps - start_steps,
        auto_advances,
        total_wakeups,
        time_start: start_time,
        time_end: self.virtual_time,
        virtual_elapsed_nanos: self.virtual_time.as_nanos() - start_time.as_nanos(),
        termination,
    }
}
/// Pauses the shared virtual clock and logs the runtime's current virtual time.
pub fn pause_clock(&self) {
self.virtual_clock.pause();
crate::tracing_compat::info!(
"virtual clock paused at time_ms={}",
self.virtual_time.as_nanos() / 1_000_000
);
}
/// Resumes the shared virtual clock and logs the runtime's current virtual time.
pub fn resume_clock(&self) {
self.virtual_clock.resume();
crate::tracing_compat::info!(
"virtual clock resumed at time_ms={}",
self.virtual_time.as_nanos() / 1_000_000
);
}
/// Returns whether the shared virtual clock is currently paused.
#[must_use]
pub fn is_clock_paused(&self) -> bool {
self.virtual_clock.is_paused()
}
/// Injects a forward clock jump of `skew_nanos` nanoseconds.
///
/// This is a regular `advance_time` call followed by a warning log that
/// flags the jump.
#[allow(clippy::no_effect_underscore_binding)]
pub fn inject_clock_skew(&mut self, skew_nanos: u64) {
// Captured before advancing so the log can show the pre-jump time.
let old_nanos = self.virtual_time.as_nanos();
self.advance_time(skew_nanos);
crate::tracing_compat::warn!(
"virtual clock jump detected: old_time_ms={}, new_time_ms={}, jump_ms={} \
-- may affect lease/timeout correctness",
old_nanos / 1_000_000,
self.virtual_time.as_nanos() / 1_000_000,
skew_nanos / 1_000_000
);
// NOTE(review): presumably keeps `old_nanos` "used" when the log macro
// expands to nothing without the tracing feature — confirm against tracing_compat.
#[cfg(not(feature = "tracing-integration"))]
let _ = old_nanos;
}
/// Steps the runtime until it is quiescent or the configured step limit is
/// reached, and returns the number of steps executed by this call.
pub fn run_until_quiescent(&mut self) -> u64 {
    let steps_before = self.steps;
    loop {
        if self.is_quiescent() {
            break;
        }
        let limit_reached = matches!(self.config.max_steps, Some(max) if self.steps >= max);
        if limit_reached {
            break;
        }
        self.step();
    }
    self.steps - steps_before
}
/// Steps the runtime until the scheduler has no runnable work or the
/// configured step limit is reached; returns the steps executed by this call.
pub fn run_until_idle(&mut self) -> u64 {
    let steps_before = self.steps;
    loop {
        let limit_reached = matches!(self.config.max_steps, Some(max) if self.steps >= max);
        if limit_reached {
            break;
        }
        // Bind the result so the scheduler lock is released before stepping.
        let scheduler_idle = self.scheduler.lock().is_empty();
        if scheduler_idle {
            break;
        }
        self.step();
    }
    self.steps - steps_before
}
/// Runs until quiescent (or the step limit) and returns a report whose
/// `steps_delta` is the number of steps this call executed.
#[must_use]
pub fn run_until_quiescent_with_report(&mut self) -> LabRunReport {
    let executed = self.run_until_quiescent();
    self.report_with_steps_delta(executed)
}
/// Builds a report of the current runtime state without running any steps
/// (`steps_delta` is 0).
#[must_use]
pub fn report(&mut self) -> LabRunReport {
self.report_with_steps_delta(0)
}
/// Runs until quiescent (or the step limit) and wraps the resulting run
/// report into a harness report for `app` with the given attachments.
#[must_use]
pub fn run_until_quiescent_spork_report(
    &mut self,
    app: impl Into<String>,
    attachments: Vec<HarnessAttachmentRef>,
) -> SporkHarnessReport {
    let run_report = self.run_until_quiescent_with_report();
    let app_name: String = app.into();
    self.build_spork_report(app_name, run_report, attachments)
}
/// Builds a harness report for `app` from the current runtime state,
/// without running any further steps.
#[must_use]
pub fn spork_report(
    &mut self,
    app: impl Into<String>,
    attachments: Vec<HarnessAttachmentRef>,
) -> SporkHarnessReport {
    let run_report = self.report();
    let app_name: String = app.into();
    self.build_spork_report(app_name, run_report, attachments)
}
/// Assembles the harness report, appending an automatically derived crash
/// pack attachment when one applies.
fn build_spork_report(
    &self,
    app: String,
    run: LabRunReport,
    mut attachments: Vec<HarnessAttachmentRef>,
) -> SporkHarnessReport {
    let auto_crashpack = self.auto_crashpack_attachment(&run, &attachments);
    // Extending with an `Option` pushes zero or one element.
    attachments.extend(auto_crashpack);
    SporkHarnessReport::new(app, &self.config, run, attachments)
}
/// Derives a crash pack attachment for a failing run.
///
/// Returns `None` when the caller already supplied a crash pack attachment
/// or when no crash pack could be built for `run`.
fn auto_crashpack_attachment(
    &self,
    run: &LabRunReport,
    attachments: &[HarnessAttachmentRef],
) -> Option<HarnessAttachmentRef> {
    let already_attached = attachments
        .iter()
        .any(|existing| existing.kind == HarnessAttachmentKind::CrashPack);
    if already_attached {
        return None;
    }
    self.build_crashpack_for_report(run)
        .map(|pack| HarnessAttachmentRef::crashpack(artifact_filename(&pack)))
}
/// Builds a crash pack describing the failure captured in `run`.
///
/// Returns `None` when `run` shows no failure (all oracles passed, no
/// invariant violations, no refinement-firewall rule hit) or when the crash
/// pack builder reports an error.
#[must_use]
pub fn build_crashpack_for_report(&self, run: &LabRunReport) -> Option<CrashPack> {
let has_failure = !run.oracle_report.all_passed()
|| !run.invariant_violations.is_empty()
|| run.refinement_firewall_rule_id.is_some();
if !has_failure {
return None;
}
let config_summary = LabConfigSummary::from_config(&self.config);
let crash_config = CrashPackConfig {
seed: run.seed,
config_hash: config_summary.config_hash(),
worker_count: self.config.worker_count,
max_steps: self.config.max_steps,
commit_hash: None,
};
// Pick a (task, region) pair to attribute the failure to, in priority order:
// first non-terminal task, first pending obligation's holder, first region,
// the root region, and finally the testing defaults.
let (task, region) = self
.state
.tasks_iter()
.find(|(_, task)| !task.state.is_terminal())
.map(|(_, task)| (task.id, task.owner))
.or_else(|| {
self.state
.obligations_iter()
.find(|(_, obligation)| obligation.is_pending())
.map(|(_, obligation)| (obligation.holder, obligation.region))
})
.or_else(|| {
self.state
.regions_iter()
.next()
.map(|(_, region)| (TaskId::testing_default(), region.id))
})
.or_else(|| {
self.state
.root_region
.map(|root| (TaskId::testing_default(), root))
})
.unwrap_or((TaskId::testing_default(), RegionId::testing_default()));
// Merge invariant violations, failed oracle names, and refinement-firewall
// findings into one sorted, de-duplicated list.
let mut oracle_violations = run.invariant_violations.clone();
oracle_violations.extend(
run.oracle_report
.failures()
.iter()
.map(|entry| entry.invariant.clone()),
);
if let Some(rule_id) = &run.refinement_firewall_rule_id {
oracle_violations.push(format!("refinement_firewall:{rule_id}"));
}
if let Some(prefix_len) = run.refinement_counterexample_prefix_len {
oracle_violations.push(format!(
"refinement_firewall:minimal_counterexample_prefix_len={prefix_len}"
));
}
oracle_violations.sort();
oracle_violations.dedup();
let trace_events = self.trace().snapshot();
let mut builder = CrashPack::builder(crash_config.clone())
.failure(FailureInfo {
task,
region,
outcome: FailureOutcome::Err,
virtual_time: Time::from_nanos(run.now_nanos),
})
.oracle_violations(oracle_violations)
.replay(ReplayCommand::from_config(&crash_config, None));
let divergent_prefix = self.auto_divergent_prefix();
if !divergent_prefix.is_empty() {
builder = builder.divergent_prefix(divergent_prefix);
}
// With no trace events available, fall back to the fingerprint and event
// count already stored in the report.
builder = if trace_events.is_empty() {
builder
.fingerprint(run.trace_fingerprint)
.event_count(run.trace_certificate.event_count)
} else {
builder.from_trace(&trace_events)
};
match builder.build() {
Ok(pack) => Some(pack),
Err(err) => {
// NOTE(review): presumably keeps `err` "used" when the log macro expands to nothing — confirm.
let _ = &err;
crate::tracing_compat::error!("failed to build crash pack for lab report: {err}");
None
}
}
}
/// Computes the minimal divergent prefix of the recorded replay trace.
///
/// The failure point is the first `TaskCompleted` event whose outcome is
/// greater than zero, falling back to the last recorded event. Returns an
/// empty vector when there is no replay snapshot or it holds no events.
fn auto_divergent_prefix(&self) -> Vec<ReplayEvent> {
    let replay_trace = match self.replay_recorder.snapshot() {
        Some(trace) if !trace.events.is_empty() => trace,
        _ => return Vec::new(),
    };

    // Safe: the guard above ensures at least one event.
    let last_index = replay_trace.events.len() - 1;
    let failure_index = replay_trace
        .events
        .iter()
        .position(
            |event| matches!(event, ReplayEvent::TaskCompleted { outcome, .. } if *outcome > 0),
        )
        .unwrap_or(last_index);

    crate::trace::minimal_divergent_prefix(&replay_trace, failure_index).events
}
/// Builds a `LabRunReport` from the current runtime state.
///
/// `steps_delta` is stored verbatim; everything else is derived here: the
/// trace fingerprint and certificate, the oracle report, temporal failures,
/// the refinement-firewall check, and the merged invariant violation list.
fn report_with_steps_delta(&mut self, steps_delta: u64) -> LabRunReport {
let seed = self.config.seed;
let quiescent = self.is_quiescent();
let now = self.now();
let trace_events = self.trace().snapshot();
let trace_len = trace_events.len();
// An empty trace gets a seed-derived fingerprint instead of a trace fingerprint.
let trace_fingerprint = if trace_events.is_empty() {
seed_fingerprint(seed)
} else {
trace_fingerprint(&trace_events)
};
let schedule_hash = self.certificate().hash();
let mut certificate = TraceCertificate::new();
for e in &trace_events {
certificate.record_event(e);
}
certificate.set_schedule_hash(schedule_hash);
// Oracles are hydrated from the runtime state before the report is taken.
self.oracles.hydrate_temporal_from_state(&self.state, now);
let oracle_report = self.oracles.report(now);
// Only failures whose invariant name is listed as temporal are kept here.
let temporal_invariant_failures = oracle_report
.failures()
.into_iter()
.filter(|entry| TEMPORAL_ORACLE_INVARIANTS.contains(&entry.invariant.as_str()))
.map(|entry| entry.invariant.clone())
.collect::<Vec<_>>();
let temporal_counterexample_prefix_len = if temporal_invariant_failures.is_empty() {
None
} else {
let prefix_len = self.auto_divergent_prefix().len();
(prefix_len > 0).then_some(prefix_len)
};
// The firewall check is skipped when the buffer received more events than
// the snapshot retained, i.e. the snapshot is an incomplete trace.
let refinement_firewall_skipped_due_to_trace_truncation =
self.trace().total_pushed() > trace_events.len() as u64;
let refinement_violation = if refinement_firewall_skipped_due_to_trace_truncation {
None
} else {
check_refinement_firewall(&trace_events).first_violation
};
let refinement_violation = refinement_violation.as_ref();
let refinement_firewall_rule_id = refinement_violation.map(|v| v.rule_id.to_owned());
let refinement_firewall_event_index = refinement_violation.map(|v| v.event_index);
let refinement_firewall_event_seq = refinement_violation.map(|v| v.event_seq);
// Prefix length covers events up to and including the violating one.
let refinement_counterexample_prefix_len =
refinement_firewall_event_index.map(|idx| idx + 1);
let mut invariant_violations = self
.check_invariants()
.into_iter()
.map(|v| v.to_string())
.collect::<Vec<_>>();
// Temporal and refinement-firewall findings are folded into the same list
// under `temporal:` / `refinement_firewall:` prefixes.
for invariant in &temporal_invariant_failures {
invariant_violations.push(format!("temporal:{invariant}"));
}
if let Some(prefix_len) = temporal_counterexample_prefix_len {
invariant_violations.push(format!(
"temporal:minimal_divergent_prefix_len={prefix_len}"
));
}
if let Some(rule_id) = &refinement_firewall_rule_id {
invariant_violations.push(format!("refinement_firewall:{rule_id}"));
}
if let Some(prefix_len) = refinement_counterexample_prefix_len {
invariant_violations.push(format!(
"refinement_firewall:minimal_counterexample_prefix_len={prefix_len}"
));
}
// Sorted and de-duplicated so the list order does not depend on discovery order.
invariant_violations.sort();
invariant_violations.dedup();
LabRunReport {
seed,
steps_delta,
steps_total: self.steps(),
quiescent,
now_nanos: now.as_nanos(),
trace_len,
trace_fingerprint,
trace_certificate: LabTraceCertificateSummary {
event_hash: certificate.event_hash(),
event_count: certificate.event_count(),
schedule_hash: certificate.schedule_hash(),
},
oracle_report,
invariant_violations,
temporal_invariant_failures,
temporal_counterexample_prefix_len,
refinement_firewall_rule_id,
refinement_firewall_event_index,
refinement_firewall_event_seq,
refinement_counterexample_prefix_len,
refinement_firewall_skipped_due_to_trace_truncation,
}
}
/// Enables deadline monitoring with the default warning handler.
pub fn enable_deadline_monitoring(&mut self, config: MonitorConfig) {
self.enable_deadline_monitoring_with_handler(config, default_warning_handler);
}
/// Enables deadline monitoring, routing warnings to the handler `f`.
///
/// Any previously installed monitor is replaced.
pub fn enable_deadline_monitoring_with_handler<F>(&mut self, config: MonitorConfig, f: F)
where
    F: Fn(DeadlineWarning) + Send + Sync + 'static,
{
    let mut deadline_monitor = DeadlineMonitor::new(config);
    deadline_monitor.on_warning(f);
    self.deadline_monitor = Some(deadline_monitor);
}
/// Returns a mutable reference to the deadline monitor, if one is enabled.
pub fn deadline_monitor_mut(&mut self) -> Option<&mut DeadlineMonitor> {
self.deadline_monitor.as_mut()
}
#[allow(clippy::too_many_lines)]
fn step(&mut self) {
self.steps += 1;
let rng_value = self.rng.next_u64();
if self.steps < 50 {
crate::tracing_compat::trace!(
"lab runtime rng sample: rng_value={}, worker_hint={}",
rng_value,
(rng_value >> 32) as usize % self.config.worker_count.max(1)
);
}
self.replay_recorder.record_rng_value(rng_value);
self.check_futurelocks();
if let Some(timer) = self.state.timer_driver_handle() {
let _ = timer.process_timers();
}
self.poll_io();
self.schedule_async_finalizers();
let worker_count = self.config.worker_count.max(1);
let worker_hint = ((rng_value >> 32) as usize) % worker_count;
let now = self.now();
let (task_id, dispatch_lane) = {
let mut sched = self.scheduler.lock();
if let Some((tid, lane)) = sched.pop_for_worker(worker_hint, rng_value, now) {
(tid, lane)
} else if let Some(tid) = sched.steal_for_worker(worker_hint, rng_value.rotate_left(17))
{
(tid, DispatchLane::Stolen)
} else {
drop(sched);
self.check_deadline_monitor();
return;
}
};
self.certificate.record(task_id, dispatch_lane, self.steps);
self.replay_recorder
.record_task_scheduled(task_id, self.steps);
if self.inject_pre_poll_chaos(task_id) {
return;
}
let priority = self.state.task(task_id).map_or(0, |record| {
record.cx_inner.as_ref().map_or(0, |inner| {
let mut guard = inner.write();
if guard.budget.consume_poll().is_none() {
guard.cancel_requested = true;
guard
.fast_cancel
.store(true, std::sync::atomic::Ordering::Release);
if let Some(existing) = &mut guard.cancel_reason {
existing.strengthen(&crate::types::CancelReason::poll_quota());
} else {
guard.cancel_reason = Some(crate::types::CancelReason::poll_quota());
}
}
guard.budget.priority
})
});
let waker = Waker::from(Arc::new(TaskWaker {
task_id,
priority,
scheduler: self.scheduler.clone(),
}));
let mut cx = Context::from_waker(&waker);
if let Some(record) = self.state.task(task_id) {
if let Some(inner) = record.cx_inner.as_ref() {
let cancel_waker = Waker::from(Arc::new(CancelTaskWaker {
task_id,
priority,
scheduler: self.scheduler.clone(),
}));
{
let mut guard = inner.write();
guard.cancel_waker = Some(cancel_waker);
}
}
}
let current_cx = self
.state
.task(task_id)
.and_then(|record| record.cx.clone());
let _cx_guard = crate::cx::Cx::set_current(current_cx);
let started_running = if let Some(record) = self.state.task_mut(task_id) {
let old_state = record.state.clone();
if record.start_running() {
Some((old_state, record.state.clone()))
} else {
None
}
} else {
None
};
if let Some((from_state, to_state)) = started_running.as_ref() {
if self.config.has_cancellation_oracle() {
self.notify_cancellation_oracle_task_transition(task_id, from_state, to_state);
}
}
if self.steps < 50 {
crate::tracing_compat::trace!(
"lab runtime executing task {:?} at step {}",
task_id,
self.steps
);
}
if self.config.has_cancellation_oracle() {
self.notify_cancellation_oracle_task_poll(task_id);
}
let result = if let Some(stored) = self.state.get_stored_future(task_id) {
stored.poll(&mut cx)
} else {
return;
};
if let Some(record) = self.state.task_mut(task_id) {
record.mark_polled(self.steps);
}
let cancel_ack = self.consume_cancel_ack(task_id);
if cancel_ack && self.config.has_cancellation_oracle() {
self.notify_cancellation_oracle_cancel_ack(task_id);
}
match result {
Poll::Ready(outcome) => {
self.state.remove_stored_future(task_id);
self.scheduler.lock().forget_task(task_id);
let mut oracle_transitions = Vec::new();
if let Some(record) = self.state.task_mut(task_id) {
if !record.state.is_terminal() {
let old_state = record.state.clone();
let record_outcome = match outcome {
crate::types::Outcome::Ok(()) => crate::types::Outcome::Ok(()),
crate::types::Outcome::Err(()) => crate::types::Outcome::Err(
crate::error::Error::new(crate::error::ErrorKind::Internal),
),
crate::types::Outcome::Cancelled(r) => {
crate::types::Outcome::Cancelled(r)
}
crate::types::Outcome::Panicked(p) => {
crate::types::Outcome::Panicked(p)
}
};
let completed_via_cancel =
if matches!(record_outcome, crate::types::Outcome::Ok(())) {
let should_cancel = matches!(
record.state,
TaskState::Cancelling { .. } | TaskState::Finalizing { .. }
) || (cancel_ack
&& matches!(record.state, TaskState::CancelRequested { .. }));
if should_cancel {
if matches!(record.state, TaskState::CancelRequested { .. }) {
let state_before = record.state.clone();
let _ = record.acknowledge_cancel();
oracle_transitions
.push((state_before, record.state.clone()));
}
if matches!(record.state, TaskState::Cancelling { .. }) {
let state_before = record.state.clone();
record.cleanup_done();
oracle_transitions
.push((state_before, record.state.clone()));
}
if matches!(record.state, TaskState::Finalizing { .. }) {
let state_before = record.state.clone();
record.finalize_done();
oracle_transitions
.push((state_before, record.state.clone()));
}
matches!(
record.state,
TaskState::Completed(crate::types::Outcome::Cancelled(_))
)
} else {
false
}
} else {
false
};
if !completed_via_cancel {
record.complete(record_outcome);
oracle_transitions.push((old_state, record.state.clone()));
}
}
}
if self.config.has_cancellation_oracle() {
for (from_state, to_state) in oracle_transitions {
self.notify_cancellation_oracle_task_transition(
task_id,
&from_state,
&to_state,
);
}
}
let final_severity =
self.state
.task(task_id)
.map_or(crate::types::Severity::Ok, |record| match &record.state {
TaskState::Completed(outcome) => outcome.severity(),
_ => crate::types::Severity::Ok,
});
self.replay_recorder
.record_task_completed(task_id, final_severity);
if let Some(monitor) = &mut self.deadline_monitor {
if let Some(record) = self.state.task(task_id) {
let now = self.state.now;
let duration =
Duration::from_nanos(now.duration_since(record.created_at()));
let (task_type, deadline) = record
.cx_inner
.as_ref()
.map(|inner| inner.read())
.map_or_else(
|| ("default".to_string(), None),
|inner| {
(
inner
.task_type
.clone()
.unwrap_or_else(|| "default".to_string()),
inner.budget.deadline,
)
},
);
monitor.record_completion(task_id, &task_type, duration, deadline, now);
}
}
let waiters = self.state.task_completed(task_id);
let mut sched = self.scheduler.lock();
for waiter in waiters {
let prio = self
.state
.task(waiter)
.and_then(|t| t.cx_inner.as_ref())
.map_or(0, |inner| inner.read().budget.priority);
sched.schedule(waiter, prio);
}
}
Poll::Pending => {
self.replay_recorder.record_task_yielded(task_id);
self.inject_post_poll_chaos(task_id, priority);
}
}
self.check_deadline_monitor();
}
/// Runs the deadline monitor, when one is configured, over every task record
/// using the state's current time.
fn check_deadline_monitor(&mut self) {
    let Some(monitor) = self.deadline_monitor.as_mut() else {
        return;
    };
    let current_time = self.state.now;
    let records = self.state.tasks_iter().map(|(_, task_record)| task_record);
    monitor.check(current_time, records);
}
/// Turns the I/O driver once with a zero timeout and records every delivered event.
///
/// Returns immediately when the runtime state exposes no I/O driver handle. For each
/// event the callback emits an `io_ready` trace event and a replay-recorder entry; the
/// first time a token is observed it additionally emits an `io_requested` trace event.
/// A failed turn is logged as a warning. Reactor chaos statistics are merged afterwards.
fn poll_io(&mut self) {
    let Some(driver) = self.state.io_driver_handle() else {
        return;
    };
    let timestamp = self.state.now;
    // Borrow the three fields separately so the callback can use them side by side.
    let state = &mut self.state;
    let recorder = &mut self.replay_recorder;
    let seen = &mut self.seen_io_tokens;
    let turn_result = driver.turn_with(Some(Duration::ZERO), |event, interest| {
        let token = event.token.0;
        let token_id = token as u64;
        // Fall back to the event's readiness when no interest was reported.
        let requested = interest.unwrap_or(event.ready);
        let first_sighting = seen.insert(token);
        if first_sighting {
            state.record_trace_event(|seq| {
                TraceEvent::io_requested(seq, timestamp, token_id, requested.bits())
            });
        }
        state.record_trace_event(|seq| {
            TraceEvent::io_ready(seq, timestamp, token_id, event.ready.bits())
        });
        recorder.record_io_ready(
            token_id,
            event.is_readable(),
            event.is_writable(),
            event.is_error(),
            event.is_hangup(),
        );
    });
    if let Err(error) = turn_result {
        // NOTE(review): presumably keeps `error` "used" when the warn! macro compiles away.
        let _ = &error;
        crate::tracing_compat::warn!(
            error = ?error,
            "lab runtime io_driver poll failed"
        );
    }
    self.sync_reactor_chaos_stats();
}
/// Rolls the pre-poll chaos decisions for `task_id` and applies whichever fire.
///
/// Returns `true` when the upcoming poll must be skipped because a cancellation or a
/// budget exhaustion was injected; the task is re-queued before returning. Returns
/// `false` when no chaos config or chaos RNG is present, or when at most a delay fired.
fn inject_pre_poll_chaos(&mut self, task_id: TaskId) -> bool {
    let Some(chaos_config) = self.config.chaos.clone() else {
        return false;
    };
    let Some(rng) = self.chaos_rng.as_mut() else {
        return false;
    };
    // The draws happen in a fixed order: cancel, delay (plus its length), budget.
    let cancel = rng.should_inject_cancel(&chaos_config);
    let delay = if rng.should_inject_delay(&chaos_config) {
        Some(rng.next_delay(&chaos_config))
    } else {
        None
    };
    let budget_exhaust = rng.should_inject_budget_exhaust(&chaos_config);
    let skip_poll = cancel || budget_exhaust;

    // All three outcomes are reported to the stats in a single call.
    self.chaos_stats
        .record_pre_poll_outcomes(cancel, delay, budget_exhaust);

    if cancel {
        self.inject_cancel(task_id);
    }
    if let Some(pause) = delay {
        self.advance_time(Self::duration_nanos_saturating(pause));
    }
    if budget_exhaust {
        self.inject_budget_exhaust(task_id);
    }
    if !skip_poll {
        return false;
    }
    self.reschedule_after_chaos_skip(task_id);
    true
}
/// Converts `duration` to whole nanoseconds, clamping to `u64::MAX` when the
/// value does not fit in a `u64`.
#[inline]
fn duration_nanos_saturating(duration: Duration) -> u64 {
    match u64::try_from(duration.as_nanos()) {
        Ok(nanos) => nanos,
        Err(_) => u64::MAX,
    }
}
/// Rolls the post-poll chaos decision for a task whose poll returned pending.
///
/// When a wakeup storm fires, its size is recorded in the chaos stats and that many
/// spurious wakes are injected for `task_id` at `priority`; otherwise a no-injection
/// outcome is recorded. Does nothing when no chaos config or chaos RNG is present.
fn inject_post_poll_chaos(&mut self, task_id: TaskId, priority: u8) {
    let Some(chaos_config) = self.config.chaos.clone() else {
        return;
    };
    let Some(rng) = self.chaos_rng.as_mut() else {
        return;
    };
    // The storm size is only drawn when the storm itself fires.
    let storm_size = rng
        .should_inject_wakeup_storm(&chaos_config)
        .then(|| rng.next_wakeup_count(&chaos_config));
    match storm_size {
        Some(count) => {
            self.chaos_stats.record_wakeup_storm(count as u64);
            self.inject_spurious_wakes(task_id, priority, count);
        }
        None => {
            self.chaos_stats.record_no_injection();
        }
    }
}
/// Merges the reactor's chaos counters accrued since the previous call into the
/// runtime's own chaos statistics, then remembers the new reactor snapshot.
fn sync_reactor_chaos_stats(&mut self) {
    let latest = self.lab_reactor.chaos_stats();
    let baseline = &self.seen_reactor_chaos_stats;
    // Field-by-field saturating difference between the latest and the last merged snapshot.
    let increment = ChaosStats {
        cancellations: latest.cancellations.saturating_sub(baseline.cancellations),
        delays: latest.delays.saturating_sub(baseline.delays),
        total_delay: latest.total_delay.saturating_sub(baseline.total_delay),
        io_errors: latest.io_errors.saturating_sub(baseline.io_errors),
        wakeup_storms: latest.wakeup_storms.saturating_sub(baseline.wakeup_storms),
        spurious_wakeups: latest
            .spurious_wakeups
            .saturating_sub(baseline.spurious_wakeups),
        budget_exhaustions: latest
            .budget_exhaustions
            .saturating_sub(baseline.budget_exhaustions),
        decision_points: latest
            .decision_points
            .saturating_sub(baseline.decision_points),
    };
    self.chaos_stats.merge(&increment);
    self.seen_reactor_chaos_stats = latest;
}
/// Re-queues `task_id` through the scheduler's cancel path after chaos skipped its poll.
///
/// Unknown tasks and tasks already in a terminal state are left alone. The priority is
/// read from the task's budget, defaulting to `0` when the task has no `cx_inner`.
fn reschedule_after_chaos_skip(&self, task_id: TaskId) {
    if let Some(task_record) = self.state.task(task_id) {
        if !task_record.state.is_terminal() {
            let lane_priority = match task_record.cx_inner.as_ref() {
                Some(inner) => inner.read().budget.priority,
                None => 0,
            };
            self.scheduler
                .lock()
                .schedule_cancel(task_id, lane_priority);
        }
    }
}
/// Drains the async finalizers the state reports as ready and schedules each one
/// at the priority returned alongside it. The scheduler is only locked when there
/// is at least one finalizer to schedule.
fn schedule_async_finalizers(&mut self) {
    let ready = self.state.drain_ready_async_finalizers();
    if !ready.is_empty() {
        let mut queue = self.scheduler.lock();
        for (finalizer, finalizer_priority) in ready {
            queue.schedule(finalizer, finalizer_priority);
        }
    }
}
/// Consumes a pending cancel acknowledgement flag for `task_id`.
///
/// Returns `true` when the task's `cancel_acknowledged` flag was set; the flag is
/// cleared and the task record is asked to acknowledge the cancel. Returns `false`
/// when the task is unknown, has no `cx_inner`, or the flag was not set.
fn consume_cancel_ack(&mut self, task_id: TaskId) -> bool {
    let Some(record) = self.state.task_mut(task_id) else {
        return false;
    };
    let acknowledged = match record.cx_inner.as_ref() {
        None => return false,
        Some(inner) => {
            // The write guard is released at the end of this arm, before the record
            // itself is touched below.
            let mut guard = inner.write();
            let flag = guard.cancel_acknowledged;
            if flag {
                guard.cancel_acknowledged = false;
            }
            flag
        }
    };
    if acknowledged {
        let _ = record.acknowledge_cancel();
    }
    acknowledged
}
/// Injects a chaos cancellation into `task_id`.
///
/// The injection is always written to the replay recorder and to the trace. When the
/// cancellation oracle is enabled it is told about the request and about the resulting
/// state transition. The cancel itself (with a zero budget) is only requested on a task
/// that exists and is not already terminal.
fn inject_cancel(&mut self, task_id: TaskId) {
    use crate::types::{Budget, CancelReason};
    self.replay_recorder.record_cancel_injection(task_id);
    let reason = CancelReason::user("chaos-injected");
    let oracle_enabled = self.config.has_cancellation_oracle();
    let now = self.virtual_time;
    if oracle_enabled {
        self.oracles
            .cancellation_protocol
            .on_cancel_request(task_id, reason.clone(), now);
    }
    let live_record = self
        .state
        .task_mut(task_id)
        .filter(|record| !record.state.is_terminal());
    if let Some(record) = live_record {
        let before = record.state.clone();
        record.request_cancel_with_budget(reason, Budget::ZERO);
        if oracle_enabled {
            let after = record.state.clone();
            self.oracles
                .cancellation_protocol
                .on_transition(task_id, &before, &after, now);
        }
    }
    self.state.record_trace_event(|seq| {
        let data = TraceData::Chaos {
            kind: "cancel".to_string(),
            task: Some(task_id),
            detail: "chaos-injected cancellation".to_string(),
        };
        TraceEvent::new(seq, now, TraceEventKind::ChaosInjection, data)
    });
}
/// Forwards a task-creation notification to the cancellation protocol oracle.
pub fn notify_cancellation_oracle_task_create(&mut self, task_id: TaskId, region_id: RegionId) {
    let oracle = &mut self.oracles.cancellation_protocol;
    oracle.on_task_create(task_id, region_id);
}
/// Forwards a region-creation notification (with its optional parent region) to the
/// cancellation protocol oracle.
pub fn notify_cancellation_oracle_region_create(
    &mut self,
    region_id: RegionId,
    parent: Option<RegionId>,
) {
    let oracle = &mut self.oracles.cancellation_protocol;
    oracle.on_region_create(region_id, parent);
}
/// Reports a task state transition (`from` -> `to`) to the cancellation protocol
/// oracle, stamped with the runtime's `virtual_time`.
pub fn notify_cancellation_oracle_task_transition(
    &mut self,
    task_id: TaskId,
    from: &crate::record::task::TaskState,
    to: &crate::record::task::TaskState,
) {
    let at = self.virtual_time;
    let oracle = &mut self.oracles.cancellation_protocol;
    oracle.on_transition(task_id, from, to, at);
}
/// Reports a cancel request for `task_id` with its `reason` to the cancellation
/// protocol oracle, stamped with the runtime's `virtual_time`.
pub fn notify_cancellation_oracle_cancel_request(
    &mut self,
    task_id: TaskId,
    reason: crate::types::CancelReason,
) {
    let at = self.virtual_time;
    let oracle = &mut self.oracles.cancellation_protocol;
    oracle.on_cancel_request(task_id, reason, at);
}
/// Reports a cancel acknowledgement by `task_id` to the cancellation protocol oracle,
/// stamped with the runtime's `virtual_time`.
pub fn notify_cancellation_oracle_cancel_ack(&mut self, task_id: TaskId) {
    let at = self.virtual_time;
    let oracle = &mut self.oracles.cancellation_protocol;
    oracle.on_cancel_ack(task_id, at);
}
/// Tells the cancellation protocol oracle that `task_id` is being polled.
pub fn notify_cancellation_oracle_task_poll(&mut self, task_id: TaskId) {
    let oracle = &mut self.oracles.cancellation_protocol;
    oracle.on_task_poll(task_id);
}
/// Reports a mask-enter event for `task_id` to the cancellation protocol oracle,
/// stamped with the runtime's `virtual_time`.
pub fn notify_cancellation_oracle_mask_enter(&mut self, task_id: TaskId) {
    let at = self.virtual_time;
    let oracle = &mut self.oracles.cancellation_protocol;
    oracle.on_mask_enter(task_id, at);
}
/// Reports a mask-exit event for `task_id` to the cancellation protocol oracle,
/// stamped with the runtime's `virtual_time`.
pub fn notify_cancellation_oracle_mask_exit(&mut self, task_id: TaskId) {
    let at = self.virtual_time;
    let oracle = &mut self.oracles.cancellation_protocol;
    oracle.on_mask_exit(task_id, at);
}
/// Reports a region-level cancellation with its `reason` to the cancellation protocol
/// oracle, stamped with the runtime's `virtual_time`.
pub fn notify_cancellation_oracle_region_cancel(
    &mut self,
    region_id: RegionId,
    reason: crate::types::CancelReason,
) {
    let at = self.virtual_time;
    let oracle = &mut self.oracles.cancellation_protocol;
    oracle.on_region_cancel(region_id, reason, at);
}
/// Asks the cancellation protocol oracle for its verdict.
///
/// Returns `Ok(())` without consulting the oracle when the cancellation oracle is
/// disabled in the config. Otherwise the oracle's result is returned unchanged; a
/// violation is additionally logged as a warning.
///
/// # Panics
///
/// Panics on a violation when `panic_on_cancellation_violation` is set in the config.
pub fn check_cancellation_protocol(
    &mut self,
) -> Result<(), crate::lab::oracle::CancellationProtocolViolation> {
    let oracle_enabled = self.config.has_cancellation_oracle();
    if !oracle_enabled {
        return Ok(());
    }
    let verdict = self.oracles.cancellation_protocol.check();
    if let Err(violation) = &verdict {
        if self.config.panic_on_cancellation_violation {
            panic!("Cancellation protocol violation detected: {violation}");
        } else {
            crate::tracing_compat::warn!(
                violation = %violation,
                "Cancellation protocol violation detected"
            );
        }
    }
    verdict
}
/// Injects a chaos budget exhaustion into `task_id`.
///
/// The injection is written to the replay recorder and to the trace regardless of
/// whether the task exists. When the task has a `cx_inner`, its poll quota is set to
/// `0` and its cost quota to `Some(0)`.
fn inject_budget_exhaust(&mut self, task_id: TaskId) {
    self.replay_recorder
        .record_budget_exhaust_injection(task_id);
    let task_cx = self
        .state
        .task(task_id)
        .and_then(|record| record.cx_inner.as_ref());
    if let Some(cx_inner) = task_cx {
        let mut budget_guard = cx_inner.write();
        budget_guard.budget.poll_quota = 0;
        budget_guard.budget.cost_quota = Some(0);
    }
    let now = self.virtual_time;
    self.state.record_trace_event(|seq| {
        let data = TraceData::Chaos {
            kind: "budget_exhaust".to_string(),
            task: Some(task_id),
            detail: "chaos-injected budget exhaustion".to_string(),
        };
        TraceEvent::new(seq, now, TraceEventKind::ChaosInjection, data)
    });
}
/// Injects a chaos wakeup storm of `count` spurious wakes for `task_id`.
///
/// The storm is written to the replay recorder (with the count clamped to `u32::MAX`),
/// handed to the scheduler, and finally recorded as a chaos trace event.
fn inject_spurious_wakes(&mut self, task_id: TaskId, priority: u8, count: usize) {
    let recorded_count = u32::try_from(count).unwrap_or(u32::MAX);
    self.replay_recorder
        .record_wakeup_storm_injection(task_id, recorded_count);
    // The scheduler lock is held only for this one statement.
    self.scheduler
        .lock()
        .inject_spurious_wakes(task_id, priority, count);
    let now = self.virtual_time;
    self.state.record_trace_event(|seq| {
        let data = TraceData::Chaos {
            kind: "wakeup_storm".to_string(),
            task: Some(task_id),
            detail: format!("chaos-injected {count} spurious wakeups"),
        };
        TraceEvent::new(seq, now, TraceEventKind::ChaosInjection, data)
    });
}
/// Public wrapper that invokes `step()` exactly once.
pub fn step_for_test(&mut self) {
self.step();
}
/// Collects every invariant violation currently observable in the runtime.
///
/// Checks run in this order: cancellation protocol, obligation leaks (each leaked
/// obligation is also marked as leaked in the state), futurelocks, quiescence, and
/// finally non-terminal tasks. An empty vector means nothing was found.
#[must_use]
pub fn check_invariants(&mut self) -> Vec<InvariantViolation> {
    let mut found = Vec::new();

    if let Err(violation) = self.check_cancellation_protocol() {
        let violation = violation.to_string();
        found.push(InvariantViolation::CancellationProtocol { violation });
    }

    let leaks = self.obligation_leaks();
    if !leaks.is_empty() {
        leaks.iter().for_each(|leak| {
            let _ = self.state.mark_obligation_leaked(leak.obligation);
        });
        found.push(InvariantViolation::ObligationLeak { leaks });
    }

    found.extend(self.futurelock_violations());
    found.extend(self.quiescence_violations());

    match self.task_leaks() {
        0 => {}
        count => found.push(InvariantViolation::TaskLeak { count }),
    }
    found
}
/// Lists pending obligations that can no longer be resolved by their owner.
///
/// A pending obligation counts as leaked when its holder task is missing or terminal,
/// or when its region is missing or in a terminal state.
fn obligation_leaks(&self) -> Vec<ObligationLeak> {
    self.state
        .obligations_iter()
        .filter_map(|(_, obligation)| {
            if !obligation.is_pending() {
                return None;
            }
            let holder_terminal = self
                .state
                .task(obligation.holder)
                .is_none_or(|t| t.state.is_terminal());
            let region_closed = self
                .state
                .region(obligation.region)
                .is_none_or(|r| r.state().is_terminal());
            (holder_terminal || region_closed).then(|| ObligationLeak {
                obligation: obligation.id,
                kind: obligation.kind,
                holder: obligation.holder,
                region: obligation.region,
            })
        })
        .collect()
}
/// Counts the tasks whose state is not terminal.
fn task_leaks(&self) -> usize {
    let mut live = 0usize;
    for (_, task) in self.state.tasks_iter() {
        if !task.state.is_terminal() {
            live += 1;
        }
    }
    live
}
/// Emits one `QuiescenceViolation` for every terminal region that still owns a
/// non-terminal task or a non-terminal child region.
fn quiescence_violations(&self) -> Vec<InvariantViolation> {
    let mut found = Vec::new();
    for (_, region) in self.state.regions_iter() {
        // Only regions that already reached a terminal state are inspected.
        if !region.state().is_terminal() {
            continue;
        }
        let has_live_task = region.task_ids().iter().any(|&tid| {
            matches!(self.state.task(tid), Some(t) if !t.state.is_terminal())
        });
        let has_live_child = region.child_ids().iter().any(|&rid| {
            matches!(self.state.region(rid), Some(r) if !r.state().is_terminal())
        });
        if has_live_task || has_live_child {
            found.push(InvariantViolation::QuiescenceViolation);
        }
    }
    found
}
/// Finds tasks that hold pending obligations but have not been polled for more than
/// `futurelock_max_idle_steps` steps.
///
/// Returns an empty vector when the threshold is `0`. Terminal tasks are ignored. Each
/// reported violation carries the task, its owning region, the idle step count and the
/// ids of the pending obligations the task holds.
///
/// The idle-step test runs before the obligation scan: scanning every obligation is
/// the expensive part, and a task that is not idle long enough can never produce a
/// violation regardless of what it holds. The reported violations and their order are
/// unaffected by this ordering.
fn futurelock_violations(&self) -> Vec<InvariantViolation> {
    let threshold = self.config.futurelock_max_idle_steps;
    if threshold == 0 {
        return Vec::new();
    }
    let current_step = self.steps;
    let mut violations = Vec::new();
    for (_, task) in self.state.tasks_iter() {
        if task.state.is_terminal() {
            continue;
        }
        // Cheap check first: skip the obligation scan for recently polled tasks.
        let idle_steps = current_step.saturating_sub(task.last_polled_step);
        if idle_steps <= threshold {
            continue;
        }
        let mut held = Vec::new();
        for (_, obligation) in self.state.obligations_iter() {
            if obligation.is_pending() && obligation.holder == task.id {
                held.push(obligation.id);
            }
        }
        if held.is_empty() {
            continue;
        }
        violations.push(InvariantViolation::Futurelock {
            task: task.id,
            region: task.owner,
            idle_steps,
            held,
        });
    }
    violations
}
/// Records a `FuturelockDetected` trace event for every current futurelock violation.
///
/// For each violation the kind of every held obligation is looked up so the trace
/// event carries `(id, kind)` pairs.
///
/// # Panics
///
/// Panics on the first futurelock when `panic_on_futurelock` is set in the config.
fn check_futurelocks(&self) {
    for violation in self.futurelock_violations() {
        let InvariantViolation::Futurelock {
            task,
            region,
            idle_steps,
            held,
        } = violation
        else {
            continue;
        };
        // Resolve each held id to its kind; ids that are no longer present are dropped.
        let held_kinds: Vec<_> = held
            .iter()
            .filter_map(|oid| {
                self.state
                    .obligations_iter()
                    .find(|(_, obligation)| obligation.id == *oid)
                    .map(|(_, obligation)| (obligation.id, obligation.kind))
            })
            .collect();
        self.state.record_trace_event(|seq| {
            TraceEvent::new(
                seq,
                self.virtual_time,
                TraceEventKind::FuturelockDetected,
                TraceData::Futurelock {
                    task,
                    region,
                    idle_steps,
                    held: held_kinds,
                },
            )
        });
        assert!(
            !self.config.panic_on_futurelock,
            "futurelock detected: {task} in {region} idle={idle_steps} held={held:?}"
        );
    }
}
}
/// Default value for `LabScheduler::cancel_streak_limit`: the number of consecutive
/// cancel-lane dispatches a worker may make before `pop_for_worker` tries the timed
/// and ready lanes first.
const DEFAULT_LAB_CANCEL_STREAK_LIMIT: usize = 16;
/// Spurious wakes still owed to a task, kept by `LabScheduler` per task id.
#[derive(Debug, Clone, Copy)]
struct PendingSpuriousWake {
// Priority used when the task is re-scheduled for a spurious wake.
priority: u8,
// Number of spurious wakes that have not been delivered yet.
remaining: usize,
}
/// Scheduler used by the lab runtime: a set of per-worker priority schedulers plus the
/// bookkeeping needed to deduplicate scheduling and replay spurious wakes.
#[derive(Debug)]
pub struct LabScheduler {
// One priority scheduler per worker; always at least one (see `new`).
workers: Vec<crate::runtime::scheduler::PriorityScheduler>,
// Tasks currently queued on some worker; used to ignore duplicate schedule requests.
scheduled: DetHashSet<TaskId>,
// Spurious wakes still owed to a task, re-armed each time the task is popped.
pending_spurious_wakes: DetHashMap<TaskId, PendingSpuriousWake>,
// Worker assignment per task, indexed by the task's arena index.
assignments: Vec<Option<usize>>,
// Cursor used by `assign_worker` to pick workers for unassigned tasks in turn.
next_worker: usize,
// Per-worker count of consecutive cancel-lane dispatches.
cancel_streak: Vec<usize>,
// Streak length at which `pop_for_worker` stops preferring the cancel lane.
cancel_streak_limit: usize,
}
impl LabScheduler {
/// Creates a scheduler with `worker_count` workers, using a single worker when
/// `worker_count` is `0`. All bookkeeping starts empty and the cancel streak limit
/// is taken from `DEFAULT_LAB_CANCEL_STREAK_LIMIT` (never below `1`).
fn new(worker_count: usize) -> Self {
    let lanes = worker_count.max(1);
    Self {
        workers: std::iter::repeat_with(crate::runtime::scheduler::PriorityScheduler::new)
            .take(lanes)
            .collect(),
        scheduled: DetHashSet::default(),
        pending_spurious_wakes: DetHashMap::default(),
        assignments: Vec::new(),
        next_worker: 0,
        cancel_streak: vec![0; lanes],
        cancel_streak_limit: DEFAULT_LAB_CANCEL_STREAK_LIMIT.max(1),
    }
}
/// Returns `true` when no task is currently recorded in the `scheduled` set.
#[must_use]
pub fn is_empty(&self) -> bool {
self.scheduled.is_empty()
}
/// Returns the configured cancel streak limit.
#[must_use]
pub fn cancel_streak_limit(&self) -> usize {
self.cancel_streak_limit
}
/// Records that `task` is assigned to `worker`, growing the assignment table
/// (filled with `None`) when the task's arena index is beyond its current length.
#[inline]
fn set_assignment(&mut self, task: TaskId, worker: usize) {
    let slot = task.arena_index().index() as usize;
    let needed_len = slot + 1;
    if self.assignments.len() < needed_len {
        self.assignments.resize(needed_len, None);
    }
    self.assignments[slot] = Some(worker);
}
/// Returns the worker `task` is assigned to, assigning one first if needed.
///
/// An existing assignment is returned as is. Otherwise the next worker is taken from
/// the `next_worker` cursor (modulo the worker count), the cursor advances, and the
/// choice is stored in the assignment table.
fn assign_worker(&mut self, task: TaskId) -> usize {
    let slot = task.arena_index().index() as usize;
    if let Some(&Some(existing)) = self.assignments.get(slot) {
        return existing;
    }
    let chosen = self.next_worker % self.workers.len();
    self.next_worker = self.next_worker.wrapping_add(1);
    self.set_assignment(task, chosen);
    chosen
}
/// Queues `task` at `priority` on its assigned worker.
///
/// A task that is already in the `scheduled` set is left untouched.
pub fn schedule(&mut self, task: TaskId, priority: u8) {
    let newly_queued = self.scheduled.insert(task);
    if newly_queued {
        crate::tracing_compat::trace!("LabScheduler scheduling {task:?}");
        let worker = self.assign_worker(task);
        self.workers[worker].schedule(task, priority);
    } else {
        crate::tracing_compat::trace!("LabScheduler already scheduled {task:?}");
    }
}
/// Arranges for `task` to be woken `count` extra times.
///
/// If the task is not queued yet, one wake is delivered immediately by scheduling it.
/// Whatever is left is added to the task's pending spurious wakes, keeping the higher
/// of the stored and the new priority. A `count` of `0` does nothing.
fn inject_spurious_wakes(&mut self, task: TaskId, priority: u8, count: usize) {
    if count == 0 {
        return;
    }
    let remaining = if self.scheduled.insert(task) {
        let worker = self.assign_worker(task);
        self.workers[worker].schedule(task, priority);
        count.saturating_sub(1)
    } else {
        count
    };
    if remaining == 0 {
        return;
    }
    if let Some(pending) = self.pending_spurious_wakes.get_mut(&task) {
        pending.priority = pending.priority.max(priority);
        pending.remaining = pending.remaining.saturating_add(remaining);
    } else {
        self.pending_spurious_wakes.insert(
            task,
            PendingSpuriousWake {
                priority,
                remaining,
            },
        );
    }
}
/// Queues `task` on the cancel path at `priority`.
///
/// A task that is not queued yet is scheduled through its worker's `schedule_cancel`.
/// A task that is already queued is moved to the cancel lane of the worker recorded in
/// the assignment table; nothing happens if no assignment is recorded.
pub fn schedule_cancel(&mut self, task: TaskId, priority: u8) {
    let newly_queued = self.scheduled.insert(task);
    if !newly_queued {
        let slot = task.arena_index().index() as usize;
        if let Some(owner) = self.assignments.get(slot).copied().flatten() {
            self.workers[owner].move_to_cancel_lane(task, priority);
        }
        return;
    }
    let worker = self.assign_worker(task);
    self.workers[worker].schedule_cancel(task, priority);
}
/// Queues `task` with a `deadline` on its assigned worker, unless the task is
/// already in the `scheduled` set.
pub fn schedule_timed(&mut self, task: TaskId, deadline: Time) {
    if self.scheduled.insert(task) {
        let worker = self.assign_worker(task);
        self.workers[worker].schedule_timed(task, deadline);
    }
}
/// Pops the next task for `worker` (taken modulo the worker count).
///
/// Lanes are tried in this order: the cancel lane while the worker's cancel streak is
/// below `cancel_streak_limit`, then the timed lane, then the ready lane, and finally
/// the cancel lane again as a fallback. Every successful pop removes the task from the
/// `scheduled` set, records the worker assignment and re-arms any pending spurious
/// wake. Returns `None` when nothing could be popped.
fn pop_for_worker(
&mut self,
worker: usize,
rng_hint: u64,
now: Time,
) -> Option<(TaskId, DispatchLane)> {
if self.workers.is_empty() {
return None;
}
let worker = worker % self.workers.len();
let cancel_streak = &mut self.cancel_streak[worker];
// Prefer cancel work, but only while the streak limit has not been reached.
if *cancel_streak < self.cancel_streak_limit {
if let Some((task, lane)) = self.workers[worker].pop_cancel_with_rng(rng_hint) {
*cancel_streak += 1;
self.scheduled.remove(&task);
self.set_assignment(task, worker);
self.rearm_spurious_wake(task);
return Some((task, lane));
}
}
// A timed or ready dispatch resets the cancel streak.
if let Some(task) = self.workers[worker].pop_timed_only_with_hint(rng_hint, now) {
*cancel_streak = 0;
self.scheduled.remove(&task);
self.set_assignment(task, worker);
self.rearm_spurious_wake(task);
return Some((task, DispatchLane::Timed));
}
if let Some(task) = self.workers[worker].pop_ready_only_with_hint(rng_hint) {
*cancel_streak = 0;
self.scheduled.remove(&task);
self.set_assignment(task, worker);
self.rearm_spurious_wake(task);
return Some((task, DispatchLane::Ready));
}
// Fallback: cancel work is taken even past the limit when no other lane produced a
// task; the streak restarts at one.
if let Some((task, lane)) = self.workers[worker].pop_cancel_with_rng(rng_hint) {
*cancel_streak = 1;
self.scheduled.remove(&task);
self.set_assignment(task, worker);
self.rearm_spurious_wake(task);
return Some((task, lane));
}
*cancel_streak = 0;
None
}
/// Tries to steal a ready task for worker `thief` from another worker.
///
/// Returns `None` when there is at most one worker or no other worker has a ready task.
/// Victims are visited in ring order starting at a position derived from `rng_hint`;
/// the thief itself is skipped. A stolen task is removed from the `scheduled` set,
/// re-assigned to the thief and has any pending spurious wake re-armed.
fn steal_for_worker(&mut self, thief: usize, rng_hint: u64) -> Option<TaskId> {
    let count = self.workers.len();
    if count <= 1 {
        return None;
    }
    let thief = thief % count;
    // Reduce in u64 before narrowing: casting the full 64-bit hint to `usize` first
    // would drop its upper bits on 32-bit targets and pick a different starting victim
    // there than on 64-bit targets for the same hint.
    let start = (rng_hint % count as u64) as usize;
    for offset in 0..count {
        let victim = (start + offset) % count;
        if victim == thief {
            continue;
        }
        if let Some(task) =
            self.workers[victim].pop_ready_only_with_hint(rng_hint.wrapping_add(offset as u64))
        {
            self.scheduled.remove(&task);
            self.set_assignment(task, thief);
            self.rearm_spurious_wake(task);
            return Some(task);
        }
    }
    None
}
/// Erases every trace of `task` from the scheduler: the `scheduled` set, its pending
/// spurious wakes, its worker assignment, and its entry on every worker.
fn forget_task(&mut self, task: TaskId) {
    self.scheduled.remove(&task);
    self.pending_spurious_wakes.remove(&task);
    let slot = task.arena_index().index() as usize;
    if let Some(assignment) = self.assignments.get_mut(slot) {
        *assignment = None;
    }
    for queue in self.workers.iter_mut() {
        queue.remove(task);
    }
}
fn rearm_spurious_wake(&mut self, task: TaskId) {
let Some(mut pending) = self.pending_spurious_wakes.remove(&task) else {
return;
};
self.schedule(task, pending.priority);
pending.remaining = pending.remaining.saturating_sub(1);
if pending.remaining > 0 {
self.pending_spurious_wakes.insert(task, pending);
}
}
}
/// Waker that re-schedules a task on the shared `LabScheduler` when woken.
struct TaskWaker {
// Task to schedule on wake.
task_id: crate::types::TaskId,
// Priority passed to the scheduler on wake.
priority: u8,
// Scheduler shared with the runtime.
scheduler: Arc<Mutex<LabScheduler>>,
}
use std::task::Wake;
impl Wake for TaskWaker {
    /// Schedules the task at its stored priority on the shared scheduler.
    fn wake(self: Arc<Self>) {
        let mut queue = self.scheduler.lock();
        queue.schedule(self.task_id, self.priority);
    }
}
/// Waker that re-schedules a task through the scheduler's cancel path when woken.
struct CancelTaskWaker {
// Task to schedule on wake.
task_id: crate::types::TaskId,
// Priority passed to the scheduler on wake.
priority: u8,
// Scheduler shared with the runtime.
scheduler: Arc<Mutex<LabScheduler>>,
}
impl Wake for CancelTaskWaker {
    /// Schedules the task via `schedule_cancel` at its stored priority.
    fn wake(self: Arc<Self>) {
        let mut queue = self.scheduler.lock();
        queue.schedule_cancel(self.task_id, self.priority);
    }
}
/// A violated runtime invariant, as reported by `LabRuntime::check_invariants`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum InvariantViolation {
// Pending obligations whose holder task or region can no longer resolve them.
ObligationLeak {
leaks: Vec<ObligationLeak>,
},
// `count` tasks are not in a terminal state.
TaskLeak {
count: usize,
},
// `count` actors leaked. NOTE(review): not produced by any code visible in this section.
ActorLeak {
count: usize,
},
// A terminal region still owns a non-terminal task or child region.
QuiescenceViolation,
// A non-terminal task holding pending obligations exceeded the idle-step threshold.
Futurelock {
task: crate::types::TaskId,
region: crate::types::RegionId,
idle_steps: u64,
held: Vec<ObligationId>,
},
// The cancellation protocol oracle reported a violation (rendered as text).
CancellationProtocol {
violation: String,
},
}
/// Identifies one leaked obligation: which obligation, its kind, and the task and
/// region recorded as its holder and owner.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ObligationLeak {
// Id of the leaked obligation.
pub obligation: ObligationId,
// Kind of the leaked obligation.
pub kind: ObligationKind,
// Task recorded as the obligation's holder.
pub holder: crate::types::TaskId,
// Region recorded for the obligation.
pub region: crate::types::RegionId,
}
impl std::fmt::Display for ObligationLeak {
    /// Renders the leak as `<obligation> <kind> holder=<holder> region=<region>`, with
    /// every component in its `Debug` form.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let Self {
            obligation,
            kind,
            holder,
            region,
        } = self;
        write!(
            f,
            "{:?} {:?} holder={:?} region={:?}",
            obligation, kind, holder, region
        )
    }
}
impl std::fmt::Display for InvariantViolation {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
Self::ObligationLeak { leaks } => {
write!(f, "{} obligations leaked", leaks.len())
}
Self::TaskLeak { count } => write!(f, "{count} tasks leaked"),
Self::ActorLeak { count } => write!(f, "{count} actors leaked"),
Self::QuiescenceViolation => write!(f, "region closed without quiescence"),
Self::Futurelock {
task,
region,
idle_steps,
held,
} => write!(
f,
"futurelock: {task} in {region} idle={idle_steps} held={held:?}"
),
Self::CancellationProtocol { violation } => {
write!(f, "cancellation protocol violation: {violation}")
}
}
}
}
/// Runs `f` against a fresh `LabRuntime` seeded with `seed` and returns its result.
///
/// # Panics
///
/// Panics when `check_invariants` reports any violation after `f` has returned.
pub fn test<F, R>(seed: u64, f: F) -> R
where
    F: FnOnce(&mut LabRuntime) -> R,
{
    let mut lab = LabRuntime::with_seed(seed);
    let output = f(&mut lab);
    let violations = lab.check_invariants();
    assert!(
        violations.is_empty(),
        "Lab runtime invariant violations: {violations:?}"
    );
    output
}
#[cfg(test)]
mod tests {
use super::*;
use crate::lab::chaos::ChaosConfig;
use crate::record::TaskRecord;
use crate::record::{ObligationAbortReason, ObligationKind};
use crate::runtime::deadline_monitor::{AdaptiveDeadlineConfig, WarningReason};
use crate::runtime::reactor::{Event, Interest};
use crate::types::{Budget, CxInner, Outcome, TaskId};
use crate::util::ArenaIndex;
use parking_lot::Mutex;
use parking_lot::RwLock;
use std::sync::Arc;
use std::task::Waker;
use std::time::Duration;
// Unix-only stand-in I/O source for registration in tests; always reports raw fd 0.
#[cfg(unix)]
struct TestFdSource;
#[cfg(unix)]
impl std::os::fd::AsRawFd for TestFdSource {
fn as_raw_fd(&self) -> std::os::fd::RawFd {
0
}
}
// Returns an owned clone of the standard library's no-op waker.
#[cfg(unix)]
fn noop_waker() -> Waker {
std::task::Waker::noop().clone()
}
// Test waker that sets the shared flag to `true` when woken.
struct FlagWaker(Arc<std::sync::atomic::AtomicBool>);
impl Wake for FlagWaker {
fn wake(self: Arc<Self>) {
self.0.store(true, std::sync::atomic::Ordering::SeqCst);
}
}
// Test waker that increments the shared counter by one on every wake.
struct CountWaker(Arc<std::sync::atomic::AtomicU64>);
impl Wake for CountWaker {
fn wake(self: Arc<Self>) {
self.0.fetch_add(1, std::sync::atomic::Ordering::SeqCst);
}
}
// Common test prologue: initialises test logging and announces the test phase `name`.
fn init_test(name: &str) {
crate::test_utils::init_test_logging();
crate::test_phase!(name);
}
// Result of `collect_timer_advances`.
struct TimerAdvanceOutcome {
// Virtual time observed after each advance-to-next-timer call, in call order.
advance_points: Vec<Time>,
// Wake count accumulated by the timers that were not cancelled.
total_wakeups: u64,
// Virtual time once no timers remained pending.
final_time: Time,
// Wake count accumulated by the cancelled timers.
cancelled_wakeups: u64,
}
// Registers one timer per entry of `deadlines_secs` (deadline in seconds) on a runtime
// seeded with 42, cancels the timers at `cancelled_indices`, then advances to the next
// timer repeatedly until none are pending. Asserts along the way that time is monotone,
// that each advance reaches its deadline, and that each advance wakes at least one
// timer. Returns the observed advance points and the wake counters.
fn collect_timer_advances(
deadlines_secs: &[u64],
cancelled_indices: &[usize],
) -> TimerAdvanceOutcome {
let mut runtime = LabRuntime::with_seed(42);
let timer_handle = runtime.state.timer_driver_handle().expect("timer handle");
let live_wakeups = Arc::new(std::sync::atomic::AtomicU64::new(0));
let cancelled_wakeups = Arc::new(std::sync::atomic::AtomicU64::new(0));
let mut handles = Vec::with_capacity(deadlines_secs.len());
for (idx, secs) in deadlines_secs.iter().copied().enumerate() {
// Timers slated for cancellation count into a separate counter so stray wakes show up.
let counter = if cancelled_indices.contains(&idx) {
cancelled_wakeups.clone()
} else {
live_wakeups.clone()
};
let waker = Waker::from(Arc::new(CountWaker(counter)));
handles.push(timer_handle.register(Time::from_secs(secs), waker));
}
for &idx in cancelled_indices {
let cancelled = timer_handle.cancel(&handles[idx]);
crate::assert_with_log!(
cancelled,
"cancelled timer handle remains removable before auto-advance",
true,
cancelled
);
}
let mut advance_points = Vec::new();
while runtime.pending_timer_count() > 0 {
let before = runtime.now();
let next_deadline = runtime.next_timer_deadline().expect("pending deadline");
let wakeups = runtime.advance_to_next_timer();
let after = runtime.now();
crate::assert_with_log!(
after >= before,
"virtual time stays monotone while advancing timers",
true,
after >= before
);
crate::assert_with_log!(
after >= next_deadline,
"advance reaches or passes scheduled deadline",
true,
after >= next_deadline
);
crate::assert_with_log!(
wakeups > 0,
"each advance drains at least one live timer",
true,
wakeups > 0
);
advance_points.push(after);
}
TimerAdvanceOutcome {
advance_points,
total_wakeups: live_wakeups.load(std::sync::atomic::Ordering::SeqCst),
final_time: runtime.now(),
cancelled_wakeups: cancelled_wakeups.load(std::sync::atomic::Ordering::SeqCst),
}
}
// A freshly seeded runtime with no tasks reports itself as quiescent.
#[test]
fn empty_runtime_is_quiescent() {
init_test("empty_runtime_is_quiescent");
let runtime = LabRuntime::with_seed(42);
let quiescent = runtime.is_quiescent();
crate::assert_with_log!(quiescent, "quiescent", true, quiescent);
crate::test_complete!("empty_runtime_is_quiescent");
}
// Virtual time starts at zero and advancing by 1_000_000 lands on exactly one millisecond.
#[test]
fn advance_time() {
init_test("advance_time");
let mut runtime = LabRuntime::with_seed(42);
let now = runtime.now();
crate::assert_with_log!(now == Time::ZERO, "now", Time::ZERO, now);
runtime.advance_time(1_000_000);
let now = runtime.now();
crate::assert_with_log!(
now == Time::from_millis(1),
"now",
Time::from_millis(1),
now
);
crate::test_complete!("advance_time");
}
// `duration_nanos_saturating` clamps a duration too large for u64 nanoseconds to
// `u64::MAX` and passes a small duration through unchanged.
#[test]
fn duration_nanos_saturating_clamps_large_duration() {
init_test("duration_nanos_saturating_clamps_large_duration");
let huge = Duration::from_secs(u64::MAX);
let saturated = LabRuntime::duration_nanos_saturating(huge);
crate::assert_with_log!(
saturated == u64::MAX,
"huge duration saturates",
u64::MAX,
saturated
);
let small = Duration::from_nanos(123);
let exact = LabRuntime::duration_nanos_saturating(small);
crate::assert_with_log!(exact == 123, "small duration exact", 123u64, exact);
crate::test_complete!("duration_nanos_saturating_clamps_large_duration");
}
// After a readable event is injected for a registered source and one step runs, the
// trace contains both an `IoRequested` and an `IoReady` event.
#[cfg(unix)]
#[test]
fn lab_runtime_records_io_ready_trace() {
init_test("lab_runtime_records_io_ready_trace");
let mut runtime = LabRuntime::with_seed(42);
let handle = runtime.state.io_driver_handle().expect("io driver");
let waker = noop_waker();
let source = TestFdSource;
let registration = handle
.register(&source, Interest::READABLE, waker)
.expect("register source");
let token = registration.token();
// The event is injected with a 1 ms delay, so time is advanced by 1 ms before stepping.
runtime
.lab_reactor()
.inject_event(token, Event::readable(token), Duration::from_millis(1));
runtime.advance_time(1_000_000);
runtime.step_for_test();
let mut saw_requested = false;
let mut saw_ready = false;
for event in runtime.state.trace.snapshot() {
if event.kind == TraceEventKind::IoRequested {
saw_requested = true;
}
if event.kind == TraceEventKind::IoReady {
saw_ready = true;
}
}
crate::assert_with_log!(
saw_requested,
"io requested trace recorded",
true,
saw_requested
);
crate::assert_with_log!(saw_ready, "io ready trace recorded", true, saw_ready);
crate::test_complete!("lab_runtime_records_io_ready_trace");
}
// With I/O error probability 1.0 (kind `TimedOut`), one step over an injected event
// makes the runtime's chaos stats show one I/O error and one decision point, and the
// reactor reports `TimedOut` as its last injected error kind.
#[cfg(unix)]
#[test]
fn lab_runtime_chaos_stats_include_reactor_io_error_injections() {
init_test("lab_runtime_chaos_stats_include_reactor_io_error_injections");
let config = LabConfig::new(7).with_chaos(
ChaosConfig::new(7)
.with_io_error_probability(1.0)
.with_io_error_kinds(vec![std::io::ErrorKind::TimedOut]),
);
let mut runtime = LabRuntime::new(config);
let handle = runtime.state.io_driver_handle().expect("io driver");
let waker = noop_waker();
let source = TestFdSource;
let registration = handle
.register(&source, Interest::READABLE, waker)
.expect("register source");
let token = registration.token();
runtime
.lab_reactor()
.inject_event(token, Event::readable(token), Duration::ZERO);
runtime.step_for_test();
let stats = runtime.chaos_stats();
crate::assert_with_log!(
stats.io_errors == 1,
"io errors aggregated",
1u64,
stats.io_errors
);
crate::assert_with_log!(
stats.decision_points == 1,
"reactor decision points aggregated",
1u64,
stats.decision_points
);
crate::assert_with_log!(
runtime.lab_reactor().last_io_error_kind() == Some(std::io::ErrorKind::TimedOut),
"reactor last error kind surfaced",
Some(std::io::ErrorKind::TimedOut),
runtime.lab_reactor().last_io_error_kind()
);
crate::test_complete!("lab_runtime_chaos_stats_include_reactor_io_error_injections");
}
// With wakeup-storm probability 0.0, stepping a forever-pending task once yields two
// chaos decision points and no wakeup storm.
#[test]
fn pending_task_without_wakeup_storm_still_counts_chaos_decision_point() {
init_test("pending_task_without_wakeup_storm_still_counts_chaos_decision_point");
let config =
LabConfig::new(99).with_chaos(ChaosConfig::new(99).with_wakeup_storm_probability(0.0));
let mut runtime = LabRuntime::new(config);
let region = runtime.state.create_root_region(Budget::INFINITE);
let (task_id, _handle) = runtime
.state
.create_task(region, Budget::INFINITE, async {
std::future::pending::<()>().await;
})
.expect("create task");
runtime.scheduler.lock().schedule(task_id, 0);
runtime.step_for_test();
let stats = runtime.chaos_stats();
crate::assert_with_log!(
stats.decision_points == 2,
"pending-task decision point counted",
2u64,
stats.decision_points
);
crate::assert_with_log!(
stats.wakeup_storms == 0,
"no wakeup storm recorded",
0u64,
stats.wakeup_storms
);
crate::test_complete!(
"pending_task_without_wakeup_storm_still_counts_chaos_decision_point"
);
}
// When cancel, delay and budget exhaustion all fire with probability 1.0 in one step,
// the stats show a single decision point alongside one cancellation, one delay (total
// 1 ns with the configured range) and one budget exhaustion.
#[test]
fn pre_poll_multi_injection_counts_one_chaos_decision_point() {
init_test("pre_poll_multi_injection_counts_one_chaos_decision_point");
let config = LabConfig::new(123).with_chaos(
ChaosConfig::new(123)
.with_cancel_probability(1.0)
.with_delay_probability(1.0)
.with_delay_range(Duration::ZERO..Duration::from_nanos(2))
.with_budget_exhaust_probability(1.0),
);
let mut runtime = LabRuntime::new(config);
let region = runtime.state.create_root_region(Budget::INFINITE);
let (task_id, _handle) = runtime
.state
.create_task(region, Budget::INFINITE, async {
std::future::pending::<()>().await;
})
.expect("create task");
runtime.scheduler.lock().schedule(task_id, 0);
runtime.step_for_test();
let stats = runtime.chaos_stats();
crate::assert_with_log!(
stats.decision_points == 1,
"multi-injection pre-poll counts once",
1u64,
stats.decision_points
);
crate::assert_with_log!(
stats.cancellations == 1,
"cancel recorded",
1u64,
stats.cancellations
);
crate::assert_with_log!(stats.delays == 1, "delay recorded", 1u64, stats.delays);
crate::assert_with_log!(
stats.budget_exhaustions == 1,
"budget exhaust recorded",
1u64,
stats.budget_exhaustions
);
crate::assert_with_log!(
stats.total_delay == Duration::from_nanos(1),
"positive delay preserved",
Duration::from_nanos(1),
stats.total_delay
);
crate::test_complete!("pre_poll_multi_injection_counts_one_chaos_decision_point");
}
/// Two runtimes built from the same seed must yield the same first RNG draw.
#[test]
fn deterministic_rng() {
    init_test("deterministic_rng");
    let mut left = LabRuntime::with_seed(42);
    let mut right = LabRuntime::with_seed(42);
    let left_draw = left.rng.next_u64();
    let right_draw = right.rng.next_u64();
    crate::assert_with_log!(left_draw == right_draw, "rng", right_draw, left_draw);
    crate::test_complete!("deterministic_rng");
}
/// With one ready task and one timed task due at 100ns on a single worker,
/// popping at time zero must yield the ready task, then nothing, and the timed
/// task only once the supplied time reaches 100ns.
#[test]
fn lab_scheduler_pop_for_worker_respects_timed_deadlines() {
    init_test("lab_scheduler_pop_for_worker_respects_timed_deadlines");
    let mut sched = LabScheduler::new(1);
    let timed_task = TaskId::from_arena(ArenaIndex::new(1, 0));
    let ready_task = TaskId::from_arena(ArenaIndex::new(2, 0));
    sched.schedule_timed(timed_task, Time::from_nanos(100));
    sched.schedule(ready_task, 10);

    // Before the deadline only the ready lane may dispatch.
    let before_deadline = sched.pop_for_worker(0, 0, Time::ZERO);
    crate::assert_with_log!(
        before_deadline == Some((ready_task, DispatchLane::Ready)),
        "ready task dispatches before not-due timed task",
        Some((ready_task, DispatchLane::Ready)),
        before_deadline
    );

    // Still at time zero: the timed task must not be handed out early.
    let still_early = sched.pop_for_worker(0, 1, Time::ZERO);
    let nothing_dispatched = still_early.is_none();
    crate::assert_with_log!(
        nothing_dispatched,
        "future timed task stays queued before deadline",
        true,
        nothing_dispatched
    );

    // At exactly the deadline the timed lane dispatches.
    let at_deadline = sched.pop_for_worker(0, 2, Time::from_nanos(100));
    crate::assert_with_log!(
        at_deadline == Some((timed_task, DispatchLane::Timed)),
        "timed task dispatches at deadline",
        Some((timed_task, DispatchLane::Timed)),
        at_deadline
    );
    crate::test_complete!("lab_scheduler_pop_for_worker_respects_timed_deadlines");
}
/// Schedules one cancel-lane, one timed and one ready task on a two-worker
/// scheduler and checks that `steal_for_worker(1, 0)` returns only the ready
/// task, leaving the cancel and timed work to be dispatched via
/// `pop_for_worker`.
#[test]
fn lab_scheduler_steal_for_worker_only_steals_ready_tasks() {
    init_test("lab_scheduler_steal_for_worker_only_steals_ready_tasks");
    let mut sched = LabScheduler::new(2);
    let cancel_task = TaskId::from_arena(ArenaIndex::new(10, 0));
    let timed_task = TaskId::from_arena(ArenaIndex::new(11, 0));
    let ready_task = TaskId::from_arena(ArenaIndex::new(12, 0));
    sched.schedule_cancel(cancel_task, 100);
    sched.schedule_timed(timed_task, Time::ZERO);
    sched.schedule(ready_task, 50);

    let stolen_task = sched.steal_for_worker(1, 0);
    crate::assert_with_log!(
        stolen_task == Some(ready_task),
        "steal path takes only ready lane work",
        Some(ready_task),
        stolen_task
    );

    // Worker 0 must still own its cancel-lane entry after the steal.
    let victim_keeps_cancel = sched.workers[0].has_cancel_work();
    crate::assert_with_log!(
        victim_keeps_cancel,
        "victim cancel lane remains intact after steal",
        true,
        victim_keeps_cancel
    );

    let from_cancel_lane = sched.pop_for_worker(0, 0, Time::ZERO);
    crate::assert_with_log!(
        from_cancel_lane == Some((cancel_task, DispatchLane::Cancel)),
        "cancel lane still dispatches from victim worker",
        Some((cancel_task, DispatchLane::Cancel)),
        from_cancel_lane
    );

    let from_timed_lane = sched.pop_for_worker(1, 0, Time::ZERO);
    crate::assert_with_log!(
        from_timed_lane == Some((timed_task, DispatchLane::Timed)),
        "timed lane remains on owning worker",
        Some((timed_task, DispatchLane::Timed)),
        from_timed_lane
    );
    crate::test_complete!("lab_scheduler_steal_for_worker_only_steals_ready_tasks");
}
/// After `inject_spurious_wakes(task, 42, 3)` the same task must be dispatched
/// from the ready lane three times in a row; the fourth pop yields nothing and
/// the scheduler reports empty.
#[test]
fn lab_scheduler_spurious_wakes_do_not_collapse_duplicates() {
    init_test("lab_scheduler_spurious_wakes_do_not_collapse_duplicates");
    let mut sched = LabScheduler::new(1);
    let storm_task = TaskId::from_arena(ArenaIndex::new(13, 0));
    sched.inject_spurious_wakes(storm_task, 42, 3);

    let wake_one = sched.pop_for_worker(0, 0, Time::ZERO);
    crate::assert_with_log!(
        wake_one == Some((storm_task, DispatchLane::Ready)),
        "first spurious wake dispatches",
        Some((storm_task, DispatchLane::Ready)),
        wake_one
    );

    let wake_two = sched.pop_for_worker(0, 1, Time::ZERO);
    crate::assert_with_log!(
        wake_two == Some((storm_task, DispatchLane::Ready)),
        "second spurious wake remains queued",
        Some((storm_task, DispatchLane::Ready)),
        wake_two
    );

    let wake_three = sched.pop_for_worker(0, 2, Time::ZERO);
    crate::assert_with_log!(
        wake_three == Some((storm_task, DispatchLane::Ready)),
        "third spurious wake remains queued",
        Some((storm_task, DispatchLane::Ready)),
        wake_three
    );

    // The storm is exhausted: nothing further may be dispatched.
    let after_storm = sched.pop_for_worker(0, 3, Time::ZERO);
    let storm_drained = after_storm.is_none();
    crate::assert_with_log!(
        storm_drained,
        "storm drains after requested wake count",
        true,
        storm_drained
    );

    let sched_empty = sched.is_empty();
    crate::assert_with_log!(
        sched_empty,
        "scheduler empty after spurious storm drains",
        true,
        sched_empty
    );
    crate::test_complete!("lab_scheduler_spurious_wakes_do_not_collapse_duplicates");
}
/// Injects three spurious wakes, dispatches one, then calls `forget_task` and
/// checks that no further wake is dispatched, the pending-wake bookkeeping is
/// empty, and the scheduler reports empty.
#[test]
fn lab_scheduler_forget_task_clears_pending_spurious_wakes() {
    init_test("lab_scheduler_forget_task_clears_pending_spurious_wakes");
    let mut sched = LabScheduler::new(1);
    let storm_task = TaskId::from_arena(ArenaIndex::new(14, 0));
    sched.inject_spurious_wakes(storm_task, 42, 3);

    let before_forget = sched.pop_for_worker(0, 0, Time::ZERO);
    crate::assert_with_log!(
        before_forget == Some((storm_task, DispatchLane::Ready)),
        "first spurious wake dispatches before forget",
        Some((storm_task, DispatchLane::Ready)),
        before_forget
    );

    sched.forget_task(storm_task);

    let after_forget = sched.pop_for_worker(0, 1, Time::ZERO);
    let queue_drained = after_forget.is_none();
    crate::assert_with_log!(
        queue_drained,
        "forget_task drains queued spurious wakes",
        true,
        queue_drained
    );

    let budget_cleared = sched.pending_spurious_wakes.is_empty();
    crate::assert_with_log!(
        budget_cleared,
        "forget_task clears pending spurious wake budget",
        true,
        budget_cleared
    );

    let sched_empty = sched.is_empty();
    crate::assert_with_log!(
        sched_empty,
        "scheduler empty after forget_task",
        true,
        sched_empty
    );
    crate::test_complete!("lab_scheduler_forget_task_clears_pending_spurious_wakes");
}
/// Injects three spurious wakes, takes the first through `steal_for_worker(1, 0)`
/// and checks that the remaining two are still dispatched (via worker 1) before
/// the scheduler drains.
#[test]
fn lab_scheduler_steal_preserves_pending_spurious_wakes() {
    init_test("lab_scheduler_steal_preserves_pending_spurious_wakes");
    let mut sched = LabScheduler::new(2);
    let storm_task = TaskId::from_arena(ArenaIndex::new(15, 0));
    sched.inject_spurious_wakes(storm_task, 42, 3);

    // First wake leaves through the steal path rather than a normal pop.
    let stolen_wake = sched.steal_for_worker(1, 0);
    crate::assert_with_log!(
        stolen_wake == Some(storm_task),
        "steal dispatches first storm wake",
        Some(storm_task),
        stolen_wake
    );

    let wake_two = sched.pop_for_worker(1, 1, Time::ZERO);
    crate::assert_with_log!(
        wake_two == Some((storm_task, DispatchLane::Ready)),
        "steal path re-arms second storm wake on thief worker",
        Some((storm_task, DispatchLane::Ready)),
        wake_two
    );

    let wake_three = sched.pop_for_worker(1, 2, Time::ZERO);
    crate::assert_with_log!(
        wake_three == Some((storm_task, DispatchLane::Ready)),
        "steal path preserves final pending storm wake",
        Some((storm_task, DispatchLane::Ready)),
        wake_three
    );

    let after_storm = sched.pop_for_worker(1, 3, Time::ZERO);
    let storm_drained = after_storm.is_none();
    crate::assert_with_log!(
        storm_drained,
        "all stolen storm wakes drain after requested count",
        true,
        storm_drained
    );

    let sched_empty = sched.is_empty();
    crate::assert_with_log!(
        sched_empty,
        "scheduler empty after stolen storm drains",
        true,
        sched_empty
    );
    crate::test_complete!("lab_scheduler_steal_preserves_pending_spurious_wakes");
}
/// Hands a four-worker configuration to `crate::lab::assert_deterministic`
/// together with a scenario that spawns four tasks, each yielding once, and
/// runs the runtime until quiescent.
#[test]
fn deterministic_multiworker_schedule() {
    init_test("deterministic_multiworker_schedule");
    let four_workers = LabConfig::new(7).worker_count(4);
    crate::lab::assert_deterministic(four_workers, |runtime| {
        let root = runtime.state.create_root_region(Budget::INFINITE);
        for _ in 0..4 {
            // Each task yields exactly once before finishing.
            let yield_once = async {
                crate::runtime::yield_now::yield_now().await;
            };
            let (spawned, _handle) = runtime
                .state
                .create_task(root, Budget::INFINITE, yield_once)
                .expect("create task");
            runtime.scheduler.lock().schedule(spawned, 0);
        }
        runtime.run_until_quiescent();
    });
    crate::test_complete!("deterministic_multiworker_schedule");
}
/// Builds two runtimes from the same configuration, gives each the same
/// four-task workload, and requires their quiescence reports to agree on trace
/// fingerprint, trace certificate, oracle report JSON and invariant violations.
#[test]
fn run_until_quiescent_with_report_is_deterministic() {
    init_test("run_until_quiescent_with_report_is_deterministic");
    let config = LabConfig::new(123).worker_count(4).max_steps(10_000);
    let mut first = LabRuntime::new(config.clone());
    let mut second = LabRuntime::new(config);

    // Identical workload on both runtimes: four tasks that each yield once.
    for runtime in [&mut first, &mut second] {
        let root = runtime.state.create_root_region(Budget::INFINITE);
        for _ in 0..4 {
            let yield_once = async {
                crate::runtime::yield_now::yield_now().await;
            };
            let (spawned, _handle) = runtime
                .state
                .create_task(root, Budget::INFINITE, yield_once)
                .expect("create task");
            runtime.scheduler.lock().schedule(spawned, 0);
        }
    }

    let first_report = first.run_until_quiescent_with_report();
    let second_report = second.run_until_quiescent_with_report();
    crate::assert_with_log!(
        first_report.quiescent,
        "quiescent",
        true,
        first_report.quiescent
    );
    crate::assert_with_log!(
        second_report.quiescent,
        "quiescent",
        true,
        second_report.quiescent
    );

    // Every observable part of the two reports has to match.
    assert_eq!(
        first_report.trace_fingerprint,
        second_report.trace_fingerprint
    );
    assert_eq!(
        first_report.trace_certificate,
        second_report.trace_certificate
    );
    assert_eq!(
        first_report.oracle_report.to_json(),
        second_report.oracle_report.to_json()
    );
    assert_eq!(
        first_report.invariant_violations,
        second_report.invariant_violations
    );

    let first_oracles_ok = first_report.oracle_report.all_passed();
    crate::assert_with_log!(first_oracles_ok, "oracles passed", true, first_oracles_ok);
    let second_oracles_ok = second_report.oracle_report.all_passed();
    crate::assert_with_log!(
        second_oracles_ok,
        "oracles passed",
        true,
        second_oracles_ok
    );
    crate::test_complete!("run_until_quiescent_with_report_is_deterministic");
}
/// Installs a deadline monitor with a warning threshold fraction of 1.0 and a
/// handler that records warnings, inserts a task whose budget has a 10ms
/// deadline, and expects one runtime step to produce an approaching-deadline
/// warning for that task and region.
#[test]
fn deadline_monitor_emits_warning() {
    init_test("deadline_monitor_emits_warning");
    let mut runtime = LabRuntime::with_seed(42);
    // Shared sink: the handler pushes into it, the test reads it afterwards.
    let sink = Arc::new(Mutex::new(Vec::<DeadlineWarning>::new()));
    let handler_sink = Arc::clone(&sink);
    let monitor = MonitorConfig {
        check_interval: Duration::ZERO,
        warning_threshold_fraction: 1.0,
        checkpoint_timeout: Duration::ZERO,
        adaptive: AdaptiveDeadlineConfig::default(),
        enabled: true,
    };
    runtime.enable_deadline_monitoring_with_handler(monitor, move |warning| {
        handler_sink.lock().push(warning);
    });

    let root = runtime.state.create_root_region(Budget::INFINITE);
    let budget = Budget::new().with_deadline(Time::from_millis(10));
    // Insert the record under a placeholder id, then patch in the real arena id.
    let placeholder = TaskId::from_arena(ArenaIndex::new(0, 0));
    let record = TaskRecord::new_with_time(placeholder, root, budget, runtime.state.now);
    let task_idx = runtime.state.insert_task(record);
    let task_id = TaskId::from_arena(task_idx);
    runtime.state.task_mut(task_id).unwrap().id = task_id;

    // Attach a context whose checkpoint state records no checkpoint at all.
    let mut cx_inner = CxInner::new(root, task_id, budget);
    cx_inner.checkpoint_state.last_checkpoint = None;
    runtime
        .state
        .task_mut(task_id)
        .unwrap()
        .set_cx_inner(Arc::new(RwLock::new(cx_inner)));
    runtime.step();

    let recorded = sink.lock();
    let first_warning = recorded.first().expect("expected warning");
    crate::assert_with_log!(
        first_warning.task_id == task_id,
        "task_id",
        task_id,
        first_warning.task_id
    );
    crate::assert_with_log!(
        first_warning.region_id == root,
        "region_id",
        root,
        first_warning.region_id
    );
    // Either approaching-deadline flavour is acceptable here.
    let reason_ok = matches!(
        first_warning.reason,
        WarningReason::ApproachingDeadline | WarningReason::ApproachingDeadlineNoProgress
    );
    crate::assert_with_log!(reason_ok, "reason", true, reason_ok);
    drop(recorded);
    crate::test_complete!("deadline_monitor_emits_warning");
}
/// Creates a task with a 10s deadline, records a checkpoint with the message
/// "starting work", and expects a single step under a zero checkpoint timeout
/// to produce a `NoProgress` warning carrying that checkpoint message.
#[test]
fn deadline_monitor_e2e_stuck_detection() {
    init_test("deadline_monitor_e2e_stuck_detection");
    let mut runtime = LabRuntime::with_seed(42);
    // Shared sink: the handler pushes into it, the test reads it afterwards.
    let sink = Arc::new(Mutex::new(Vec::<DeadlineWarning>::new()));
    let handler_sink = Arc::clone(&sink);
    let monitor = MonitorConfig {
        check_interval: Duration::ZERO,
        warning_threshold_fraction: 0.0,
        checkpoint_timeout: Duration::ZERO,
        adaptive: AdaptiveDeadlineConfig::default(),
        enabled: true,
    };
    runtime.enable_deadline_monitoring_with_handler(monitor, move |warning| {
        handler_sink.lock().push(warning);
    });

    let root = runtime.state.create_root_region(Budget::INFINITE);
    let budget = Budget::new().with_deadline(Time::from_secs(10));
    let (task_id, _handle) = runtime
        .state
        .create_task(root, budget, async {})
        .expect("create task");
    // Record one checkpoint through the task's own context before stepping.
    runtime
        .state
        .task_mut(task_id)
        .unwrap()
        .cx
        .as_ref()
        .expect("task cx")
        .checkpoint_with("starting work")
        .expect("checkpoint");
    runtime.step();

    let recorded = sink.lock();
    let first_warning = recorded.first().expect("expected warning");
    crate::assert_with_log!(
        first_warning.task_id == task_id,
        "task_id",
        task_id,
        first_warning.task_id
    );
    crate::assert_with_log!(
        first_warning.reason == WarningReason::NoProgress,
        "reason",
        WarningReason::NoProgress,
        first_warning.reason
    );
    crate::assert_with_log!(
        first_warning.last_checkpoint_message.as_deref() == Some("starting work"),
        "checkpoint message",
        Some("starting work"),
        first_warning.last_checkpoint_message.as_deref()
    );
    drop(recorded);
    crate::test_complete!("deadline_monitor_e2e_stuck_detection");
}
/// A task record that holds a `SendPermit` obligation and is never scheduled
/// must, after four steps with a max-idle threshold of 3 and panicking
/// disabled, appear in the trace as a `FuturelockDetected` event naming the
/// task, its region and the held obligation.
#[test]
fn futurelock_emits_trace_event() {
    init_test("futurelock_emits_trace_event");
    let config = LabConfig::new(42)
        .futurelock_max_idle_steps(3)
        .panic_on_futurelock(false);
    let mut runtime = LabRuntime::new(config);
    let root = runtime.state.create_root_region(Budget::INFINITE);
    // Insert the record under a placeholder id, then patch in the real arena id.
    let placeholder = TaskId::from_arena(ArenaIndex::new(0, 0));
    let task_idx = runtime
        .state
        .insert_task(TaskRecord::new(placeholder, root, Budget::INFINITE));
    let task_id = TaskId::from_arena(task_idx);
    runtime.state.task_mut(task_id).unwrap().id = task_id;
    let permit = runtime
        .state
        .create_obligation(ObligationKind::SendPermit, task_id, root, None)
        .expect("create obligation");

    for _ in 0..4 {
        runtime.step();
    }

    let detected = runtime
        .trace()
        .snapshot()
        .into_iter()
        .find(|event| event.kind == TraceEventKind::FuturelockDetected)
        .expect("expected futurelock trace event");
    match &detected.data {
        TraceData::Futurelock {
            task: stalled_task,
            region: stalled_region,
            idle_steps,
            held,
        } => {
            crate::assert_with_log!(*stalled_task == task_id, "task", task_id, *stalled_task);
            crate::assert_with_log!(*stalled_region == root, "region", root, *stalled_region);
            let exceeded_threshold = *idle_steps > 3;
            crate::assert_with_log!(
                exceeded_threshold,
                "idle_steps > 3",
                true,
                exceeded_threshold
            );
            // Exactly the one permit created above should be listed as held.
            let held_matches = held.as_slice() == [(permit, ObligationKind::SendPermit)];
            crate::assert_with_log!(
                held_matches,
                "held",
                &[(permit, ObligationKind::SendPermit)],
                held.as_slice()
            );
        }
        other => panic!("unexpected trace data: {other:?}"),
    }
    crate::test_complete!("futurelock_emits_trace_event");
}
/// With a max-idle threshold of 1 and `panic_on_futurelock` left untouched,
/// stepping a runtime whose never-scheduled task holds an obligation is
/// expected to panic with a message containing "futurelock detected".
#[test]
#[should_panic(expected = "futurelock detected")]
fn futurelock_can_panic() {
    init_test("futurelock_can_panic");
    let mut runtime = LabRuntime::new(LabConfig::new(42).futurelock_max_idle_steps(1));
    let root = runtime.state.create_root_region(Budget::INFINITE);
    // Insert the record under a placeholder id, then patch in the real arena id.
    let placeholder = TaskId::from_arena(ArenaIndex::new(0, 0));
    let task_idx = runtime
        .state
        .insert_task(TaskRecord::new(placeholder, root, Budget::INFINITE));
    let task_id = TaskId::from_arena(task_idx);
    runtime.state.task_mut(task_id).unwrap().id = task_id;
    // Only the existence of the obligation matters; its id is not needed.
    let _ = runtime
        .state
        .create_obligation(ObligationKind::SendPermit, task_id, root, None)
        .expect("create obligation");
    for _ in 0..3 {
        runtime.step();
    }
}
/// A scheduled task that yields in an endless loop while holding an obligation
/// must produce no futurelock violations after twenty steps (threshold 5,
/// panicking disabled).
#[test]
fn polled_task_not_flagged_as_futurelocked() {
    init_test("polled_task_not_flagged_as_futurelocked");
    let config = LabConfig::new(42)
        .futurelock_max_idle_steps(5)
        .panic_on_futurelock(false);
    let mut runtime = LabRuntime::new(config);
    let root = runtime.state.create_root_region(Budget::INFINITE);
    // The task never finishes; it yields on every poll.
    let yield_forever = async {
        loop {
            crate::runtime::yield_now::yield_now().await;
        }
    };
    let (busy_task, _handle) = runtime
        .state
        .create_task(root, Budget::INFINITE, yield_forever)
        .expect("create task");
    let _obl = runtime
        .state
        .create_obligation(ObligationKind::SendPermit, busy_task, root, None)
        .expect("create obligation");
    runtime.scheduler.lock().schedule(busy_task, 0);
    for _ in 0..20 {
        runtime.step();
    }

    let violations = runtime.futurelock_violations();
    let none_reported = violations.is_empty();
    crate::assert_with_log!(
        none_reported,
        "no futurelock for actively polled task",
        true,
        none_reported
    );
    crate::test_complete!("polled_task_not_flagged_as_futurelocked");
}
/// Runs a task with an empty async body to quiescence and checks that the
/// runtime is quiescent and that `check_cancellation_protocol` returns `Ok`.
#[test]
fn immediate_completion_marks_running_before_completion() {
    init_test("immediate_completion_marks_running_before_completion");
    let mut runtime = LabRuntime::new(LabConfig::new(42));
    let root = runtime.state.create_root_region(Budget::INFINITE);
    let (instant_task, _) = runtime
        .state
        .create_task(root, Budget::INFINITE, async {})
        .expect("create task");
    runtime.scheduler.lock().schedule(instant_task, 0);
    runtime.run_until_quiescent();

    // Evaluate the protocol check first, before the quiescence assertion.
    let protocol_ok = runtime.check_cancellation_protocol().is_ok();
    let reached_quiescence = runtime.is_quiescent();
    crate::assert_with_log!(
        reached_quiescence,
        "runtime reached quiescence after immediate completion",
        true,
        reached_quiescence
    );
    crate::assert_with_log!(
        protocol_ok,
        "cancellation oracle accepted Created -> Running -> Completed",
        true,
        protocol_ok
    );
    crate::test_complete!("immediate_completion_marks_running_before_completion");
}
/// Completes a task while it still holds an unresolved `SendPermit` obligation
/// and expects `check_invariants` to report an `ObligationLeak` with exactly
/// one leak describing that obligation, its kind, holder and region.
#[test]
fn obligation_leak_detected_when_holder_completed() {
    init_test("obligation_leak_detected_when_holder_completed");
    let mut runtime = LabRuntime::with_seed(7);
    let root = runtime.state.create_root_region(Budget::INFINITE);
    // Insert the record under a placeholder id, then patch in the real arena id.
    let placeholder = TaskId::from_arena(ArenaIndex::new(0, 0));
    let task_idx = runtime
        .state
        .insert_task(TaskRecord::new(placeholder, root, Budget::INFINITE));
    let task_id = TaskId::from_arena(task_idx);
    runtime.state.task_mut(task_id).unwrap().id = task_id;
    let permit = runtime
        .state
        .create_obligation(ObligationKind::SendPermit, task_id, root, None)
        .expect("create obligation");
    // The holder finishes without committing or aborting the permit.
    runtime
        .state
        .task_mut(task_id)
        .unwrap()
        .complete(Outcome::Ok(()));

    let mut leak_reported = false;
    for violation in runtime.check_invariants() {
        if let InvariantViolation::ObligationLeak { leaks } = violation {
            leak_reported = true;
            let leak_count = leaks.len();
            crate::assert_with_log!(leak_count == 1, "leaks len", 1, leak_count);
            let entry = &leaks[0];
            crate::assert_with_log!(
                entry.obligation == permit,
                "obligation",
                permit,
                entry.obligation
            );
            crate::assert_with_log!(
                entry.kind == ObligationKind::SendPermit,
                "kind",
                ObligationKind::SendPermit,
                entry.kind
            );
            crate::assert_with_log!(entry.holder == task_id, "holder", task_id, entry.holder);
            crate::assert_with_log!(entry.region == root, "region", root, entry.region);
        }
    }
    crate::assert_with_log!(leak_reported, "found leak", true, leak_reported);
    crate::test_complete!("obligation_leak_detected_when_holder_completed");
}
/// Commits an `Ack` obligation before its holder completes and expects
/// `check_invariants` to report no `ObligationLeak`.
#[test]
fn obligation_leak_ignored_when_resolved() {
    init_test("obligation_leak_ignored_when_resolved");
    let mut runtime = LabRuntime::with_seed(11);
    let root = runtime.state.create_root_region(Budget::INFINITE);
    // Insert the record under a placeholder id, then patch in the real arena id.
    let placeholder = TaskId::from_arena(ArenaIndex::new(0, 0));
    let task_idx = runtime
        .state
        .insert_task(TaskRecord::new(placeholder, root, Budget::INFINITE));
    let task_id = TaskId::from_arena(task_idx);
    runtime.state.task_mut(task_id).unwrap().id = task_id;
    let ack = runtime
        .state
        .create_obligation(ObligationKind::Ack, task_id, root, None)
        .expect("create obligation");
    // Resolve the obligation before the holder finishes.
    runtime
        .state
        .commit_obligation(ack)
        .expect("commit obligation");
    runtime
        .state
        .task_mut(task_id)
        .unwrap()
        .complete(Outcome::Ok(()));

    let violations = runtime.check_invariants();
    let leak_reported = violations
        .iter()
        .any(|violation| matches!(violation, InvariantViolation::ObligationLeak { .. }));
    crate::assert_with_log!(!leak_reported, "no leak", false, leak_reported);
    crate::test_complete!("obligation_leak_ignored_when_resolved");
}
/// Marks the root region `Closed` while a task created in it has not been run,
/// then expects `report()` to fail the `task_leak` and `quiescence` oracle
/// entries and to surface `task_leak` among the temporal failures.
#[test]
fn report_hydrates_temporal_oracles_from_state_snapshot() {
    init_test("report_hydrates_temporal_oracles_from_state_snapshot");
    let mut runtime = LabRuntime::with_seed(31);
    let root = runtime.state.create_root_region(Budget::INFINITE);
    let (_task, _handle) = runtime
        .state
        .create_task(root, Budget::INFINITE, async {})
        .expect("create task");
    // Force the region state directly; the task above was never scheduled.
    runtime
        .state
        .region(root)
        .expect("region exists")
        .set_state(crate::record::region::RegionState::Closed);

    let report = runtime.report();
    let task_leak_entry = report
        .oracle_report
        .entry("task_leak")
        .expect("task_leak entry");
    let quiescence_entry = report
        .oracle_report
        .entry("quiescence")
        .expect("quiescence entry");
    crate::assert_with_log!(
        !task_leak_entry.passed,
        "task_leak failed",
        false,
        task_leak_entry.passed
    );
    crate::assert_with_log!(
        !quiescence_entry.passed,
        "quiescence failed",
        false,
        quiescence_entry.passed
    );

    let tagged_in_invariants = report
        .invariant_violations
        .iter()
        .any(|entry| entry == "temporal:task_leak");
    crate::assert_with_log!(
        tagged_in_invariants,
        "temporal marker present",
        true,
        tagged_in_invariants
    );

    let listed_as_temporal_failure = report
        .temporal_invariant_failures
        .iter()
        .any(|entry| entry == "task_leak");
    crate::assert_with_log!(
        listed_as_temporal_failure,
        "temporal failure surfaced",
        true,
        listed_as_temporal_failure
    );
    crate::test_complete!("report_hydrates_temporal_oracles_from_state_snapshot");
}
/// Requests cancellation on a root region that has a child region, then
/// expects `report()` to fail the `cancellation_protocol` oracle entry and to
/// tag it in the invariant violations.
#[test]
fn report_hydrates_cancellation_propagation_from_state_snapshot() {
    init_test("report_hydrates_cancellation_propagation_from_state_snapshot");
    let mut runtime = LabRuntime::with_seed(32);
    let root = runtime.state.create_root_region(Budget::INFINITE);
    let _child = runtime
        .state
        .create_child_region(root, Budget::INFINITE)
        .expect("create child");
    // The cancel request is issued on the root region only.
    runtime
        .state
        .region(root)
        .expect("root exists")
        .cancel_request(crate::types::CancelReason::shutdown());

    let report = runtime.report();
    let protocol_entry = report
        .oracle_report
        .entry("cancellation_protocol")
        .expect("cancellation_protocol entry");
    crate::assert_with_log!(
        !protocol_entry.passed,
        "cancellation_protocol failed",
        false,
        protocol_entry.passed
    );

    let tagged_in_invariants = report
        .invariant_violations
        .iter()
        .any(|entry| entry == "temporal:cancellation_protocol");
    crate::assert_with_log!(
        tagged_in_invariants,
        "temporal cancellation marker present",
        true,
        tagged_in_invariants
    );
    crate::test_complete!("report_hydrates_cancellation_propagation_from_state_snapshot");
}
/// Pushes two spawn events for the same task directly into the trace and
/// expects `report()` to surface rule `RFW-SPAWN-001` at event index 1 with a
/// counterexample prefix length of 2, both on the report struct and in its
/// JSON form.
#[test]
fn report_surfaces_refinement_firewall_violation_from_trace_snapshot() {
    init_test("report_surfaces_refinement_firewall_violation_from_trace_snapshot");
    let mut runtime = LabRuntime::with_seed(33);
    let region = RegionId::new_for_test(41, 0);
    let task = TaskId::new_for_test(7, 0);
    // Same task spawned twice, at sequence numbers 1 and 2.
    let first_spawn = TraceEvent::spawn(1, Time::ZERO, task, region);
    runtime.state.trace.push_event(first_spawn);
    let second_spawn = TraceEvent::spawn(2, Time::ZERO, task, region);
    runtime.state.trace.push_event(second_spawn);

    let report = runtime.report();
    crate::assert_with_log!(
        report.refinement_firewall_rule_id.as_deref() == Some("RFW-SPAWN-001"),
        "refinement rule id surfaced",
        Some("RFW-SPAWN-001"),
        report.refinement_firewall_rule_id.as_deref()
    );
    crate::assert_with_log!(
        report.refinement_firewall_event_index == Some(1),
        "refinement event index surfaced",
        Some(1usize),
        report.refinement_firewall_event_index
    );
    crate::assert_with_log!(
        report.refinement_counterexample_prefix_len == Some(2),
        "refinement prefix len surfaced",
        Some(2usize),
        report.refinement_counterexample_prefix_len
    );

    let marker_present = report
        .invariant_violations
        .iter()
        .any(|entry| entry == "refinement_firewall:RFW-SPAWN-001");
    crate::assert_with_log!(
        marker_present,
        "refinement invariant marker present",
        true,
        marker_present
    );

    // The serialized report must carry the same facts.
    let json = report.to_json();
    let firewall = &json["refinement_firewall"];
    crate::assert_with_log!(
        firewall["rule_id"] == "RFW-SPAWN-001",
        "refinement json rule id",
        "RFW-SPAWN-001",
        firewall["rule_id"]
    );
    crate::assert_with_log!(
        firewall["counterexample_prefix_len"] == 2,
        "refinement json prefix len",
        2,
        firewall["counterexample_prefix_len"]
    );
    crate::assert_with_log!(
        firewall["skipped_due_to_trace_truncation"] == false,
        "refinement json not skipped",
        false,
        firewall["skipped_due_to_trace_truncation"]
    );
    crate::test_complete!("report_surfaces_refinement_firewall_violation_from_trace_snapshot");
}
/// With a trace capacity of 1 and two events pushed, `report()` is expected to
/// flag the refinement firewall as skipped due to truncation, report no rule
/// id, add no `refinement_firewall:` markers, and serialize the skipped flag.
#[test]
fn report_skips_refinement_firewall_when_trace_buffer_is_truncated() {
    init_test("report_skips_refinement_firewall_when_trace_buffer_is_truncated");
    let mut runtime = LabRuntime::new(LabConfig::new(35).trace_capacity(1));
    let region = RegionId::new_for_test(43, 0);
    let task = TaskId::new_for_test(9, 0);
    // Two events are pushed into a trace configured with capacity 1.
    let spawn_event = TraceEvent::spawn(1, Time::ZERO, task, region);
    runtime.state.trace.push_event(spawn_event);
    let complete_event = TraceEvent::complete(2, Time::ZERO, task, region);
    runtime.state.trace.push_event(complete_event);

    let report = runtime.report();
    crate::assert_with_log!(
        report.refinement_firewall_skipped_due_to_trace_truncation,
        "refinement skipped on truncated trace",
        true,
        report.refinement_firewall_skipped_due_to_trace_truncation
    );
    let no_rule_id = report.refinement_firewall_rule_id.is_none();
    crate::assert_with_log!(
        no_rule_id,
        "no refinement violation when skipped",
        true,
        no_rule_id
    );

    let marker_present = report
        .invariant_violations
        .iter()
        .any(|entry| entry.starts_with("refinement_firewall:"));
    crate::assert_with_log!(
        !marker_present,
        "no refinement markers when skipped",
        false,
        marker_present
    );

    let json = report.to_json();
    let firewall = &json["refinement_firewall"];
    crate::assert_with_log!(
        firewall["skipped_due_to_trace_truncation"] == true,
        "refinement skipped flag serialized",
        true,
        firewall["skipped_due_to_trace_truncation"]
    );
    crate::test_complete!("report_skips_refinement_firewall_when_trace_buffer_is_truncated");
}
/// Builds a crashpack from a report whose trace contains two spawn events for
/// the same task and expects its oracle violations to contain both the rule
/// marker and the minimal-counterexample-prefix marker.
#[test]
fn crashpack_includes_refinement_firewall_markers() {
    init_test("crashpack_includes_refinement_firewall_markers");
    let mut runtime = LabRuntime::with_seed(34);
    let region = RegionId::new_for_test(42, 0);
    let task = TaskId::new_for_test(8, 0);
    // Same task spawned twice, at sequence numbers 1 and 2.
    let first_spawn = TraceEvent::spawn(1, Time::ZERO, task, region);
    runtime.state.trace.push_event(first_spawn);
    let second_spawn = TraceEvent::spawn(2, Time::ZERO, task, region);
    runtime.state.trace.push_event(second_spawn);

    let run_report = runtime.report();
    let pack = runtime
        .build_crashpack_for_report(&run_report)
        .expect("refinement-firewall failure should build crashpack");

    let rule_marker_present = pack
        .oracle_violations
        .iter()
        .any(|marker| marker == "refinement_firewall:RFW-SPAWN-001");
    crate::assert_with_log!(
        rule_marker_present,
        "crashpack includes refinement rule marker",
        true,
        rule_marker_present
    );

    let prefix_marker_present = pack
        .oracle_violations
        .iter()
        .any(|marker| marker == "refinement_firewall:minimal_counterexample_prefix_len=2");
    crate::assert_with_log!(
        prefix_marker_present,
        "crashpack includes refinement prefix marker",
        true,
        prefix_marker_present
    );
    crate::test_complete!("crashpack_includes_refinement_firewall_markers");
}
/// Commits one obligation and aborts another at known virtual times, then
/// checks the `ObligationCommit` and `ObligationAbort` trace events field by
/// field, including the recorded durations (15ns and 20ns).
#[test]
#[allow(clippy::too_many_lines)]
fn obligation_trace_events_emitted() {
    init_test("obligation_trace_events_emitted");
    let mut runtime = LabRuntime::with_seed(21);
    let root = runtime.state.create_root_region(Budget::INFINITE);
    // Insert the record under a placeholder id, then patch in the real arena id.
    let placeholder = TaskId::from_arena(ArenaIndex::new(0, 0));
    let task_idx = runtime
        .state
        .insert_task(TaskRecord::new(placeholder, root, Budget::INFINITE));
    let task_id = TaskId::from_arena(task_idx);
    runtime.state.task_mut(task_id).unwrap().id = task_id;

    // SendPermit: created at t=10ns, committed at t=25ns.
    runtime.advance_time_to(Time::from_nanos(10));
    let committed_id = runtime
        .state
        .create_obligation(ObligationKind::SendPermit, task_id, root, None)
        .unwrap();
    runtime.advance_time_to(Time::from_nanos(25));
    runtime.state.commit_obligation(committed_id).unwrap();

    // Ack: created at t=30ns, aborted at t=50ns.
    runtime.advance_time_to(Time::from_nanos(30));
    let aborted_id = runtime
        .state
        .create_obligation(ObligationKind::Ack, task_id, root, None)
        .unwrap();
    runtime.advance_time_to(Time::from_nanos(50));
    runtime
        .state
        .abort_obligation(aborted_id, ObligationAbortReason::Cancel)
        .unwrap();

    let commit_event = runtime
        .trace()
        .snapshot()
        .into_iter()
        .find(|event| event.kind == TraceEventKind::ObligationCommit)
        .expect("commit event");
    match &commit_event.data {
        TraceData::Obligation {
            obligation: seen_obligation,
            task: seen_task,
            region: seen_region,
            kind: seen_kind,
            state: seen_state,
            duration_ns: seen_duration,
            abort_reason: seen_reason,
        } => {
            crate::assert_with_log!(
                *seen_obligation == committed_id,
                "obligation",
                committed_id,
                *seen_obligation
            );
            crate::assert_with_log!(*seen_task == task_id, "task", task_id, *seen_task);
            crate::assert_with_log!(*seen_region == root, "region", root, *seen_region);
            crate::assert_with_log!(
                *seen_kind == ObligationKind::SendPermit,
                "kind",
                ObligationKind::SendPermit,
                *seen_kind
            );
            crate::assert_with_log!(
                *seen_state == crate::record::ObligationState::Committed,
                "state",
                crate::record::ObligationState::Committed,
                *seen_state
            );
            crate::assert_with_log!(
                seen_duration == &Some(15),
                "duration",
                &Some(15),
                seen_duration
            );
            crate::assert_with_log!(
                seen_reason.is_none(),
                "abort_reason",
                &None::<crate::record::ObligationAbortReason>,
                seen_reason
            );
        }
        other => panic!("unexpected commit data: {other:?}"),
    }

    let abort_event = runtime
        .trace()
        .snapshot()
        .into_iter()
        .find(|event| event.kind == TraceEventKind::ObligationAbort)
        .expect("abort event");
    match &abort_event.data {
        TraceData::Obligation {
            obligation: seen_obligation,
            task: seen_task,
            region: seen_region,
            kind: seen_kind,
            state: seen_state,
            duration_ns: seen_duration,
            abort_reason: seen_reason,
        } => {
            crate::assert_with_log!(
                *seen_obligation == aborted_id,
                "obligation",
                aborted_id,
                *seen_obligation
            );
            crate::assert_with_log!(*seen_task == task_id, "task", task_id, *seen_task);
            crate::assert_with_log!(*seen_region == root, "region", root, *seen_region);
            crate::assert_with_log!(
                *seen_kind == ObligationKind::Ack,
                "kind",
                ObligationKind::Ack,
                *seen_kind
            );
            crate::assert_with_log!(
                *seen_state == crate::record::ObligationState::Aborted,
                "state",
                crate::record::ObligationState::Aborted,
                *seen_state
            );
            crate::assert_with_log!(
                seen_duration == &Some(20),
                "duration",
                &Some(20),
                seen_duration
            );
            crate::assert_with_log!(
                seen_reason == &Some(ObligationAbortReason::Cancel),
                "abort_reason",
                &Some(ObligationAbortReason::Cancel),
                seen_reason
            );
        }
        other => panic!("unexpected abort data: {other:?}"),
    }
    crate::test_complete!("obligation_trace_events_emitted");
}
/// Creates a `Lease` obligation at t=100ns, completes its holder at t=140ns
/// without resolving it, and expects `check_invariants` to report a leak and
/// the trace to contain an `ObligationLeak` event in state `Leaked` with a
/// 40ns duration.
#[test]
fn obligation_leak_emits_trace_event() {
    init_test("obligation_leak_emits_trace_event");
    let mut runtime = LabRuntime::with_seed(22);
    let root = runtime.state.create_root_region(Budget::INFINITE);
    // Insert the record under a placeholder id, then patch in the real arena id.
    let placeholder = TaskId::from_arena(ArenaIndex::new(0, 0));
    let task_idx = runtime
        .state
        .insert_task(TaskRecord::new(placeholder, root, Budget::INFINITE));
    let task_id = TaskId::from_arena(task_idx);
    runtime.state.task_mut(task_id).unwrap().id = task_id;

    runtime.advance_time_to(Time::from_nanos(100));
    let lease = runtime
        .state
        .create_obligation(ObligationKind::Lease, task_id, root, None)
        .unwrap();
    runtime.advance_time_to(Time::from_nanos(140));
    // The holder finishes without committing or aborting the lease.
    runtime
        .state
        .task_mut(task_id)
        .unwrap()
        .complete(Outcome::Ok(()));

    let violations = runtime.check_invariants();
    let leak_reported = violations
        .iter()
        .any(|violation| matches!(violation, InvariantViolation::ObligationLeak { .. }));
    crate::assert_with_log!(leak_reported, "has leak", true, leak_reported);

    let leak_event = runtime
        .trace()
        .snapshot()
        .into_iter()
        .find(|event| event.kind == TraceEventKind::ObligationLeak)
        .expect("leak event");
    match &leak_event.data {
        TraceData::Obligation {
            obligation: seen_obligation,
            task: seen_task,
            region: seen_region,
            kind: seen_kind,
            state: seen_state,
            duration_ns: seen_duration,
            abort_reason: seen_reason,
        } => {
            crate::assert_with_log!(
                *seen_obligation == lease,
                "obligation",
                lease,
                *seen_obligation
            );
            crate::assert_with_log!(*seen_task == task_id, "task", task_id, *seen_task);
            crate::assert_with_log!(*seen_region == root, "region", root, *seen_region);
            crate::assert_with_log!(
                *seen_kind == ObligationKind::Lease,
                "kind",
                ObligationKind::Lease,
                *seen_kind
            );
            crate::assert_with_log!(
                *seen_state == crate::record::ObligationState::Leaked,
                "state",
                crate::record::ObligationState::Leaked,
                *seen_state
            );
            crate::assert_with_log!(
                seen_duration == &Some(40),
                "duration",
                &Some(40),
                seen_duration
            );
            crate::assert_with_log!(
                seen_reason.is_none(),
                "abort_reason",
                &None::<crate::record::ObligationAbortReason>,
                seen_reason
            );
        }
        other => panic!("unexpected leak data: {other:?}"),
    }
    crate::test_complete!("obligation_leak_emits_trace_event");
}
/// Runs an empty app through the Spork harness and checks the shape of the
/// report JSON: every required top-level key is present and selected nested
/// fields have the expected JSON types, with a null `crashpack` entry.
#[test]
fn contract_json_has_required_top_level_keys() {
    crate::test_utils::init_test_logging();
    crate::test_phase!("contract_json_has_required_top_level_keys");
    let app = crate::app::AppSpec::new("contract_test");
    let harness = crate::lab::SporkAppHarness::with_seed(42, app).unwrap();
    let report = harness.run_to_report().unwrap();
    let json = report.to_json();

    for key in [
        "schema_version",
        "verdict",
        "app",
        "lab",
        "fingerprints",
        "run",
        "crashpack",
        "attachments",
    ] {
        assert!(
            json.get(key).is_some(),
            "missing required top-level key: {key}"
        );
    }

    // Each entry pairs a type check with the message reported when it fails;
    // the entries are asserted in this order.
    let lab = &json["lab"];
    let fingerprints = &json["fingerprints"];
    let run = &json["run"];
    let shape_checks = [
        (json["app"]["name"].is_string(), "app.name must be a string"),
        (lab["config"].is_object(), "lab.config must be an object"),
        (lab["config_hash"].is_u64(), "lab.config_hash must be a u64"),
        (
            fingerprints["trace"].is_u64(),
            "fingerprints.trace must be a u64",
        ),
        (
            fingerprints["event_hash"].is_u64(),
            "fingerprints.event_hash must be a u64",
        ),
        (
            fingerprints["event_count"].is_u64(),
            "fingerprints.event_count must be a u64",
        ),
        (
            fingerprints["schedule_hash"].is_u64(),
            "fingerprints.schedule_hash must be a u64",
        ),
        (run["seed"].is_u64(), "run.seed must be a u64"),
        (run["oracles"].is_object(), "run.oracles must be an object"),
        (
            run["invariants"].is_array(),
            "run.invariants must be an array",
        ),
        (
            run["refinement_firewall"].is_object(),
            "run.refinement_firewall must be an object",
        ),
        (
            run["refinement_firewall"]["skipped_due_to_trace_truncation"].is_boolean(),
            "run.refinement_firewall.skipped_due_to_trace_truncation must be a boolean",
        ),
        (
            json["attachments"].is_array(),
            "attachments must be an array",
        ),
        (
            json["crashpack"].is_null(),
            "passing runs should have null crashpack linkage",
        ),
    ];
    for (holds, message) in shape_checks {
        assert!(holds, "{message}");
    }
    crate::test_complete!("contract_json_has_required_top_level_keys");
}
/// The harness report must carry `SporkHarnessReport::SCHEMA_VERSION` both in
/// its `schema_version` field and under the `schema_version` JSON key.
#[test]
fn contract_schema_version_is_current() {
    crate::test_utils::init_test_logging();
    crate::test_phase!("contract_schema_version_is_current");
    let spec = crate::app::AppSpec::new("version_test");
    let harness = crate::lab::SporkAppHarness::with_seed(1, spec).unwrap();
    let report = harness.run_to_report().unwrap();
    assert_eq!(report.schema_version, SporkHarnessReport::SCHEMA_VERSION);
    let json = report.to_json();
    assert_eq!(json["schema_version"], SporkHarnessReport::SCHEMA_VERSION);
    crate::test_complete!("contract_schema_version_is_current");
}
/// Contract: summarising the same config twice yields the same config hash,
/// while a config built from a different seed hashes differently.
#[test]
fn contract_config_hash_deterministic() {
    crate::test_utils::init_test_logging();
    crate::test_phase!("contract_config_hash_deterministic");

    let base = LabConfig::new(42);
    let first = LabConfigSummary::from_config(&base);
    let repeat = LabConfigSummary::from_config(&base);
    assert_eq!(first.config_hash(), repeat.config_hash());

    let other_seed = LabConfig::new(99);
    let different = LabConfigSummary::from_config(&other_seed);
    assert_ne!(first.config_hash(), different.config_hash());

    crate::test_complete!("contract_config_hash_deterministic");
}
/// Contract: a passing run reports `passed()`, a `"pass"` JSON verdict, and a
/// summary line that starts with `[PASS]`.
#[test]
fn contract_verdict_reflects_oracle_state() {
    crate::test_utils::init_test_logging();
    crate::test_phase!("contract_verdict_reflects_oracle_state");

    let spec = crate::app::AppSpec::new("verdict_test");
    let harness = crate::lab::SporkAppHarness::with_seed(42, spec).unwrap();
    let report = harness.run_to_report().unwrap();

    assert!(report.passed());

    let doc = report.to_json();
    assert_eq!(doc["verdict"], "pass");

    let summary = report.summary_line();
    assert!(summary.starts_with("[PASS]"));

    crate::test_complete!("contract_verdict_reflects_oracle_state");
}
/// Contract: the report's convenience accessors agree with the values found in
/// its JSON form, and a passing run has no crashpack path and no oracle failures.
#[test]
fn contract_convenience_methods_consistent() {
    crate::test_utils::init_test_logging();
    crate::test_phase!("contract_convenience_methods_consistent");

    let spec = crate::app::AppSpec::new("ux_test");
    let harness = crate::lab::SporkAppHarness::with_seed(42, spec).unwrap();
    let report = harness.run_to_report().unwrap();
    let doc = report.to_json();

    let json_trace = doc["fingerprints"]["trace"].as_u64();
    assert_eq!(report.trace_fingerprint(), json_trace.unwrap());

    let json_seed = doc["run"]["seed"].as_u64();
    assert_eq!(report.seed(), json_seed.unwrap());

    let json_config_hash = doc["lab"]["config_hash"].as_u64();
    assert_eq!(report.config_hash(), json_config_hash.unwrap());

    assert!(report.crashpack_path().is_none());
    assert!(report.oracle_failures().is_empty());

    crate::test_complete!("contract_convenience_methods_consistent");
}
/// Contract: a run that leaves an obligation unresolved does not pass and its
/// report carries a crashpack attachment plus matching crashpack link metadata.
#[test]
fn contract_auto_crashpack_on_failure() {
    crate::test_utils::init_test_logging();
    crate::test_phase!("contract_auto_crashpack_on_failure");

    let mut lab = LabRuntime::new(LabConfig::new(17).panic_on_leak(false));
    let root = lab.state.create_root_region(Budget::INFINITE);
    let (task_id, _) = lab
        .state
        .create_task(root, Budget::INFINITE, async {})
        .expect("create task");
    lab.scheduler.lock().schedule(task_id, 0);

    // This obligation is created but never resolved by the test.
    let leak_note = Some("intentional leak".to_string());
    lab.state
        .create_obligation(ObligationKind::SendPermit, task_id, root, leak_note)
        .expect("create obligation");
    lab.run_until_quiescent();

    let report = lab.spork_report("failing_app", Vec::new());
    assert!(!report.passed(), "failing run must not report PASS");

    let path = report
        .crashpack_path()
        .expect("failing run should include crashpack attachment");
    assert!(
        path.starts_with("crashpack-"),
        "unexpected crashpack path: {path}"
    );

    let has_crashpack_attachment = report
        .attachments
        .iter()
        .any(|entry| entry.kind == HarnessAttachmentKind::CrashPack);
    assert!(
        has_crashpack_attachment,
        "crashpack attachment kind must be present"
    );

    let link = report
        .crashpack_link()
        .expect("failing run should expose crashpack link metadata");
    assert_eq!(link.path, path);
    assert_eq!(link.fingerprint, report.trace_fingerprint());
    assert!(
        link.id.starts_with("crashpack-"),
        "unexpected crashpack id: {}",
        link.id
    );
    assert!(
        link.replay.command_line.contains(path),
        "replay command should include crashpack path"
    );

    crate::test_complete!("contract_auto_crashpack_on_failure");
}
/// Contract: with default replay recording enabled, the crashpack built for a
/// failing run has a divergent prefix, lists it in the manifest, and carries
/// replay command metadata.
#[test]
fn contract_auto_crashpack_contains_divergent_prefix_when_replay_enabled() {
    crate::test_utils::init_test_logging();
    crate::test_phase!("contract_auto_crashpack_contains_divergent_prefix_when_replay_enabled");

    let lab_config = LabConfig::new(1701)
        .panic_on_leak(false)
        .with_default_replay_recording();
    let mut lab = LabRuntime::new(lab_config);
    let root = lab.state.create_root_region(Budget::INFINITE);
    let (task_id, _) = lab
        .state
        .create_task(root, Budget::INFINITE, async {})
        .expect("create task");
    lab.scheduler.lock().schedule(task_id, 0);

    // This obligation is created but never resolved by the test.
    let leak_note = Some("intentional leak".to_string());
    lab.state
        .create_obligation(ObligationKind::SendPermit, task_id, root, leak_note)
        .expect("create obligation");
    lab.run_until_quiescent();

    let run_report = lab.report();
    let pack = lab
        .build_crashpack_for_report(&run_report)
        .expect("failing run should build crashpack");

    assert!(pack.has_divergent_prefix());

    let wanted = crate::trace::crashpack::AttachmentKind::DivergentPrefix;
    assert!(
        pack.manifest.has_attachment(&wanted),
        "manifest must include divergent prefix attachment"
    );
    assert!(
        pack.replay.is_some(),
        "crashpack should carry replay command metadata"
    );

    crate::test_complete!(
        "contract_auto_crashpack_contains_divergent_prefix_when_replay_enabled"
    );
}
/// Contract: when the caller already supplies a crashpack attachment, a failing
/// run keeps exactly that one and derives its link metadata from the manual path.
#[test]
fn contract_manual_crashpack_not_duplicated_on_failure() {
    crate::test_utils::init_test_logging();
    crate::test_phase!("contract_manual_crashpack_not_duplicated_on_failure");

    let mut lab = LabRuntime::new(LabConfig::new(18).panic_on_leak(false));
    let root = lab.state.create_root_region(Budget::INFINITE);
    let (task_id, _) = lab
        .state
        .create_task(root, Budget::INFINITE, async {})
        .expect("create task");
    lab.scheduler.lock().schedule(task_id, 0);

    // This obligation is created but never resolved by the test.
    let leak_note = Some("intentional leak".to_string());
    lab.state
        .create_obligation(ObligationKind::SendPermit, task_id, root, leak_note)
        .expect("create obligation");
    lab.run_until_quiescent();

    let manual = vec![HarnessAttachmentRef::crashpack("manual-crashpack.json")];
    let report = lab.spork_report("failing_app_manual", manual);

    let mut crashpack_total = 0;
    for entry in report.attachments.iter() {
        if entry.kind == HarnessAttachmentKind::CrashPack {
            crashpack_total += 1;
        }
    }
    assert_eq!(
        crashpack_total, 1,
        "manual crashpack should not be duplicated"
    );
    assert_eq!(report.crashpack_path(), Some("manual-crashpack.json"));

    let link = report
        .crashpack_link()
        .expect("manual crashpack should still produce metadata");
    assert_eq!(link.path, "manual-crashpack.json");

    let mentions_manual_path = link.replay.command_line.contains("manual-crashpack.json");
    assert!(
        mentions_manual_path,
        "replay command should include manual crashpack path"
    );

    crate::test_complete!("contract_manual_crashpack_not_duplicated_on_failure");
}
/// Contract: two independent harness runs with the same app name and seed
/// serialise to identical JSON.
#[test]
fn contract_json_deterministic_same_seed() {
    crate::test_utils::init_test_logging();
    crate::test_phase!("contract_json_deterministic_same_seed");

    // Builds a fresh harness each time so nothing is shared between the runs.
    let render = || {
        let spec = crate::app::AppSpec::new("det_contract");
        let harness = crate::lab::SporkAppHarness::with_seed(42, spec).unwrap();
        harness.run_to_report().unwrap().to_json()
    };

    let first = render();
    let second = render();
    assert_eq!(first, second, "same seed must produce identical JSON");

    crate::test_complete!("contract_json_deterministic_same_seed");
}
/// Contract: attachments registered as trace-then-crashpack appear in the JSON
/// as crashpack-then-trace.
#[test]
fn contract_attachments_sorted_in_json() {
    crate::test_utils::init_test_logging();
    crate::test_phase!("contract_attachments_sorted_in_json");

    let spec = crate::app::AppSpec::new("attach_contract");
    let mut harness = crate::lab::SporkAppHarness::with_seed(7, spec).unwrap();
    // Deliberately attach in the opposite order to the one asserted below.
    harness.attach(HarnessAttachmentRef::trace("z_trace.json"));
    harness.attach(HarnessAttachmentRef::crashpack("a_crash.tar"));

    let report = harness.run_to_report().unwrap();
    assert_eq!(report.crashpack_path(), Some("a_crash.tar"));

    let doc = report.to_json();
    let listed = doc["attachments"].as_array().unwrap();
    assert_eq!(listed.len(), 2);

    let first_kind = &listed[0]["kind"];
    let second_kind = &listed[1]["kind"];
    assert_eq!(*first_kind, "crashpack");
    assert_eq!(*second_kind, "trace");

    crate::test_complete!("contract_attachments_sorted_in_json");
}
/// With no timers registered, `advance_to_next_timer` reports zero wakeups and
/// there is no pending deadline.
#[test]
fn advance_to_next_timer_empty() {
    init_test("advance_to_next_timer_empty");
    let mut lab = LabRuntime::with_seed(42);

    let wakeups = lab.advance_to_next_timer();
    crate::assert_with_log!(wakeups == 0, "no timers → 0 wakeups", 0, wakeups);

    let no_deadline = lab.next_timer_deadline().is_none();
    crate::assert_with_log!(no_deadline, "no pending deadline", true, no_deadline);

    crate::test_complete!("advance_to_next_timer_empty");
}
/// A single timer registered at 1s is counted as pending, fires on
/// `advance_to_next_timer`, moves the clock to 1s, and invokes its waker.
#[test]
fn advance_to_next_timer_fires_timer() {
    use std::sync::atomic::{AtomicBool, Ordering};

    init_test("advance_to_next_timer_fires_timer");
    let mut lab = LabRuntime::with_seed(42);
    let timers = lab.state.timer_driver_handle().unwrap();

    let fired = Arc::new(AtomicBool::new(false));
    let waker = Waker::from(Arc::new(FlagWaker(Arc::clone(&fired))));
    let _ = timers.register(Time::from_secs(1), waker);

    let pending = lab.pending_timer_count();
    crate::assert_with_log!(pending == 1, "1 pending timer", 1, pending);

    let wakeups = lab.advance_to_next_timer();
    crate::assert_with_log!(wakeups == 1, "1 wakeup", 1, wakeups);

    let current = lab.now();
    let one_second = Time::from_secs(1);
    crate::assert_with_log!(current == one_second, "time at 1s", one_second, current);

    let was_woken = fired.load(Ordering::SeqCst);
    crate::assert_with_log!(was_woken, "waker fired", true, was_woken);

    crate::test_complete!("advance_to_next_timer_fires_timer");
}
/// Metamorphic check: registering the same multiset of deadlines in a different
/// order yields the same advance points, wakeup count, and final time, and the
/// advance points are the distinct deadlines in ascending order.
#[test]
fn metamorphic_timer_registration_permutation_preserves_virtual_time_progression() {
    init_test("metamorphic_timer_registration_permutation_preserves_virtual_time_progression");

    let original_order = collect_timer_advances(&[5, 1, 3, 1, 8], &[]);
    let shuffled_order = collect_timer_advances(&[1, 8, 5, 1, 3], &[]);

    crate::assert_with_log!(
        original_order.advance_points == shuffled_order.advance_points,
        "deadline multiset permutation preserves advance points",
        &original_order.advance_points,
        &shuffled_order.advance_points
    );
    crate::assert_with_log!(
        original_order.total_wakeups == shuffled_order.total_wakeups,
        "deadline multiset permutation preserves wakeup count",
        original_order.total_wakeups,
        shuffled_order.total_wakeups
    );
    crate::assert_with_log!(
        original_order.final_time == shuffled_order.final_time,
        "deadline multiset permutation preserves final virtual time",
        original_order.final_time,
        shuffled_order.final_time
    );

    // The duplicated 1s deadline must show up as a single advance point.
    let expected_points = vec![
        Time::from_secs(1),
        Time::from_secs(3),
        Time::from_secs(5),
        Time::from_secs(8),
    ];
    crate::assert_with_log!(
        original_order.advance_points == expected_points,
        "advance points collapse duplicate deadlines without moving backward",
        &expected_points,
        &original_order.advance_points
    );

    crate::test_complete!(
        "metamorphic_timer_registration_permutation_preserves_virtual_time_progression"
    );
}
/// Metamorphic check: a run with an extra timer that is cancelled matches the
/// baseline run without it, and the cancelled timer records zero wakeups.
#[test]
fn metamorphic_cancelled_timer_does_not_skew_virtual_time_progression() {
    init_test("metamorphic_cancelled_timer_does_not_skew_virtual_time_progression");

    let reference = collect_timer_advances(&[2, 4, 9], &[]);
    // NOTE(review): the second slice presumably lists indices of timers to cancel
    // (index 2 is the 6s timer) — confirm against `collect_timer_advances`.
    let cancelled_run = collect_timer_advances(&[2, 4, 6, 9], &[2]);

    crate::assert_with_log!(
        reference.advance_points == cancelled_run.advance_points,
        "cancelling an intermediate timer preserves surviving advance points",
        &reference.advance_points,
        &cancelled_run.advance_points
    );
    crate::assert_with_log!(
        reference.total_wakeups == cancelled_run.total_wakeups,
        "cancelling an intermediate timer preserves surviving wakeup count",
        reference.total_wakeups,
        cancelled_run.total_wakeups
    );
    crate::assert_with_log!(
        reference.final_time == cancelled_run.final_time,
        "cancelling an intermediate timer preserves final virtual time",
        reference.final_time,
        cancelled_run.final_time
    );

    let stray_wakeups = cancelled_run.cancelled_wakeups;
    crate::assert_with_log!(
        stray_wakeups == 0,
        "cancelled timer never wakes after auto-advance",
        0u64,
        stray_wakeups
    );

    crate::test_complete!("metamorphic_cancelled_timer_does_not_skew_virtual_time_progression");
}
/// A readable event injected into the lab reactor with a 1s delay is delivered
/// by `run_with_auto_advance`: time advances, the registered waker fires once,
/// an `IoReady` trace event is recorded, and no reactor event is left pending.
#[cfg(unix)]
#[test]
fn run_with_auto_advance_delivers_delayed_reactor_events() {
    use std::sync::atomic::{AtomicU64, Ordering};

    init_test("run_with_auto_advance_delivers_delayed_reactor_events");
    let lab_config = LabConfig::new(42).with_auto_advance().max_steps(32);
    let mut lab = LabRuntime::new(lab_config);
    let io = lab.state.io_driver_handle().expect("io driver");

    let wake_counter = Arc::new(AtomicU64::new(0));
    let waker = Waker::from(Arc::new(CountWaker(Arc::clone(&wake_counter))));
    let source = TestFdSource;
    let registration = io
        .register(&source, Interest::READABLE, waker)
        .expect("register source");
    let token = registration.token();

    let delay = Duration::from_secs(1);
    lab.lab_reactor()
        .inject_event(token, Event::readable(token), delay);

    let report = lab.run_with_auto_advance();

    let advanced = report.auto_advances >= 1;
    crate::assert_with_log!(
        advanced,
        "auto-advance reaches delayed reactor deadline",
        true,
        advanced
    );

    let reached_deadline = lab.now() >= Time::from_secs(1);
    crate::assert_with_log!(
        reached_deadline,
        "virtual time advanced to delayed reactor event",
        true,
        reached_deadline
    );

    let wakeups = wake_counter.load(Ordering::SeqCst);
    crate::assert_with_log!(
        wakeups == 1,
        "reactor event woke registration",
        1u64,
        wakeups
    );

    let trace_events = lab.state.trace.snapshot();
    let saw_ready = trace_events
        .iter()
        .any(|entry| entry.kind == TraceEventKind::IoReady);
    crate::assert_with_log!(saw_ready, "io ready trace recorded", true, saw_ready);

    let drained = lab.lab_reactor().next_event_time().is_none();
    crate::assert_with_log!(drained, "delayed reactor event drained", true, drained);

    crate::test_complete!("run_with_auto_advance_delivers_delayed_reactor_events");
}
/// An empty runtime with auto-advance enabled reports zero steps and zero
/// auto-advances.
#[test]
fn run_with_auto_advance_basic() {
    init_test("run_with_auto_advance_basic");
    let mut lab = LabRuntime::new(LabConfig::new(42).with_auto_advance());

    let report = lab.run_with_auto_advance();

    let steps = report.steps;
    crate::assert_with_log!(steps == 0, "0 steps", 0u64, steps);

    let advances = report.auto_advances;
    crate::assert_with_log!(advances == 0, "0 auto-advances", 0u64, advances);

    crate::test_complete!("run_with_auto_advance_basic");
}
/// Three timers at 1s, 5s and 10s are all fired by `run_with_auto_advance`:
/// at least three auto-advances, time at or past 10s, exactly three wakeups.
#[test]
fn run_with_auto_advance_jumps_past_timer_deadlines() {
    use std::sync::atomic::{AtomicU64, Ordering};

    init_test("run_with_auto_advance_jumps_past_timer_deadlines");
    let lab_config = LabConfig::new(42).with_auto_advance().max_steps(1_000);
    let mut lab = LabRuntime::new(lab_config);
    let timers = lab.state.timer_driver_handle().unwrap();

    // Every timer shares one counter so the total number of wakeups can be checked.
    let wake_counter = Arc::new(AtomicU64::new(0));
    for deadline_secs in [1, 5, 10] {
        let counting = CountWaker(Arc::clone(&wake_counter));
        let waker = Waker::from(Arc::new(counting));
        let _ = timers.register(Time::from_secs(deadline_secs), waker);
    }

    let report = lab.run_with_auto_advance();

    let enough_advances = report.auto_advances >= 3;
    crate::assert_with_log!(
        enough_advances,
        "at least 3 auto-advances",
        true,
        enough_advances
    );

    let past_last_deadline = lab.now() >= Time::from_secs(10);
    crate::assert_with_log!(past_last_deadline, "time >= 10s", true, past_last_deadline);

    let wakeups = wake_counter.load(Ordering::SeqCst);
    crate::assert_with_log!(wakeups == 3, "3 wakeups", 3u64, wakeups);

    crate::test_complete!("run_with_auto_advance_jumps_past_timer_deadlines");
}
/// Twenty-four hourly timers are driven to completion by auto-advance: at least
/// 86400 virtual seconds elapse, all 24 wakers fire, and the run takes under one
/// second of wall-clock time.
#[test]
fn virtual_time_24_hour_instant_test() {
    use std::sync::atomic::{AtomicU64, Ordering};
    use std::time::Instant;

    init_test("virtual_time_24_hour_instant_test");
    let lab_config = LabConfig::new(42).with_auto_advance().max_steps(100_000);
    let mut lab = LabRuntime::new(lab_config);
    let timers = lab.state.timer_driver_handle().unwrap();

    let wake_counter = Arc::new(AtomicU64::new(0));
    for hour in 1..=24 {
        let deadline = Time::from_secs(hour * 3600);
        let waker = Waker::from(Arc::new(CountWaker(Arc::clone(&wake_counter))));
        let _ = timers.register(deadline, waker);
    }

    // Only the run itself is timed against the wall clock.
    let started = Instant::now();
    let report = lab.run_with_auto_advance();
    let wall_elapsed = started.elapsed();

    let full_day = report.virtual_elapsed_secs() >= 86400;
    crate::assert_with_log!(full_day, "24h virtual", true, full_day);

    let wakeups = wake_counter.load(Ordering::SeqCst);
    crate::assert_with_log!(wakeups == 24, "24 wakeups", 24u64, wakeups);

    let fast_enough = wall_elapsed.as_millis() < 1000;
    crate::assert_with_log!(fast_enough, "wall time < 1s", true, fast_enough);

    crate::test_complete!("virtual_time_24_hour_instant_test");
}
/// The clock starts unpaused, reports paused after `pause_clock`, and reports
/// unpaused again after `resume_clock`.
#[test]
fn clock_pause_resume() {
    init_test("clock_pause_resume");
    let lab = LabRuntime::with_seed(42);

    let running_at_start = !lab.is_clock_paused();
    crate::assert_with_log!(
        running_at_start,
        "not paused initially",
        true,
        running_at_start
    );

    lab.pause_clock();
    let is_paused = lab.is_clock_paused();
    crate::assert_with_log!(is_paused, "paused", true, is_paused);

    lab.resume_clock();
    let running_again = !lab.is_clock_paused();
    crate::assert_with_log!(running_again, "resumed", true, running_again);

    crate::test_complete!("clock_pause_resume");
}
/// After advancing by 1e9 ns, injecting a 5e9 ns skew moves `now()` forward by
/// exactly 5e9 ns, landing at 6s.
#[test]
fn inject_clock_skew() {
    init_test("inject_clock_skew");
    let mut lab = LabRuntime::with_seed(42);

    lab.advance_time(1_000_000_000);
    let pre_skew = lab.now();

    lab.inject_clock_skew(5_000_000_000);
    let post_skew = lab.now();

    let shift = post_skew.as_nanos() - pre_skew.as_nanos();
    crate::assert_with_log!(
        shift == 5_000_000_000,
        "5s skew applied",
        5_000_000_000u64,
        shift
    );

    let six_seconds = Time::from_secs(6);
    crate::assert_with_log!(
        post_skew == six_seconds,
        "time at 6s",
        six_seconds,
        post_skew
    );

    crate::test_complete!("inject_clock_skew");
}
/// A report with 3.6e12 elapsed virtual nanoseconds converts to 3_600_000 ms
/// and 3600 s.
#[test]
fn virtual_time_report_conversions() {
    init_test("virtual_time_report_conversions");

    let one_hour = VirtualTimeReport {
        time_start: Time::ZERO,
        time_end: Time::from_secs(3600),
        virtual_elapsed_nanos: 3_600_000_000_000,
        steps: 100,
        auto_advances: 5,
        total_wakeups: 10,
        termination: AutoAdvanceTermination::Quiescent,
    };

    let millis = one_hour.virtual_elapsed_ms();
    crate::assert_with_log!(millis == 3_600_000, "3600000 ms", 3_600_000u64, millis);

    let seconds = one_hour.virtual_elapsed_secs();
    crate::assert_with_log!(seconds == 3600, "3600 secs", 3600u64, seconds);

    crate::test_complete!("virtual_time_report_conversions");
}
/// Checks that a task cancelled part-way through its execution is recorded in
/// the replay trace with the `Cancelled` severity code.
///
/// The test creates one task, steps the runtime once, requests and acknowledges
/// cancellation directly on the task record, runs to quiescence, and then
/// inspects every `TaskCompleted` replay event.
#[test]
fn replay_records_correct_severity_for_cancelled_task() {
init_test("replay_records_correct_severity_for_cancelled_task");
// Leak panics are disabled and replay recording is enabled for this run.
let config = LabConfig::new(42)
.panic_on_leak(false)
.with_default_replay_recording();
let mut runtime = LabRuntime::new(config);
let root = runtime
.state
.create_root_region(crate::types::Budget::INFINITE);
// The task body awaits a single `yield_now` and then finishes.
let (task_id, _) = runtime
.state
.create_task(root, crate::types::Budget::INFINITE, async {
crate::runtime::yield_now::yield_now().await;
})
.expect("create task");
runtime.scheduler.lock().schedule(task_id, 0);
// One step only; presumably this leaves the task parked at its yield point — TODO confirm.
runtime.step();
// Drive cancellation by hand on the task record.
// NOTE(review): if the lookup returns `None`, this block, including its state
// assertion, is skipped without failing the test.
if let Some(record) = runtime.state.task_mut(task_id) {
record.request_cancel(crate::types::CancelReason::user("test-cancel"));
let _ = record.acknowledge_cancel();
assert!(
matches!(
record.state,
crate::record::task::TaskState::Cancelling { .. }
),
"task should be in Cancelling state"
);
}
// Reschedule the task and let the runtime run until quiescent.
runtime.scheduler.lock().schedule(task_id, 0);
runtime.run_until_quiescent();
// NOTE(review): as above, a missing record skips this check rather than failing.
if let Some(record) = runtime.state.task(task_id) {
assert!(
matches!(
record.state,
crate::record::task::TaskState::Completed(crate::types::Outcome::Cancelled(_))
),
"task should be Completed(Cancelled), got {:?}",
record.state
);
}
let replay = runtime
.replay_recorder()
.snapshot()
.expect("replay recording enabled");
// Keep only the completion events; at least one must have been recorded.
let completed_events: Vec<_> = replay
.events
.iter()
.filter(|e| matches!(e, crate::trace::replay::ReplayEvent::TaskCompleted { .. }))
.collect();
assert!(
!completed_events.is_empty(),
"should have at least one TaskCompleted event"
);
// Every recorded completion must carry the `Cancelled` severity byte.
for event in &completed_events {
if let crate::trace::replay::ReplayEvent::TaskCompleted { outcome, .. } = event {
let expected = crate::types::Severity::Cancelled.as_u8();
crate::assert_with_log!(
*outcome == expected,
"severity should be Cancelled (2)",
expected,
*outcome
);
}
}
crate::test_complete!("replay_records_correct_severity_for_cancelled_task");
}
/// Two runtimes built from the same seed with replay recording enabled finish
/// with equal replay metadata and events, and `recorded_at` is zero.
#[test]
fn replay_recording_metadata_is_stable_for_same_seed() {
    init_test("replay_recording_metadata_is_stable_for_same_seed");

    let fresh_runtime = || LabRuntime::new(LabConfig::new(42).with_default_replay_recording());
    let mut run_a = fresh_runtime();
    let mut run_b = fresh_runtime();

    let trace_a = run_a
        .finish_replay_trace()
        .expect("first replay recording enabled");
    let trace_b = run_b
        .finish_replay_trace()
        .expect("second replay recording enabled");

    crate::assert_with_log!(
        trace_a.metadata == trace_b.metadata,
        "replay metadata should match for identical seeds",
        &trace_a.metadata,
        &trace_b.metadata
    );
    crate::assert_with_log!(
        trace_a.events == trace_b.events,
        "replay events should match for identical seeds",
        &trace_a.events,
        &trace_b.events
    );

    let stamp = trace_a.metadata.recorded_at;
    crate::assert_with_log!(
        stamp == 0,
        "recorded_at defaults to deterministic zero stamp",
        0u64,
        stamp
    );

    crate::test_complete!("replay_recording_metadata_is_stable_for_same_seed");
}
/// Exercises the derived `Copy`, `PartialEq` and `Debug` behaviour of
/// `LabTraceCertificateSummary`.
#[test]
fn lab_trace_certificate_summary_debug_clone_copy_eq() {
    let original = LabTraceCertificateSummary {
        schedule_hash: 789,
        event_count: 456,
        event_hash: 123,
    };

    // Both bindings are plain copies; `original` stays usable afterwards.
    let first_copy = original;
    let second_copy = original;
    assert_eq!(first_copy, second_copy);

    // Differs from `original` only in `event_hash`.
    let altered = LabTraceCertificateSummary {
        schedule_hash: 789,
        event_count: 456,
        event_hash: 0,
    };
    assert_ne!(original, altered);

    let rendered = format!("{original:?}");
    assert!(rendered.contains("LabTraceCertificateSummary"));
}
/// Exercises `Copy`, `PartialEq` and `Debug` on `VirtualTimeReport`, and checks
/// that 500_000_000 elapsed nanoseconds read back as 500 ms and 0 whole seconds.
#[test]
fn virtual_time_report_debug_clone_copy_eq() {
    let half_second = VirtualTimeReport {
        time_start: Time::ZERO,
        time_end: Time::from_millis(500),
        virtual_elapsed_nanos: 500_000_000,
        steps: 100,
        auto_advances: 5,
        total_wakeups: 10,
        termination: AutoAdvanceTermination::Quiescent,
    };

    let duplicate = half_second;
    assert_eq!(duplicate, half_second);

    assert_eq!(half_second.virtual_elapsed_ms(), 500);
    assert_eq!(half_second.virtual_elapsed_secs(), 0);

    let rendered = format!("{half_second:?}");
    assert!(rendered.contains("VirtualTimeReport"));
}
/// An empty runtime terminates `run_with_auto_advance` as `Quiescent`.
#[test]
fn auto_advance_quiescent_termination() {
    init_test("auto_advance_quiescent_termination");

    let mut runtime = LabRuntime::new(LabConfig::new(42));
    let outcome = runtime.run_with_auto_advance().termination;

    assert_eq!(
        outcome,
        AutoAdvanceTermination::Quiescent,
        "empty runtime should terminate as quiescent"
    );

    crate::test_complete!("auto_advance_quiescent_termination");
}
/// With `max_steps(0)`, `run_with_auto_advance` terminates as `StepLimitReached`.
#[test]
fn auto_advance_step_limit_termination() {
    init_test("auto_advance_step_limit_termination");

    let zero_step_config = LabConfig::new(42).max_steps(0);
    let mut runtime = LabRuntime::new(zero_step_config);
    let outcome = runtime.run_with_auto_advance().termination;

    assert_eq!(
        outcome,
        AutoAdvanceTermination::StepLimitReached,
        "zero max_steps should terminate as step-limit-reached"
    );

    crate::test_complete!("auto_advance_step_limit_termination");
}
/// A task that awaits `pending()` forever, with no step limit and a zero
/// futurelock idle budget, ends the run as `StuckBailout`; the runtime stays
/// non-quiescent and no auto-advance is recorded.
#[test]
fn auto_advance_stuck_bailout_termination() {
    init_test("auto_advance_stuck_bailout_termination");

    let lab_config = LabConfig::new(42)
        .with_auto_advance()
        .no_step_limit()
        .futurelock_max_idle_steps(0);
    let mut runtime = LabRuntime::new(lab_config);

    let region = runtime.state.create_root_region(Budget::INFINITE);
    // This future never resolves and the test registers no timers.
    let never_finishes = async {
        std::future::pending::<()>().await;
    };
    let (stuck_task, _handle) = runtime
        .state
        .create_task(region, Budget::INFINITE, never_finishes)
        .expect("create pending task");
    runtime.scheduler.lock().schedule(stuck_task, 0);

    let report = runtime.run_with_auto_advance();

    assert_eq!(
        report.termination,
        AutoAdvanceTermination::StuckBailout,
        "pending task without deadlines should terminate via stuck bailout"
    );
    assert!(
        !runtime.is_quiescent(),
        "stuck bailout should preserve non-quiescent state for diagnosis"
    );
    assert_eq!(
        report.auto_advances, 0,
        "stuck bailout path should not auto-advance virtual time without deadlines"
    );

    crate::test_complete!("auto_advance_stuck_bailout_termination");
}
/// Each `AutoAdvanceTermination` variant renders to its kebab-case label via
/// `Display`.
#[test]
fn auto_advance_termination_display() {
    let cases = [
        (AutoAdvanceTermination::Quiescent, "quiescent"),
        (
            AutoAdvanceTermination::StepLimitReached,
            "step-limit-reached",
        ),
        (AutoAdvanceTermination::StuckBailout, "stuck-bailout"),
    ];

    for (variant, label) in cases {
        assert_eq!(format!("{variant}"), label);
    }
}
/// Exercises `Copy`, `Eq`, `Hash` and `Debug` on every `AutoAdvanceTermination`
/// variant; all three variants must be distinct hash-set members.
#[test]
fn auto_advance_termination_debug_clone_copy_eq_hash() {
    use std::collections::HashSet;

    let all = [
        AutoAdvanceTermination::Quiescent,
        AutoAdvanceTermination::StepLimitReached,
        AutoAdvanceTermination::StuckBailout,
    ];

    for variant in all.iter() {
        let first_copy = *variant;
        let second_copy = *variant;
        assert_eq!(first_copy, second_copy);
    }

    // `insert` returns false on a duplicate, so each variant must be new to the set.
    let mut seen = HashSet::new();
    for variant in all.iter() {
        assert!(seen.insert(*variant));
    }
    assert_eq!(seen.len(), 3);

    let rendered = format!("{:?}", AutoAdvanceTermination::StuckBailout);
    assert!(rendered.contains("StuckBailout"));
}
/// Exercises `Display`, `Copy`, `Hash` and `Ord` on `HarnessAttachmentKind`:
/// the labels, the distinctness of the four variants, and their ordering
/// CrashPack < ReplayTrace < Trace < Other.
#[test]
fn harness_attachment_kind_debug_clone_copy_eq_hash_ord_display() {
    use std::collections::HashSet;

    let kinds = [
        HarnessAttachmentKind::CrashPack,
        HarnessAttachmentKind::ReplayTrace,
        HarnessAttachmentKind::Trace,
        HarnessAttachmentKind::Other,
    ];

    // Display labels, listed in the same order as `kinds`.
    let labels = ["crashpack", "replay_trace", "trace", "other"];
    for (kind, label) in kinds.iter().zip(labels.iter()) {
        assert_eq!(format!("{kind}"), *label);
    }

    for kind in kinds.iter() {
        let first_copy = *kind;
        let second_copy = *kind;
        assert_eq!(first_copy, second_copy);
    }

    let distinct: HashSet<_> = kinds.iter().copied().collect();
    assert_eq!(distinct.len(), 4);

    // `kinds` is listed in ascending order, so each neighbour pair must compare as less-than.
    for pair in kinds.windows(2) {
        assert!(pair[0] < pair[1]);
    }

    let mut shuffled = [kinds[3], kinds[0], kinds[2], kinds[1]];
    shuffled.sort();
    assert_eq!(shuffled, kinds);
}
/// Exercises the constructors of `HarnessAttachmentRef` along with its `Clone`,
/// `PartialEq`, `Debug` and `Hash` behaviour.
#[test]
fn harness_attachment_ref_debug_clone_eq_hash() {
    use std::collections::HashSet;

    let crash_ref = HarnessAttachmentRef::crashpack("crash.bin");
    let replay_ref = HarnessAttachmentRef::replay_trace("replay.bin");
    let trace_ref = HarnessAttachmentRef::trace("trace.ndjson");

    assert_eq!(crash_ref.kind, HarnessAttachmentKind::CrashPack);
    assert_eq!(replay_ref.kind, HarnessAttachmentKind::ReplayTrace);
    assert_eq!(trace_ref.kind, HarnessAttachmentKind::Trace);

    let duplicate = crash_ref.clone();
    assert_eq!(duplicate, crash_ref);
    assert_ne!(crash_ref, replay_ref);

    let rendered = format!("{crash_ref:?}");
    assert!(rendered.contains("HarnessAttachmentRef"));

    // The crashpack ref goes in twice (clone, then original); the set must keep one.
    let unique: HashSet<_> = vec![crash_ref.clone(), replay_ref, crash_ref]
        .into_iter()
        .collect();
    assert_eq!(unique.len(), 2);
}
/// Exercises `Copy`, `PartialEq` and `Debug` on `ChaosConfigSummary`.
#[test]
fn chaos_config_summary_debug_clone_copy_partial_eq() {
    let original = ChaosConfigSummary {
        seed: 42,
        budget_exhaust_probability: 0.03,
        wakeup_storm_probability: 0.01,
        io_error_probability: 0.05,
        delay_probability: 0.2,
        cancel_probability: 0.1,
    };

    // Both bindings are plain copies; `original` stays usable afterwards.
    let first_copy = original;
    let second_copy = original;
    assert_eq!(first_copy, second_copy);

    let rendered = format!("{original:?}");
    assert!(rendered.contains("ChaosConfigSummary"));
}
/// Exercises `Clone`, `PartialEq`, `Debug` and `Display` on `ObligationLeak`.
#[test]
fn obligation_leak_debug_clone_eq_display() {
    let holder = TaskId::from_arena(crate::util::ArenaIndex::new(1, 0));
    let sample = ObligationLeak {
        obligation: ObligationId::new_for_test(1, 0),
        kind: ObligationKind::SendPermit,
        holder,
        region: RegionId::new_for_test(0, 0),
    };

    let duplicate = sample.clone();
    assert_eq!(duplicate, sample);

    let debug_text = format!("{sample:?}");
    assert!(debug_text.contains("ObligationLeak"));

    let display_text = format!("{sample}");
    assert!(!display_text.is_empty());
}
/// Conformance: two runs of the same workload under the same seed produce equal
/// seeds, trace fingerprints, and trace-certificate hashes and counts.
#[test]
fn conformance_identical_seed_identical_trace() {
    init_test("conformance_identical_seed_identical_trace");

    let lab_config = LabConfig::new(42_u64).worker_count(2).max_steps(1000);

    let mut reports = Vec::new();
    for run_id in 0..2 {
        let mut lab = LabRuntime::new(lab_config.clone());
        let region = lab.state.create_root_region(Budget::INFINITE);

        for i in 0..5 {
            // Each task yields ten times and sleeps 1ms whenever (i + j) is a multiple of 3.
            let workload = async move {
                for j in 0..10 {
                    futures_lite::future::yield_now().await;
                    if (i + j) % 3 != 0 {
                        continue;
                    }
                    let now = crate::cx::Cx::current().map_or(Time::ZERO, |cx| cx.now());
                    crate::time::sleep(now, Duration::from_millis(1)).await;
                }
                i * 100 + run_id
            };
            let (task_id, _handle) = lab
                .state
                .create_task(region, Budget::INFINITE, workload)
                .expect("create task");
            lab.scheduler.lock().schedule(task_id, 0);
        }

        reports.push(lab.run_until_quiescent_with_report());
    }

    let (first, second) = (&reports[0], &reports[1]);

    crate::assert_with_log!(
        first.seed == second.seed,
        "seeds should be identical",
        first.seed,
        second.seed
    );
    crate::assert_with_log!(
        first.trace_fingerprint == second.trace_fingerprint,
        "trace fingerprints should be identical",
        first.trace_fingerprint,
        second.trace_fingerprint
    );
    crate::assert_with_log!(
        first.trace_certificate.event_hash == second.trace_certificate.event_hash,
        "event hashes should be identical",
        first.trace_certificate.event_hash,
        second.trace_certificate.event_hash
    );
    crate::assert_with_log!(
        first.trace_certificate.event_count == second.trace_certificate.event_count,
        "event counts should be identical",
        first.trace_certificate.event_count,
        second.trace_certificate.event_count
    );
    crate::assert_with_log!(
        first.trace_certificate.schedule_hash == second.trace_certificate.schedule_hash,
        "schedule hashes should be identical",
        first.trace_certificate.schedule_hash,
        second.trace_certificate.schedule_hash
    );

    crate::test_complete!("conformance_identical_seed_identical_trace");
}
/// Conformance: under `assert_deterministic`, four sleeping tasks cause virtual
/// time to advance through at least one auto-advance and the run ends quiescent.
#[test]
fn conformance_virtual_time_deterministic_advancement() {
    init_test("conformance_virtual_time_deterministic_advancement");

    let lab_config = LabConfig::new(123).worker_count(1);
    crate::lab::assert_deterministic(lab_config, |runtime| {
        let region = runtime.state.create_root_region(Budget::INFINITE);
        let started_at = runtime.now();

        // Sleep lengths are intentionally not in ascending order.
        let sleep_lengths = [
            Duration::from_millis(10),
            Duration::from_millis(5),
            Duration::from_millis(15),
            Duration::from_millis(1),
        ];
        for (i, &dur) in sleep_lengths.iter().enumerate() {
            let sleeper = async move {
                let now = crate::cx::Cx::current().map_or(Time::ZERO, |cx| cx.now());
                crate::time::sleep(now, dur).await;
                i
            };
            let (task_id, _handle) = runtime
                .state
                .create_task(region, Budget::INFINITE, sleeper)
                .expect("create task");
            runtime.scheduler.lock().schedule(task_id, 0);
        }

        let outcome = runtime.run_with_auto_advance();

        crate::assert_with_log!(
            outcome.time_end > started_at,
            "virtual time should have advanced",
            outcome.time_end,
            started_at
        );
        crate::assert_with_log!(
            outcome.auto_advances > 0,
            "should have auto-advanced virtual time",
            outcome.auto_advances,
            0
        );
        crate::assert_with_log!(
            outcome.termination == AutoAdvanceTermination::Quiescent,
            "should reach quiescence",
            outcome.termination,
            AutoAdvanceTermination::Quiescent
        );
    });

    crate::test_complete!("conformance_virtual_time_deterministic_advancement");
}
/// Conformance: two runs of twenty yielding tasks under the same seed produce
/// schedule certificates with equal decision counts and hashes.
#[test]
fn conformance_scheduler_deterministic_lottery() {
    init_test("conformance_scheduler_deterministic_lottery");

    let lab_config = LabConfig::new(456).worker_count(4);

    let mut schedule_sequences = Vec::new();
    for run in 0..2 {
        let mut lab = LabRuntime::new(lab_config.clone());
        let region = lab.state.create_root_region(Budget::INFINITE);

        for task_idx in 0..20 {
            let yielder = async move {
                for _ in 0..3 {
                    futures_lite::future::yield_now().await;
                }
                task_idx
            };
            let (task_id, _handle) = lab
                .state
                .create_task(region, Budget::INFINITE, yielder)
                .expect("create task");
            lab.scheduler.lock().schedule(task_id, 0);
        }
        lab.run_until_quiescent();

        // One record per run: (run index, decision count, certificate hash).
        let cert = lab.certificate();
        let run_record = vec![(run, cert.decisions(), cert.hash())];
        schedule_sequences.push(run_record);
    }

    let first_run = &schedule_sequences[0];
    let second_run = &schedule_sequences[1];
    crate::assert_with_log!(
        first_run.len() == second_run.len(),
        "should have same number of scheduling decision points",
        first_run.len(),
        second_run.len()
    );

    for (point, (lhs, rhs)) in first_run.iter().zip(second_run.iter()).enumerate() {
        let (_, lhs_count, lhs_hash) = lhs;
        let (_, rhs_count, rhs_hash) = rhs;
        crate::assert_with_log!(
            lhs_count == rhs_count,
            &format!("decision count should be identical at point {point}"),
            lhs_count,
            rhs_count
        );
        crate::assert_with_log!(
            lhs_hash == rhs_hash,
            &format!("schedule hash should be identical at point {point}"),
            lhs_hash,
            rhs_hash
        );
    }

    crate::test_complete!("conformance_scheduler_deterministic_lottery");
}
/// Conformance: with chaos injection configured, ten yielding/sleeping tasks run
/// under `assert_deterministic` and the chaos stats record at least one
/// decision point.
#[test]
fn conformance_chaos_injection_deterministic() {
    init_test("conformance_chaos_injection_deterministic");

    let chaos = crate::lab::chaos::ChaosConfig::new(789)
        .with_cancel_probability(0.1)
        .with_delay_probability(0.05)
        .with_io_error_probability(0.02);
    let lab_config = LabConfig::new(999).with_chaos(chaos);

    crate::lab::assert_deterministic(lab_config, |runtime| {
        let region = runtime.state.create_root_region(Budget::INFINITE);

        for i in 0..10 {
            // Each task yields twenty times and sleeps 1ms on every fifth iteration.
            let workload = async move {
                for j in 0..20 {
                    futures_lite::future::yield_now().await;
                    if j % 5 != 0 {
                        continue;
                    }
                    let now = crate::cx::Cx::current().map_or(Time::ZERO, |cx| cx.now());
                    crate::time::sleep(now, Duration::from_millis(1)).await;
                }
                i
            };
            let (task_id, _handle) = runtime
                .state
                .create_task(region, Budget::INFINITE, workload)
                .expect("create task");
            runtime.scheduler.lock().schedule(task_id, 0);
        }
        runtime.run_until_quiescent();

        let stats = runtime.chaos_stats();
        let decisions_made = stats.decision_points;
        crate::assert_with_log!(
            decisions_made > 0,
            "chaos should have made some decisions",
            decisions_made,
            0
        );
    });

    crate::test_complete!("conformance_chaos_injection_deterministic");
}
/// Conformance: a workload in which one task panics is run through
/// `crate::lab::assert_deterministic`; the runtime must still record
/// completion events and report quiescence.
#[test]
fn conformance_panic_semantics_deterministic() {
    init_test("conformance_panic_semantics_deterministic");
    let config = LabConfig::new(333)
        .worker_count(3)
        .panic_on_leak(false)
        .max_steps(10_000);
    crate::lab::assert_deterministic(config, |runtime| {
        let root = runtime.state.create_root_region(Budget::INFINITE);
        for i in 0..5 {
            // The task with index 2 panics on its first poll; the others
            // yield ten times and then return `i * 10`.
            let body = async move {
                assert!(i != 2, "deterministic panic in task {}", i);
                for _j in 0..10 {
                    futures_lite::future::yield_now().await;
                }
                i * 10
            };
            let (task_id, _handle) = runtime
                .state
                .create_task(root, Budget::INFINITE, body)
                .expect("create task");
            runtime.scheduler.lock().schedule(task_id, 0);
        }

        let report = runtime.run_until_quiescent_with_report();

        // Count the `Complete` events present in the trace snapshot.
        let complete_events = runtime
            .trace()
            .snapshot()
            .iter()
            .filter(|event| matches!(event.kind, TraceEventKind::Complete))
            .count();
        crate::assert_with_log!(
            complete_events > 0,
            "should have recorded task completion activity",
            complete_events,
            0
        );
        crate::assert_with_log!(
            report.quiescent,
            "runtime should reach quiescence despite panic",
            report.quiescent,
            false
        );
    });
    crate::test_complete!("conformance_panic_semantics_deterministic");
}
/// Conformance stress: a mixed yield / sleep / compute workload with chaos
/// enabled is run through `crate::lab::assert_deterministic_multi` (called
/// with `3`) and must terminate with `AutoAdvanceTermination::Quiescent`.
#[test]
fn conformance_comprehensive_determinism_stress() {
    init_test("conformance_comprehensive_determinism_stress");
    let config = LabConfig::new(777)
        .worker_count(4)
        .with_chaos(
            crate::lab::chaos::ChaosConfig::new(555)
                .with_cancel_probability(0.05)
                .with_delay_probability(0.03),
        )
        .max_steps(5000);
    crate::lab::assert_deterministic_multi(&config, 3, |runtime| {
        let root = runtime.state.create_root_region(Budget::INFINITE);
        for i in 0..15 {
            // The step kind is picked from `(i + j) % 4`: 1 sleeps, 2 does a
            // small pure computation, everything else just yields.
            let workload = async move {
                for j in 0..30 {
                    match (i + j) % 4 {
                        1 => {
                            let now = crate::cx::Cx::current().map_or(Time::ZERO, |cx| cx.now());
                            crate::time::sleep(now, Duration::from_millis(j as u64 % 5)).await;
                        }
                        2 => {
                            // Await-free busy work; the result is discarded.
                            let sum = (0..100_u64).fold(0_u64, u64::wrapping_add);
                            let _ = sum;
                        }
                        _ => futures_lite::future::yield_now().await,
                    }
                }
                i * 1000 + 42
            };
            let (task_id, _handle) = runtime
                .state
                .create_task(root, Budget::INFINITE, workload)
                .expect("create task");
            runtime.scheduler.lock().schedule(task_id, 0);
        }

        let vtime_report = runtime.run_with_auto_advance();
        crate::assert_with_log!(
            vtime_report.termination == AutoAdvanceTermination::Quiescent,
            "comprehensive workload should reach quiescence",
            vtime_report.termination,
            AutoAdvanceTermination::Quiescent
        );
    });
    crate::test_complete!("conformance_comprehensive_determinism_stress");
}
/// Regression guard: reads this source file from disk and fails if any of the
/// listed debug messages appears as the leading string literal of a stdout
/// print-line macro call.
#[test]
// The message literals below intentionally contain `{...}` placeholders.
#[allow(clippy::literal_string_with_formatting_args)]
fn non_test_lab_runtime_paths_do_not_use_stray_stdout_debug_prints() {
    let source_path = std::path::Path::new(env!("CARGO_MANIFEST_DIR")).join(file!());
    let source =
        std::fs::read_to_string(source_path).expect("lab runtime source must be readable");

    let banned_messages = [
        "LabScheduler already scheduled {task:?}",
        "LabScheduler scheduling {task:?}",
        "Executing {:?} at step {}",
        "rng_value = {}, worker_hint = {}",
    ];
    for message in banned_messages {
        // The macro name is assembled from two pieces rather than written
        // out whole (presumably so this test never contains the call itself).
        let stdout_call = format!("print{}!(\"{message}\"", "ln");
        assert!(
            !source.contains(&stdout_call),
            "non-test LabRuntime debug print regressed: {message}"
        );
    }
}
}