use crate::types::Time;
use std::fmt;
#[derive(Debug, Clone, Copy)]
pub struct PotentialWeights {
pub w_tasks: f64,
pub w_obligation_age: f64,
pub w_draining_regions: f64,
pub w_deadline_pressure: f64,
}
impl PotentialWeights {
#[must_use]
pub const fn uniform(w: f64) -> Self {
Self {
w_tasks: w,
w_obligation_age: w,
w_draining_regions: w,
w_deadline_pressure: w,
}
}
#[must_use]
pub const fn obligation_focused() -> Self {
Self {
w_tasks: 1.0,
w_obligation_age: 10.0,
w_draining_regions: 5.0,
w_deadline_pressure: 2.0,
}
}
#[must_use]
pub const fn deadline_focused() -> Self {
Self {
w_tasks: 1.0,
w_obligation_age: 2.0,
w_draining_regions: 3.0,
w_deadline_pressure: 10.0,
}
}
#[must_use]
pub fn is_valid(&self) -> bool {
self.w_tasks >= 0.0
&& self.w_tasks.is_finite()
&& self.w_obligation_age >= 0.0
&& self.w_obligation_age.is_finite()
&& self.w_draining_regions >= 0.0
&& self.w_draining_regions.is_finite()
&& self.w_deadline_pressure >= 0.0
&& self.w_deadline_pressure.is_finite()
}
}
impl Default for PotentialWeights {
fn default() -> Self {
Self {
w_tasks: 1.0,
w_obligation_age: 5.0,
w_draining_regions: 3.0,
w_deadline_pressure: 2.0,
}
}
}
#[derive(Debug, Clone, Default)]
pub struct StateSnapshot {
pub time: Time,
pub live_tasks: u32,
pub pending_obligations: u32,
pub obligation_age_sum_ns: u64,
pub draining_regions: u32,
pub deadline_pressure: f64,
pub pending_send_permits: u32,
pub pending_acks: u32,
pub pending_leases: u32,
pub pending_io_ops: u32,
pub cancel_requested_tasks: u32,
pub cancelling_tasks: u32,
pub finalizing_tasks: u32,
pub ready_queue_depth: u32,
}
impl StateSnapshot {
#[allow(dead_code)]
#[inline]
fn accumulate_cancel_phase_counts(
task_state: &crate::record::task::TaskState,
cancel_requested_tasks: &mut u32,
cancelling_tasks: &mut u32,
finalizing_tasks: &mut u32,
) {
match task_state {
crate::record::task::TaskState::CancelRequested { .. } => {
*cancel_requested_tasks = cancel_requested_tasks.saturating_add(1);
}
crate::record::task::TaskState::Cancelling { .. } => {
*cancelling_tasks = cancelling_tasks.saturating_add(1);
}
crate::record::task::TaskState::Finalizing { .. } => {
*finalizing_tasks = finalizing_tasks.saturating_add(1);
}
_ => {}
}
}
#[must_use]
pub fn from_runtime_state(state: &crate::runtime::RuntimeState) -> Self {
use crate::record::obligation::ObligationKind;
use crate::record::task::TaskPhase;
const DEADLINE_PRESSURE_D0_NS: u64 = 1_000_000_000;
let now = state.now;
let live_tasks = state.tasks.live_task_count() as u32;
let cancel_requested_tasks = state.tasks.count_in_phase(TaskPhase::CancelRequested) as u32;
let cancelling_tasks = state.tasks.count_in_phase(TaskPhase::Cancelling) as u32;
let finalizing_tasks = state.tasks.count_in_phase(TaskPhase::Finalizing) as u32;
let tasks_with_deadline = state.tasks.tasks_with_deadline_count();
let deadline_pressure = if tasks_with_deadline > 0 {
let deadline_sum_ns = state.tasks.deadline_sum_ns();
let now_ns = u128::from(now.as_nanos());
#[allow(clippy::cast_precision_loss)]
let count = tasks_with_deadline as f64;
#[allow(clippy::cast_precision_loss)]
let d0 = DEADLINE_PRESSURE_D0_NS as f64;
#[allow(clippy::cast_precision_loss)]
let sum_d = deadline_sum_ns as f64;
#[allow(clippy::cast_precision_loss)]
let now_f = now_ns as f64;
let p = count - (sum_d / d0) + (count * now_f / d0);
p.max(0.0)
} else {
0.0
};
#[allow(clippy::cast_possible_truncation)]
let pending_obligations: u32 = state.pending_obligation_count() as u32;
let now_ns = u128::from(now.as_nanos());
let pending_reserved_at_sum = state.pending_obligation_reserved_at_sum_ns();
let total_pending_nanos = now_ns.saturating_mul(u128::from(pending_obligations));
let obligation_age_sum_ns: u64 = total_pending_nanos
.saturating_sub(pending_reserved_at_sum)
.min(u128::from(u64::MAX)) as u64;
#[allow(clippy::cast_possible_truncation)]
let pending_send_permits: u32 =
state.pending_obligation_count_for_kind(ObligationKind::SendPermit) as u32;
#[allow(clippy::cast_possible_truncation)]
let pending_acks: u32 = state.pending_obligation_count_for_kind(ObligationKind::Ack) as u32;
#[allow(clippy::cast_possible_truncation)]
let pending_leases: u32 = (state
.pending_obligation_count_for_kind(ObligationKind::Lease)
.saturating_add(
state.pending_obligation_count_for_kind(ObligationKind::SemaphorePermit),
)) as u32;
#[allow(clippy::cast_possible_truncation)]
let pending_io_ops: u32 =
state.pending_obligation_count_for_kind(ObligationKind::IoOp) as u32;
let draining_regions = state.draining_region_count_for_snapshot() as u32;
Self {
time: now,
live_tasks,
pending_obligations,
obligation_age_sum_ns,
draining_regions,
deadline_pressure,
pending_send_permits,
pending_acks,
pending_leases,
pending_io_ops,
cancel_requested_tasks,
cancelling_tasks,
finalizing_tasks,
ready_queue_depth: 0, }
}
#[must_use]
pub fn is_quiescent(&self) -> bool {
self.live_tasks == 0
&& self.pending_obligations == 0
&& self.draining_regions == 0
&& self.deadline_pressure.abs() < f64::EPSILON
}
#[must_use]
pub fn with_ready_queue_depth(mut self, depth: u32) -> Self {
self.ready_queue_depth = depth;
self
}
#[must_use]
pub fn total_cancelling_tasks(&self) -> u32 {
self.cancel_requested_tasks
.saturating_add(self.cancelling_tasks)
.saturating_add(self.finalizing_tasks)
}
}
impl fmt::Display for StateSnapshot {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(
f,
"Σ(t={}, tasks={}, obligations={}[sp={},ack={},lease={},io={}], \
age_sum={}ns, draining={}, cancel={}/{}/{}, queue={}, deadline_p={:.2})",
self.time,
self.live_tasks,
self.pending_obligations,
self.pending_send_permits,
self.pending_acks,
self.pending_leases,
self.pending_io_ops,
self.obligation_age_sum_ns,
self.draining_regions,
self.cancel_requested_tasks,
self.cancelling_tasks,
self.finalizing_tasks,
self.ready_queue_depth,
self.deadline_pressure,
)
}
}
#[derive(Debug, Clone)]
pub struct PotentialRecord {
pub snapshot: StateSnapshot,
pub total: f64,
pub task_component: f64,
pub obligation_component: f64,
pub region_component: f64,
pub deadline_component: f64,
}
impl PotentialRecord {
#[must_use]
pub fn is_zero(&self) -> bool {
self.total.abs() < f64::EPSILON
}
}
impl fmt::Display for PotentialRecord {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(
f,
"V={:.2} [tasks={:.2}, obligations={:.2}, regions={:.2}, deadlines={:.2}]",
self.total,
self.task_component,
self.obligation_component,
self.region_component,
self.deadline_component,
)
}
}
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum SchedulingSuggestion {
DrainObligations,
DrainRegions,
MeetDeadlines,
NoPreference,
}
impl SchedulingSuggestion {
#[must_use]
pub const fn description(self) -> &'static str {
match self {
Self::DrainObligations => "prioritize obligation holders",
Self::DrainRegions => "prioritize draining region tasks",
Self::MeetDeadlines => "prioritize deadline-critical tasks",
Self::NoPreference => "no scheduling preference",
}
}
}
impl fmt::Display for SchedulingSuggestion {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.write_str(self.description())
}
}
#[derive(Debug, Clone)]
pub struct ConvergenceVerdict {
pub monotone: bool,
pub reached_quiescence: bool,
pub v_max: f64,
pub v_final: f64,
pub increase_count: usize,
pub max_increase: f64,
pub steps: usize,
}
impl ConvergenceVerdict {
#[must_use]
pub fn converged(&self) -> bool {
self.monotone && self.reached_quiescence
}
}
impl fmt::Display for ConvergenceVerdict {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
writeln!(f, "Convergence Verdict")?;
writeln!(f, "===================")?;
writeln!(f, "Steps: {}", self.steps)?;
writeln!(f, "Monotone: {}", self.monotone)?;
writeln!(f, "Quiescent: {}", self.reached_quiescence)?;
writeln!(f, "Converged: {}", self.converged())?;
writeln!(f, "V_max: {:.4}", self.v_max)?;
writeln!(f, "V_final: {:.4}", self.v_final)?;
if !self.monotone {
writeln!(f, "Violations: {}", self.increase_count)?;
writeln!(f, "Max increase: {:.4}", self.max_increase)?;
}
Ok(())
}
}
#[derive(Debug)]
pub struct LyapunovGovernor {
weights: PotentialWeights,
history: Vec<PotentialRecord>,
}
impl LyapunovGovernor {
const MAX_HISTORY: usize = 8192;
#[must_use]
pub fn new(weights: PotentialWeights) -> Self {
assert!(weights.is_valid(), "weights must be non-negative");
Self {
weights,
history: Vec::new(),
}
}
#[must_use]
pub fn with_defaults() -> Self {
Self::new(PotentialWeights::default())
}
pub fn compute_potential(&mut self, snapshot: &StateSnapshot) -> f64 {
let record = self.compute(snapshot);
let total = record.total;
self.history.push(record);
if self.history.len() > Self::MAX_HISTORY {
let drain_count = Self::MAX_HISTORY / 2;
self.history.drain(..drain_count);
}
total
}
#[must_use]
pub fn compute_record(&self, snapshot: &StateSnapshot) -> PotentialRecord {
self.compute(snapshot)
}
#[must_use]
pub fn suggest(&self, snapshot: &StateSnapshot) -> SchedulingSuggestion {
if snapshot.is_quiescent() {
return SchedulingSuggestion::NoPreference;
}
let record = self.compute(snapshot);
let components = [
(
record.obligation_component,
SchedulingSuggestion::DrainObligations,
),
(record.region_component, SchedulingSuggestion::DrainRegions),
(
record.deadline_component,
SchedulingSuggestion::MeetDeadlines,
),
];
components
.iter()
.max_by(|a, b| a.0.partial_cmp(&b.0).unwrap_or(std::cmp::Ordering::Equal))
.filter(|(v, _)| *v > 0.0)
.map_or(SchedulingSuggestion::NoPreference, |(_, s)| *s)
}
#[must_use]
pub fn analyze_convergence(&self) -> ConvergenceVerdict {
if self.history.is_empty() {
return ConvergenceVerdict {
monotone: true,
reached_quiescence: false,
v_max: 0.0,
v_final: 0.0,
increase_count: 0,
max_increase: 0.0,
steps: 0,
};
}
let mut monotone = true;
let mut increase_count: usize = 0;
let mut max_increase = 0.0_f64;
let mut v_max = 0.0_f64;
for window in self.history.windows(2) {
let prev = window[0].total;
let curr = window[1].total;
v_max = v_max.max(prev).max(curr);
let delta = curr - prev;
if delta > f64::EPSILON {
monotone = false;
increase_count = increase_count.saturating_add(1);
max_increase = max_increase.max(delta);
}
}
v_max = v_max.max(self.history.first().map_or(0.0, |r| r.total));
let v_final = self.history.last().map_or(0.0, |r| r.total);
let reached_quiescence = v_final.abs() < f64::EPSILON;
ConvergenceVerdict {
monotone,
reached_quiescence,
v_max,
v_final,
increase_count,
max_increase,
steps: self.history.len(),
}
}
#[must_use]
pub fn history(&self) -> &[PotentialRecord] {
&self.history
}
pub fn clear_history(&mut self) {
self.history.clear();
}
#[must_use]
pub const fn weights(&self) -> &PotentialWeights {
&self.weights
}
fn compute(&self, snapshot: &StateSnapshot) -> PotentialRecord {
let task_component = self.weights.w_tasks * f64::from(snapshot.live_tasks);
#[allow(clippy::cast_precision_loss)]
let age_seconds = snapshot.obligation_age_sum_ns as f64 / 1_000_000_000.0;
let obligation_component = self.weights.w_obligation_age * age_seconds;
let region_component =
self.weights.w_draining_regions * f64::from(snapshot.draining_regions);
let deadline_component = self.weights.w_deadline_pressure * snapshot.deadline_pressure;
let total = task_component + obligation_component + region_component + deadline_component;
PotentialRecord {
snapshot: snapshot.clone(),
total,
task_component,
obligation_component,
region_component,
deadline_component,
}
}
}
#[cfg(test)]
mod tests {
#![allow(
clippy::pedantic,
clippy::nursery,
clippy::expect_fun_call,
clippy::map_unwrap_or,
clippy::cast_possible_wrap,
clippy::future_not_send
)]
use super::*;
use crate::lab::runtime::InvariantViolation;
use crate::record::ObligationKind;
use crate::record::obligation::ObligationRecord;
use crate::record::region::{RegionRecord, RegionState};
use crate::record::task::{TaskPhase, TaskRecord};
use crate::runtime::RuntimeState;
use crate::runtime::state::ReadBiasedRegionSnapshotStats;
use crate::types::Budget;
use proptest::prelude::*;
use serde::Deserialize;
use serde_json::{Value, json};
use std::collections::{BTreeMap, BTreeSet, hash_map::DefaultHasher};
use std::fs;
use std::hash::{Hash, Hasher};
use std::mem::size_of;
use std::path::Path;
use std::time::Instant;
fn init_test(name: &str) {
crate::test_utils::init_test_logging();
crate::test_phase!(name);
}
fn quiescent_snapshot() -> StateSnapshot {
StateSnapshot {
time: Time::ZERO,
live_tasks: 0,
pending_obligations: 0,
obligation_age_sum_ns: 0,
draining_regions: 0,
deadline_pressure: 0.0,
pending_send_permits: 0,
pending_acks: 0,
pending_leases: 0,
pending_io_ops: 0,
cancel_requested_tasks: 0,
cancelling_tasks: 0,
finalizing_tasks: 0,
ready_queue_depth: 0,
}
}
fn active_snapshot(tasks: u32, obligations: u32, age_ns: u64, draining: u32) -> StateSnapshot {
StateSnapshot {
time: Time::from_nanos(age_ns),
live_tasks: tasks,
pending_obligations: obligations,
obligation_age_sum_ns: age_ns,
draining_regions: draining,
deadline_pressure: 0.0,
pending_send_permits: obligations, pending_acks: 0,
pending_leases: 0,
pending_io_ops: 0,
cancel_requested_tasks: 0,
cancelling_tasks: 0,
finalizing_tasks: 0,
ready_queue_depth: 0,
}
}
fn snapshot_with_components(
tasks: u32,
send_permits: u32,
age_ns: u64,
draining: u32,
deadline_pressure: f64,
) -> StateSnapshot {
StateSnapshot {
time: Time::from_nanos(age_ns),
live_tasks: tasks,
pending_obligations: send_permits,
obligation_age_sum_ns: age_ns,
draining_regions: draining,
deadline_pressure,
pending_send_permits: send_permits,
pending_acks: 0,
pending_leases: 0,
pending_io_ops: 0,
cancel_requested_tasks: 0,
cancelling_tasks: 0,
finalizing_tasks: 0,
ready_queue_depth: 0,
}
}
#[test]
fn snapshot_from_runtime_counts_tasks_obligations_and_regions() {
init_test("snapshot_from_runtime_counts_tasks_obligations_and_regions");
let mut state = RuntimeState::new();
let root = state.create_root_region(Budget::unlimited());
let (task_id, _handle) = state
.create_task(root, Budget::unlimited(), async {})
.expect("create_task must succeed");
let obligation_id = state
.create_obligation(ObligationKind::SendPermit, task_id, root, None)
.expect("create_obligation must succeed");
state.now = Time::from_nanos(100);
let snap = StateSnapshot::from_runtime_state(&state);
crate::assert_with_log!(snap.time == state.now, "time", state.now, snap.time);
crate::assert_with_log!(snap.live_tasks == 1, "live_tasks", 1, snap.live_tasks);
crate::assert_with_log!(
snap.pending_obligations == 1,
"pending_obligations",
1,
snap.pending_obligations
);
crate::assert_with_log!(
snap.obligation_age_sum_ns == 100,
"obligation_age_sum_ns",
100,
snap.obligation_age_sum_ns
);
crate::assert_with_log!(
snap.draining_regions == 0,
"draining_regions",
0,
snap.draining_regions
);
crate::assert_with_log!(
snap.pending_send_permits == 1,
"pending_send_permits",
1,
snap.pending_send_permits
);
crate::assert_with_log!(snap.pending_acks == 0, "pending_acks", 0, snap.pending_acks);
crate::assert_with_log!(
snap.pending_leases == 0,
"pending_leases",
0,
snap.pending_leases
);
crate::assert_with_log!(
snap.pending_io_ops == 0,
"pending_io_ops",
0,
snap.pending_io_ops
);
{
let region = state.region(root).expect("root region exists");
let ok = region.begin_close(None);
crate::assert_with_log!(ok, "begin_close", true, ok);
let ok = region.begin_drain();
crate::assert_with_log!(ok, "begin_drain", true, ok);
}
let snap2 = StateSnapshot::from_runtime_state(&state);
crate::assert_with_log!(
snap2.draining_regions == 1,
"draining_regions after begin_drain",
1,
snap2.draining_regions
);
state
.commit_obligation(obligation_id)
.expect("commit_obligation must succeed");
let snap3 = StateSnapshot::from_runtime_state(&state);
crate::assert_with_log!(
snap3.pending_obligations == 0,
"pending_obligations after commit",
0,
snap3.pending_obligations
);
crate::assert_with_log!(
snap3.pending_send_permits == 0,
"pending_send_permits after commit",
0,
snap3.pending_send_permits
);
crate::test_complete!("snapshot_from_runtime_counts_tasks_obligations_and_regions");
}
#[test]
fn snapshot_from_runtime_computes_deadline_pressure() {
init_test("snapshot_from_runtime_computes_deadline_pressure");
let mut state = RuntimeState::new();
let root = state.create_root_region(Budget::unlimited());
let (_task_id, _handle) = state
.create_task(root, Budget::with_deadline_ns(500_000_000), async {})
.expect("create_task must succeed");
state.now = Time::ZERO;
let snap = StateSnapshot::from_runtime_state(&state);
let expected = 0.5_f64;
let ok = (snap.deadline_pressure - expected).abs() < 1e-9;
crate::assert_with_log!(
ok,
"deadline_pressure at t=0",
expected,
snap.deadline_pressure
);
state.now = Time::from_nanos(600_000_000);
let snap2 = StateSnapshot::from_runtime_state(&state);
let expected_overdue = 1.1_f64;
let ok2 = (snap2.deadline_pressure - expected_overdue).abs() < 1e-9;
crate::assert_with_log!(
ok2,
"deadline_pressure overdue",
expected_overdue,
snap2.deadline_pressure
);
crate::test_complete!("snapshot_from_runtime_computes_deadline_pressure");
}
#[test]
fn with_ready_queue_depth_sets_field() {
init_test("with_ready_queue_depth_sets_field");
let snap = quiescent_snapshot().with_ready_queue_depth(42);
crate::assert_with_log!(
snap.ready_queue_depth == 42,
"ready_queue_depth",
42,
snap.ready_queue_depth
);
crate::test_complete!("with_ready_queue_depth_sets_field");
}
#[test]
fn total_cancelling_tasks_sums_phases() {
init_test("total_cancelling_tasks_sums_phases");
let mut snap = quiescent_snapshot();
snap.cancel_requested_tasks = 3;
snap.cancelling_tasks = 2;
snap.finalizing_tasks = 1;
let total = snap.total_cancelling_tasks();
crate::assert_with_log!(total == 6, "total_cancelling", 6, total);
crate::test_complete!("total_cancelling_tasks_sums_phases");
}
#[test]
fn per_kind_obligation_breakdown_sums_to_total() {
init_test("per_kind_obligation_breakdown_sums_to_total");
let snap = StateSnapshot {
time: Time::ZERO,
live_tasks: 4,
pending_obligations: 7,
obligation_age_sum_ns: 0,
draining_regions: 0,
deadline_pressure: 0.0,
pending_send_permits: 2,
pending_acks: 1,
pending_leases: 3,
pending_io_ops: 1,
cancel_requested_tasks: 0,
cancelling_tasks: 0,
finalizing_tasks: 0,
ready_queue_depth: 0,
};
let sum = snap.pending_send_permits
+ snap.pending_acks
+ snap.pending_leases
+ snap.pending_io_ops;
crate::assert_with_log!(
sum == snap.pending_obligations,
"per-kind sums to total",
snap.pending_obligations,
sum
);
crate::test_complete!("per_kind_obligation_breakdown_sums_to_total");
}
#[test]
fn display_includes_extended_fields() {
init_test("display_includes_extended_fields");
let mut snap = active_snapshot(3, 2, 100_000_000, 1);
snap.cancel_requested_tasks = 1;
snap.cancelling_tasks = 1;
snap.ready_queue_depth = 5;
let s = format!("{snap}");
let has_cancel = s.contains("cancel=1/1/0");
crate::assert_with_log!(has_cancel, "display shows cancel phases", true, has_cancel);
let has_queue = s.contains("queue=5");
crate::assert_with_log!(has_queue, "display shows queue depth", true, has_queue);
let has_kind = s.contains("sp=2");
crate::assert_with_log!(has_kind, "display shows per-kind", true, has_kind);
crate::test_complete!("display_includes_extended_fields");
}
#[test]
fn potential_zero_iff_quiescent() {
init_test("potential_zero_iff_quiescent");
let governor = LyapunovGovernor::with_defaults();
let v = governor.compute_record(&quiescent_snapshot());
let is_zero = v.is_zero();
crate::assert_with_log!(is_zero, "quiescent is zero", true, is_zero);
let v_active = governor.compute_record(&active_snapshot(1, 0, 0, 0));
let not_zero = !v_active.is_zero();
crate::assert_with_log!(not_zero, "active is not zero", true, not_zero);
crate::test_complete!("potential_zero_iff_quiescent");
}
#[test]
fn potential_non_negative() {
init_test("potential_non_negative");
let governor = LyapunovGovernor::with_defaults();
let configs = [
(0, 0, 0, 0),
(1, 0, 0, 0),
(0, 1, 100, 0),
(5, 3, 1000, 2),
(100, 50, 1_000_000_000, 10),
];
for (tasks, obligations, age, draining) in configs {
let snap = active_snapshot(tasks, obligations, age, draining);
let v = governor.compute_record(&snap);
let non_neg = v.total >= 0.0;
crate::assert_with_log!(non_neg, format!("non-negative for {snap}"), true, non_neg);
}
crate::test_complete!("potential_non_negative");
}
#[test]
fn potential_increases_with_more_tasks() {
init_test("potential_increases_with_more_tasks");
let governor = LyapunovGovernor::with_defaults();
let v1 = governor.compute_record(&active_snapshot(1, 0, 0, 0));
let v2 = governor.compute_record(&active_snapshot(5, 0, 0, 0));
let v3 = governor.compute_record(&active_snapshot(10, 0, 0, 0));
let inc1 = v2.total > v1.total;
crate::assert_with_log!(inc1, "more tasks = higher V", true, inc1);
let inc2 = v3.total > v2.total;
crate::assert_with_log!(inc2, "even more tasks", true, inc2);
crate::test_complete!("potential_increases_with_more_tasks");
}
#[test]
fn potential_increases_with_obligation_age() {
init_test("potential_increases_with_obligation_age");
let governor = LyapunovGovernor::with_defaults();
let v1 = governor.compute_record(&active_snapshot(1, 1, 100, 0));
let v2 = governor.compute_record(&active_snapshot(1, 1, 1_000_000_000, 0));
let inc = v2.total > v1.total;
crate::assert_with_log!(inc, "older obligations = higher V", true, inc);
crate::test_complete!("potential_increases_with_obligation_age");
}
#[test]
fn potential_increases_with_draining_regions() {
init_test("potential_increases_with_draining_regions");
let governor = LyapunovGovernor::with_defaults();
let v1 = governor.compute_record(&active_snapshot(1, 0, 0, 0));
let v2 = governor.compute_record(&active_snapshot(1, 0, 0, 3));
let inc = v2.total > v1.total;
crate::assert_with_log!(inc, "draining regions increase V", true, inc);
crate::test_complete!("potential_increases_with_draining_regions");
}
#[test]
fn potential_deadline_pressure() {
init_test("potential_deadline_pressure");
let governor = LyapunovGovernor::with_defaults();
let snap_no_pressure = StateSnapshot {
time: Time::ZERO,
live_tasks: 1,
pending_obligations: 0,
obligation_age_sum_ns: 0,
draining_regions: 0,
deadline_pressure: 0.0,
pending_send_permits: 0,
pending_acks: 0,
pending_leases: 0,
pending_io_ops: 0,
cancel_requested_tasks: 0,
cancelling_tasks: 0,
finalizing_tasks: 0,
ready_queue_depth: 0,
};
let v1 = governor.compute_record(&snap_no_pressure);
let snap_high_pressure = StateSnapshot {
deadline_pressure: 5.0,
..snap_no_pressure
};
let v2 = governor.compute_record(&snap_high_pressure);
let inc = v2.total > v1.total;
crate::assert_with_log!(inc, "deadline pressure increases V", true, inc);
crate::test_complete!("potential_deadline_pressure");
}
proptest! {
#[test]
fn metamorphic_componentwise_reduction_never_increases_potential(
tasks in 0u32..40,
obligations in 0u32..40,
age_ns in 0u64..2_000_000_000,
draining in 0u32..20,
deadline_millis in 0u32..20_000,
task_reduction in 0u32..40,
obligation_reduction in 0u32..40,
age_reduction in 0u64..2_000_000_000,
draining_reduction in 0u32..20,
deadline_reduction_millis in 0u32..20_000,
) {
let reduced_tasks = tasks.saturating_sub(task_reduction);
let reduced_obligations = obligations.saturating_sub(obligation_reduction);
let reduced_age_ns = age_ns.saturating_sub(age_reduction);
let reduced_draining = draining.saturating_sub(draining_reduction);
let deadline_pressure = f64::from(deadline_millis) / 1000.0;
let reduced_deadline_pressure =
f64::from(deadline_millis.saturating_sub(deadline_reduction_millis)) / 1000.0;
let fuller = snapshot_with_components(
tasks,
obligations,
age_ns,
draining,
deadline_pressure,
);
let reduced = snapshot_with_components(
reduced_tasks,
reduced_obligations,
reduced_age_ns,
reduced_draining,
reduced_deadline_pressure,
);
let weights = [
PotentialWeights::default(),
PotentialWeights::uniform(1.0),
PotentialWeights::obligation_focused(),
PotentialWeights::deadline_focused(),
];
for weight_set in weights {
let governor = LyapunovGovernor::new(weight_set);
let fuller_record = governor.compute_record(&fuller);
let reduced_record = governor.compute_record(&reduced);
prop_assert!(
reduced_record.total <= fuller_record.total + f64::EPSILON,
"component-wise reduction increased total potential: full={fuller_record:?}, reduced={reduced_record:?}, weights={weight_set:?}"
);
prop_assert!(
reduced_record.task_component <= fuller_record.task_component + f64::EPSILON,
"task component increased under task reduction"
);
prop_assert!(
reduced_record.obligation_component <= fuller_record.obligation_component + f64::EPSILON,
"obligation component increased under age reduction"
);
prop_assert!(
reduced_record.region_component <= fuller_record.region_component + f64::EPSILON,
"region component increased under draining reduction"
);
prop_assert!(
reduced_record.deadline_component <= fuller_record.deadline_component + f64::EPSILON,
"deadline component increased under deadline-pressure reduction"
);
}
}
}
#[test]
fn convergence_monotone_drain() {
init_test("convergence_monotone_drain");
let mut governor = LyapunovGovernor::with_defaults();
let trajectory = vec![
active_snapshot(10, 5, 500_000_000, 3),
active_snapshot(8, 4, 400_000_000, 3),
active_snapshot(6, 3, 250_000_000, 2),
active_snapshot(4, 2, 100_000_000, 1),
active_snapshot(2, 1, 30_000_000, 1),
active_snapshot(1, 0, 0, 0),
quiescent_snapshot(),
];
for snap in &trajectory {
governor.compute_potential(snap);
}
let verdict = governor.analyze_convergence();
let mono = verdict.monotone;
crate::assert_with_log!(mono, "monotone", true, mono);
let converged = verdict.converged();
crate::assert_with_log!(converged, "converged", true, converged);
let v_final = verdict.v_final;
crate::assert_with_log!(v_final.abs() < f64::EPSILON, "v_final", 0.0, v_final);
crate::test_complete!("convergence_monotone_drain");
}
#[test]
fn convergence_non_monotone_detected() {
init_test("convergence_non_monotone_detected");
let mut governor = LyapunovGovernor::with_defaults();
let trajectory = vec![
active_snapshot(5, 2, 100_000_000, 1),
active_snapshot(3, 1, 50_000_000, 1),
active_snapshot(6, 3, 200_000_000, 2), active_snapshot(4, 2, 100_000_000, 1),
active_snapshot(1, 0, 0, 0),
quiescent_snapshot(),
];
for snap in &trajectory {
governor.compute_potential(snap);
}
let verdict = governor.analyze_convergence();
let not_mono = !verdict.monotone;
crate::assert_with_log!(not_mono, "not monotone", true, not_mono);
let violations = verdict.increase_count;
crate::assert_with_log!(violations >= 1, "has violations", true, violations >= 1);
let quiescent = verdict.reached_quiescence;
crate::assert_with_log!(quiescent, "reached quiescence", true, quiescent);
crate::test_complete!("convergence_non_monotone_detected");
}
#[test]
fn convergence_stuck_not_quiescent() {
init_test("convergence_stuck_not_quiescent");
let mut governor = LyapunovGovernor::with_defaults();
let trajectory = vec![
active_snapshot(5, 3, 300_000_000, 2),
active_snapshot(3, 2, 200_000_000, 1),
active_snapshot(2, 2, 200_000_000, 1),
active_snapshot(2, 2, 200_000_000, 1), ];
for snap in &trajectory {
governor.compute_potential(snap);
}
let verdict = governor.analyze_convergence();
let not_converged = !verdict.converged();
crate::assert_with_log!(not_converged, "not converged", true, not_converged);
let not_quiescent = !verdict.reached_quiescence;
crate::assert_with_log!(not_quiescent, "not quiescent", true, not_quiescent);
crate::test_complete!("convergence_stuck_not_quiescent");
}
#[test]
fn suggest_no_preference_when_quiescent() {
init_test("suggest_no_preference_when_quiescent");
let governor = LyapunovGovernor::with_defaults();
let suggestion = governor.suggest(&quiescent_snapshot());
let is_no_pref = suggestion == SchedulingSuggestion::NoPreference;
crate::assert_with_log!(is_no_pref, "no preference when quiescent", true, is_no_pref);
crate::test_complete!("suggest_no_preference_when_quiescent");
}
#[test]
fn suggest_drain_obligations_when_dominant() {
init_test("suggest_drain_obligations_when_dominant");
let governor = LyapunovGovernor::new(PotentialWeights::obligation_focused());
let snap = StateSnapshot {
time: Time::from_nanos(1_000_000_000),
live_tasks: 1,
pending_obligations: 10,
obligation_age_sum_ns: 5_000_000_000, draining_regions: 0,
deadline_pressure: 0.0,
pending_send_permits: 10,
pending_acks: 0,
pending_leases: 0,
pending_io_ops: 0,
cancel_requested_tasks: 0,
cancelling_tasks: 0,
finalizing_tasks: 0,
ready_queue_depth: 0,
};
let suggestion = governor.suggest(&snap);
let is_obligations = suggestion == SchedulingSuggestion::DrainObligations;
crate::assert_with_log!(
is_obligations,
"suggests draining obligations",
true,
is_obligations
);
crate::test_complete!("suggest_drain_obligations_when_dominant");
}
#[test]
fn suggest_drain_regions_when_dominant() {
init_test("suggest_drain_regions_when_dominant");
let governor = LyapunovGovernor::with_defaults();
let snap = StateSnapshot {
time: Time::ZERO,
live_tasks: 1,
pending_obligations: 0,
obligation_age_sum_ns: 0,
draining_regions: 10, deadline_pressure: 0.0,
pending_send_permits: 0,
pending_acks: 0,
pending_leases: 0,
pending_io_ops: 0,
cancel_requested_tasks: 0,
cancelling_tasks: 0,
finalizing_tasks: 0,
ready_queue_depth: 0,
};
let suggestion = governor.suggest(&snap);
let is_regions = suggestion == SchedulingSuggestion::DrainRegions;
crate::assert_with_log!(is_regions, "suggests draining regions", true, is_regions);
crate::test_complete!("suggest_drain_regions_when_dominant");
}
#[test]
fn suggest_meet_deadlines_when_dominant() {
init_test("suggest_meet_deadlines_when_dominant");
let governor = LyapunovGovernor::new(PotentialWeights::deadline_focused());
let snap = StateSnapshot {
time: Time::ZERO,
live_tasks: 1,
pending_obligations: 0,
obligation_age_sum_ns: 0,
draining_regions: 0,
deadline_pressure: 10.0, pending_send_permits: 0,
pending_acks: 0,
pending_leases: 0,
pending_io_ops: 0,
cancel_requested_tasks: 0,
cancelling_tasks: 0,
finalizing_tasks: 0,
ready_queue_depth: 0,
};
let suggestion = governor.suggest(&snap);
let is_deadlines = suggestion == SchedulingSuggestion::MeetDeadlines;
crate::assert_with_log!(
is_deadlines,
"suggests meeting deadlines",
true,
is_deadlines
);
crate::test_complete!("suggest_meet_deadlines_when_dominant");
}
#[test]
fn weights_uniform() {
init_test("weights_uniform");
let w = PotentialWeights::uniform(1.0);
let valid = w.is_valid();
crate::assert_with_log!(valid, "uniform valid", true, valid);
let eps = f64::EPSILON;
let all_eq = (w.w_tasks - w.w_obligation_age).abs() < eps
&& (w.w_obligation_age - w.w_draining_regions).abs() < eps
&& (w.w_draining_regions - w.w_deadline_pressure).abs() < eps;
crate::assert_with_log!(all_eq, "all equal", true, all_eq);
crate::test_complete!("weights_uniform");
}
#[test]
fn weights_obligation_focused() {
init_test("weights_obligation_focused");
let w = PotentialWeights::obligation_focused();
let valid = w.is_valid();
crate::assert_with_log!(valid, "obligation focused valid", true, valid);
let ob_dominant = w.w_obligation_age > w.w_tasks;
crate::assert_with_log!(
ob_dominant,
"obligations weighted higher",
true,
ob_dominant
);
crate::test_complete!("weights_obligation_focused");
}
#[test]
fn weights_deadline_focused() {
init_test("weights_deadline_focused");
let w = PotentialWeights::deadline_focused();
let valid = w.is_valid();
crate::assert_with_log!(valid, "deadline focused valid", true, valid);
let dl_dominant = w.w_deadline_pressure > w.w_tasks;
crate::assert_with_log!(dl_dominant, "deadlines weighted higher", true, dl_dominant);
crate::test_complete!("weights_deadline_focused");
}
#[test]
fn component_isolation_tasks_only() {
init_test("component_isolation_tasks_only");
let governor = LyapunovGovernor::new(PotentialWeights {
w_tasks: 1.0,
w_obligation_age: 0.0,
w_draining_regions: 0.0,
w_deadline_pressure: 0.0,
});
let snap = active_snapshot(5, 3, 1_000_000_000, 2);
let record = governor.compute_record(&snap);
let only_tasks = record.obligation_component.abs() < f64::EPSILON
&& record.region_component.abs() < f64::EPSILON
&& record.deadline_component.abs() < f64::EPSILON;
crate::assert_with_log!(only_tasks, "only task component", true, only_tasks);
let expected = 5.0;
let close = (record.total - expected).abs() < f64::EPSILON;
crate::assert_with_log!(close, "total = 5.0", true, close);
crate::test_complete!("component_isolation_tasks_only");
}
#[test]
fn governor_reuse_and_clear() {
init_test("governor_reuse_and_clear");
let mut governor = LyapunovGovernor::with_defaults();
governor.compute_potential(&active_snapshot(5, 3, 100_000_000, 1));
governor.compute_potential(&quiescent_snapshot());
let len = governor.history().len();
crate::assert_with_log!(len == 2, "history has 2 entries", 2, len);
governor.clear_history();
let len = governor.history().len();
crate::assert_with_log!(len == 0, "cleared", 0, len);
crate::test_complete!("governor_reuse_and_clear");
}
#[test]
#[allow(clippy::too_many_lines)]
fn experiment_cancel_drain_converges() {
init_test("experiment_cancel_drain_converges");
let mut governor = LyapunovGovernor::new(PotentialWeights::obligation_focused());
governor.compute_potential(&StateSnapshot {
time: Time::ZERO,
live_tasks: 5,
pending_obligations: 5,
obligation_age_sum_ns: 500_000_000, draining_regions: 1,
deadline_pressure: 0.0,
pending_send_permits: 5,
pending_acks: 0,
pending_leases: 0,
pending_io_ops: 0,
cancel_requested_tasks: 5,
cancelling_tasks: 0,
finalizing_tasks: 0,
ready_queue_depth: 0,
});
governor.compute_potential(&StateSnapshot {
time: Time::from_nanos(100_000_000),
live_tasks: 4,
pending_obligations: 4,
obligation_age_sum_ns: 480_000_000,
draining_regions: 1,
deadline_pressure: 0.0,
pending_send_permits: 4,
pending_acks: 0,
pending_leases: 0,
pending_io_ops: 0,
cancel_requested_tasks: 4,
cancelling_tasks: 0,
finalizing_tasks: 0,
ready_queue_depth: 0,
});
governor.compute_potential(&StateSnapshot {
time: Time::from_nanos(200_000_000),
live_tasks: 3,
pending_obligations: 3,
obligation_age_sum_ns: 360_000_000,
draining_regions: 1,
deadline_pressure: 0.0,
pending_send_permits: 3,
pending_acks: 0,
pending_leases: 0,
pending_io_ops: 0,
cancel_requested_tasks: 3,
cancelling_tasks: 0,
finalizing_tasks: 0,
ready_queue_depth: 0,
});
governor.compute_potential(&StateSnapshot {
time: Time::from_nanos(300_000_000),
live_tasks: 2,
pending_obligations: 2,
obligation_age_sum_ns: 220_000_000,
draining_regions: 1,
deadline_pressure: 0.0,
pending_send_permits: 2,
pending_acks: 0,
pending_leases: 0,
pending_io_ops: 0,
cancel_requested_tasks: 2,
cancelling_tasks: 0,
finalizing_tasks: 0,
ready_queue_depth: 0,
});
governor.compute_potential(&StateSnapshot {
time: Time::from_nanos(400_000_000),
live_tasks: 1,
pending_obligations: 1,
obligation_age_sum_ns: 80_000_000,
draining_regions: 1,
deadline_pressure: 0.0,
pending_send_permits: 1,
pending_acks: 0,
pending_leases: 0,
pending_io_ops: 0,
cancel_requested_tasks: 1,
cancelling_tasks: 0,
finalizing_tasks: 0,
ready_queue_depth: 0,
});
governor.compute_potential(&StateSnapshot {
time: Time::from_nanos(500_000_000),
live_tasks: 0,
pending_obligations: 0,
obligation_age_sum_ns: 0,
draining_regions: 0,
deadline_pressure: 0.0,
pending_send_permits: 0,
pending_acks: 0,
pending_leases: 0,
pending_io_ops: 0,
cancel_requested_tasks: 0,
cancelling_tasks: 0,
finalizing_tasks: 0,
ready_queue_depth: 0,
});
let verdict = governor.analyze_convergence();
let converged = verdict.converged();
crate::assert_with_log!(converged, "cancel drain converges", true, converged);
let mono = verdict.monotone;
crate::assert_with_log!(mono, "monotone decrease", true, mono);
let v_max = verdict.v_max;
let has_max = v_max > 0.0;
crate::assert_with_log!(has_max, "had nonzero peak", true, has_max);
for (i, record) in governor.history().iter().enumerate() {
tracing::info!("Step {i}: {record}");
}
crate::test_complete!("experiment_cancel_drain_converges");
}
#[test]
fn experiment_deadline_aware_drain() {
init_test("experiment_deadline_aware_drain");
let governor = LyapunovGovernor::new(PotentialWeights::deadline_focused());
let snap = StateSnapshot {
time: Time::from_nanos(900_000_000), live_tasks: 3,
pending_obligations: 2,
obligation_age_sum_ns: 200_000_000,
draining_regions: 1,
deadline_pressure: 8.5, pending_send_permits: 2,
pending_acks: 0,
pending_leases: 0,
pending_io_ops: 0,
cancel_requested_tasks: 0,
cancelling_tasks: 0,
finalizing_tasks: 0,
ready_queue_depth: 0,
};
let suggestion = governor.suggest(&snap);
let is_deadlines = suggestion == SchedulingSuggestion::MeetDeadlines;
crate::assert_with_log!(
is_deadlines,
"deadline-focused governor meets deadlines",
true,
is_deadlines
);
let record = governor.compute_record(&snap);
let dl_dominant = record.deadline_component > record.obligation_component
&& record.deadline_component > record.region_component;
crate::assert_with_log!(
dl_dominant,
"deadline component dominates",
true,
dl_dominant
);
crate::test_complete!("experiment_deadline_aware_drain");
}
#[test]
fn display_impls() {
init_test("lyapunov_display_impls");
let snap = active_snapshot(3, 2, 100_000_000, 1);
let s = format!("{snap}");
let has_sigma = s.contains("Σ(");
crate::assert_with_log!(has_sigma, "snapshot display", true, has_sigma);
let governor = LyapunovGovernor::with_defaults();
let record = governor.compute_record(&snap);
let s = format!("{record}");
let has_v = s.contains("V=");
crate::assert_with_log!(has_v, "record display", true, has_v);
let suggestion = SchedulingSuggestion::DrainObligations;
let s = format!("{suggestion}");
let has_priority = s.contains("prioritize");
crate::assert_with_log!(has_priority, "suggestion display", true, has_priority);
let verdict = ConvergenceVerdict {
monotone: true,
reached_quiescence: true,
v_max: 10.0,
v_final: 0.0,
increase_count: 0,
max_increase: 0.0,
steps: 5,
};
let s = format!("{verdict}");
let has_converged = s.contains("Converged");
crate::assert_with_log!(has_converged, "verdict display", true, has_converged);
crate::test_complete!("lyapunov_display_impls");
}
async fn yield_once() {
use std::future::Future;
use std::pin::Pin;
use std::task::{Context, Poll};
struct YieldOnce {
yielded: bool,
}
impl Future for YieldOnce {
type Output = ();
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
if self.yielded {
Poll::Ready(())
} else {
self.yielded = true;
cx.waker().wake_by_ref();
Poll::Pending
}
}
}
YieldOnce { yielded: false }.await;
}
fn run_cancel_drain_potential_trajectory(
seed: u64,
task_count: usize,
warmup_steps: usize,
) -> (LyapunovGovernor, bool) {
run_cancel_drain_with_weights(seed, task_count, warmup_steps, PotentialWeights::default())
}
fn run_cancel_drain_with_weights(
seed: u64,
task_count: usize,
warmup_steps: usize,
weights: PotentialWeights,
) -> (LyapunovGovernor, bool) {
use crate::lab::{LabConfig, LabRuntime};
use crate::types::CancelReason;
let mut runtime = LabRuntime::new(LabConfig::new(seed));
let region = runtime.state.create_root_region(Budget::unlimited());
for _ in 0..task_count {
let (task_id, _handle) = runtime
.state
.create_task(region, Budget::unlimited(), async {
for _ in 0..20 {
let Some(cx) = crate::cx::Cx::current() else {
return;
};
if cx.checkpoint().is_err() {
return;
}
yield_once().await;
}
})
.expect("create task");
runtime.scheduler.lock().schedule(task_id, 0);
}
for _ in 0..warmup_steps {
runtime.step_for_test();
}
let cancel_reason = CancelReason::shutdown();
let tasks_to_cancel = runtime.state.cancel_request(region, &cancel_reason, None);
{
let mut scheduler = runtime.scheduler.lock();
for (task_id, priority) in tasks_to_cancel {
scheduler.schedule_cancel(task_id, priority);
}
}
let mut governor = LyapunovGovernor::new(weights);
governor.compute_potential(&StateSnapshot::from_runtime_state(&runtime.state));
let max_drain_steps = 10_000_u64;
let mut drain_steps = 0_u64;
while !runtime.is_quiescent() && drain_steps < max_drain_steps {
runtime.step_for_test();
drain_steps = drain_steps.saturating_add(1);
governor.compute_potential(&StateSnapshot::from_runtime_state(&runtime.state));
}
(governor, runtime.is_quiescent())
}
#[test]
fn lab_cancel_drain_monotone_potential_decrease() {
init_test("lab_cancel_drain_monotone_potential_decrease");
let (governor, is_quiescent) = run_cancel_drain_potential_trajectory(0xBD25_0201, 8, 16);
crate::assert_with_log!(is_quiescent, "quiescent", true, is_quiescent);
let verdict = governor.analyze_convergence();
for (i, record) in governor.history().iter().enumerate() {
tracing::info!("Step {i}: {record}");
}
tracing::info!("{verdict}");
crate::assert_with_log!(verdict.monotone, "monotone", true, verdict.monotone);
crate::assert_with_log!(
verdict.reached_quiescence,
"V=0",
true,
verdict.reached_quiescence
);
crate::assert_with_log!(verdict.converged(), "converged", true, verdict.converged());
let had_activity = verdict.v_max > 0.0;
crate::assert_with_log!(had_activity, "peak V > 0", true, had_activity);
crate::test_complete!("lab_cancel_drain_monotone_potential_decrease");
}
#[test]
fn lab_cancel_drain_deterministic_potential_trajectory() {
init_test("lab_cancel_drain_deterministic_potential_trajectory");
let seed = 0xBD25_DEAD;
let (gov1, q1) = run_cancel_drain_potential_trajectory(seed, 8, 16);
let (gov2, q2) = run_cancel_drain_potential_trajectory(seed, 8, 16);
crate::assert_with_log!(q1 && q2, "both quiescent", true, q1 && q2);
let h1: Vec<f64> = gov1.history().iter().map(|r| r.total).collect();
let h2: Vec<f64> = gov2.history().iter().map(|r| r.total).collect();
crate::assert_with_log!(h1.len() == h2.len(), "same length", h1.len(), h2.len());
let all_match = h1
.iter()
.zip(h2.iter())
.all(|(a, b)| (a - b).abs() < f64::EPSILON);
crate::assert_with_log!(all_match, "trajectories match", true, all_match);
crate::test_complete!("lab_cancel_drain_deterministic_potential_trajectory");
}
#[test]
fn lab_quiescence_invariants_after_cancel_drain() {
init_test("lab_quiescence_invariants_after_cancel_drain");
let (governor, is_quiescent) = run_cancel_drain_potential_trajectory(0xBD25_CAFE, 12, 8);
crate::assert_with_log!(is_quiescent, "quiescent", true, is_quiescent);
let final_record = governor.history().last().expect("non-empty history");
let snap = &final_record.snapshot;
crate::assert_with_log!(snap.live_tasks == 0, "no live tasks", 0, snap.live_tasks);
crate::assert_with_log!(
snap.pending_obligations == 0,
"no obligations",
0,
snap.pending_obligations
);
crate::assert_with_log!(
snap.draining_regions == 0,
"no draining regions",
0,
snap.draining_regions
);
crate::assert_with_log!(
snap.is_quiescent(),
"snapshot quiescent",
true,
snap.is_quiescent()
);
crate::assert_with_log!(
snap.pending_send_permits == 0,
"no sp",
0,
snap.pending_send_permits
);
crate::assert_with_log!(snap.pending_acks == 0, "no ack", 0, snap.pending_acks);
crate::assert_with_log!(snap.pending_leases == 0, "no lease", 0, snap.pending_leases);
crate::assert_with_log!(snap.pending_io_ops == 0, "no io", 0, snap.pending_io_ops);
crate::assert_with_log!(
snap.cancel_requested_tasks == 0,
"no cancel_requested",
0,
snap.cancel_requested_tasks
);
crate::assert_with_log!(
snap.cancelling_tasks == 0,
"no cancelling",
0,
snap.cancelling_tasks
);
crate::assert_with_log!(
snap.finalizing_tasks == 0,
"no finalizing",
0,
snap.finalizing_tasks
);
let v_zero = final_record.total.abs() < f64::EPSILON;
crate::assert_with_log!(v_zero, "V = 0", true, v_zero);
crate::test_complete!("lab_quiescence_invariants_after_cancel_drain");
}
#[test]
fn lab_cancel_drain_with_many_tasks_converges() {
init_test("lab_cancel_drain_with_many_tasks_converges");
let (governor, is_quiescent) = run_cancel_drain_potential_trajectory(0xBD25_A1B0, 12, 24);
crate::assert_with_log!(is_quiescent, "quiescent", true, is_quiescent);
let verdict = governor.analyze_convergence();
for (i, record) in governor.history().iter().enumerate() {
tracing::info!("Step {i}: {record}");
}
tracing::info!("{verdict}");
crate::assert_with_log!(verdict.monotone, "monotone", true, verdict.monotone);
crate::assert_with_log!(verdict.converged(), "converged", true, verdict.converged());
crate::test_complete!("lab_cancel_drain_with_many_tasks_converges");
}
#[test]
fn lab_potential_decreases_across_weight_configurations() {
init_test("lab_potential_decreases_across_weight_configurations");
let weight_configs = [
("default", PotentialWeights::default()),
("uniform", PotentialWeights::uniform(1.0)),
("obligation_focused", PotentialWeights::obligation_focused()),
("deadline_focused", PotentialWeights::deadline_focused()),
];
for (label, weights) in &weight_configs {
let (governor, is_quiescent) =
run_cancel_drain_with_weights(0xBD25_0815, 6, 8, *weights);
crate::assert_with_log!(
is_quiescent,
format!("{label}: quiescent"),
true,
is_quiescent
);
let verdict = governor.analyze_convergence();
tracing::info!("Weights={label}: {verdict}");
crate::assert_with_log!(
verdict.monotone,
format!("{label}: monotone"),
true,
verdict.monotone
);
crate::assert_with_log!(
verdict.converged(),
format!("{label}: converged"),
true,
verdict.converged()
);
}
crate::test_complete!("lab_potential_decreases_across_weight_configurations");
}
fn run_cancel_drain_with_obligations(
seed: u64,
task_count: usize,
obligations_per_task: usize,
warmup_steps: usize,
weights: PotentialWeights,
) -> (LyapunovGovernor, bool, usize) {
use crate::lab::{LabConfig, LabRuntime};
use crate::record::ObligationKind;
use crate::types::CancelReason;
let mut runtime = LabRuntime::new(LabConfig::new(seed).panic_on_leak(false));
let region = runtime.state.create_root_region(Budget::unlimited());
let obligation_kinds = [
ObligationKind::SendPermit,
ObligationKind::Ack,
ObligationKind::Lease,
ObligationKind::IoOp,
];
let mut obligation_ids = Vec::new();
for t_idx in 0..task_count {
let (task_id, _handle) = runtime
.state
.create_task(region, Budget::unlimited(), async {
for _ in 0..1_000 {
let Some(cx) = crate::cx::Cx::current() else {
return;
};
if cx.checkpoint().is_err() {
return;
}
yield_once().await;
}
})
.expect("create task");
for o_idx in 0..obligations_per_task {
let kind = obligation_kinds[(t_idx + o_idx) % obligation_kinds.len()];
if let Ok(obl_id) = runtime.state.create_obligation(
kind,
task_id,
region,
Some(format!("test-obl-t{t_idx}-o{o_idx}")),
) {
obligation_ids.push(obl_id);
}
}
runtime.scheduler.lock().schedule(task_id, 0);
}
for _ in 0..warmup_steps {
runtime.step_for_test();
}
runtime.advance_time(1_000_000_000);
let mut governor = LyapunovGovernor::new(weights);
governor.compute_potential(&StateSnapshot::from_runtime_state(&runtime.state));
let cancel_reason = CancelReason::shutdown();
let tasks_to_cancel = runtime.state.cancel_request(region, &cancel_reason, None);
{
let mut scheduler = runtime.scheduler.lock();
for (task_id, priority) in tasks_to_cancel {
scheduler.schedule_cancel(task_id, priority);
}
}
for obl_id in &obligation_ids {
let _ = runtime
.state
.abort_obligation(*obl_id, crate::record::ObligationAbortReason::Cancel);
}
governor.compute_potential(&StateSnapshot::from_runtime_state(&runtime.state));
let mut drain_steps = 0_u64;
while !runtime.is_quiescent() && drain_steps < 10_000 {
runtime.step_for_test();
drain_steps = drain_steps.saturating_add(1);
governor.compute_potential(&StateSnapshot::from_runtime_state(&runtime.state));
}
let violations = runtime.check_invariants();
let leak_count = violations
.iter()
.filter(|v| matches!(v, InvariantViolation::ObligationLeak { .. }))
.count();
(governor, runtime.is_quiescent(), leak_count)
}
#[test]
fn lab_cancel_drain_with_obligations_monotone_decrease() {
init_test("lab_cancel_drain_with_obligations_monotone_decrease");
let (governor, is_quiescent, leak_count) =
run_cancel_drain_with_obligations(0xBD25_0B01, 8, 2, 16, PotentialWeights::default());
crate::assert_with_log!(is_quiescent, "quiescent", true, is_quiescent);
crate::assert_with_log!(leak_count == 0, "no obligation leaks", 0usize, leak_count);
let verdict = governor.analyze_convergence();
for (i, record) in governor.history().iter().enumerate() {
tracing::info!("Step {i}: {record}");
}
tracing::info!("{verdict}");
crate::assert_with_log!(verdict.monotone, "monotone", true, verdict.monotone);
crate::assert_with_log!(
verdict.reached_quiescence,
"V=0",
true,
verdict.reached_quiescence
);
crate::assert_with_log!(verdict.converged(), "converged", true, verdict.converged());
let first = &governor.history()[0];
crate::assert_with_log!(
first.snapshot.pending_obligations > 0,
"initial pending obligations > 0",
true,
first.snapshot.pending_obligations > 0
);
crate::test_complete!("lab_cancel_drain_with_obligations_monotone_decrease");
}
#[test]
fn lab_obligation_leak_oracle_clean_after_drain() {
init_test("lab_obligation_leak_oracle_clean_after_drain");
let (governor, is_quiescent, leak_count) =
run_cancel_drain_with_obligations(0xBD25_1EAC, 10, 3, 8, PotentialWeights::default());
crate::assert_with_log!(is_quiescent, "quiescent", true, is_quiescent);
crate::assert_with_log!(leak_count == 0, "zero obligation leaks", 0usize, leak_count);
let final_record = governor.history().last().expect("non-empty history");
let snap = &final_record.snapshot;
crate::assert_with_log!(
snap.pending_obligations == 0,
"no pending",
0,
snap.pending_obligations
);
crate::assert_with_log!(
snap.pending_send_permits == 0,
"no sp",
0,
snap.pending_send_permits
);
crate::assert_with_log!(snap.pending_acks == 0, "no acks", 0, snap.pending_acks);
crate::assert_with_log!(
snap.pending_leases == 0,
"no leases",
0,
snap.pending_leases
);
crate::assert_with_log!(
snap.pending_io_ops == 0,
"no io_ops",
0,
snap.pending_io_ops
);
crate::test_complete!("lab_obligation_leak_oracle_clean_after_drain");
}
#[test]
fn lab_cancel_drain_with_obligations_deterministic() {
init_test("lab_cancel_drain_with_obligations_deterministic");
let seed = 0xBD25_DE70;
let w = PotentialWeights::default();
let (gov1, q1, l1) = run_cancel_drain_with_obligations(seed, 6, 2, 12, w);
let (gov2, q2, l2) = run_cancel_drain_with_obligations(seed, 6, 2, 12, w);
crate::assert_with_log!(q1 && q2, "both quiescent", true, q1 && q2);
crate::assert_with_log!(l1 == 0 && l2 == 0, "no leaks", true, l1 == 0 && l2 == 0);
let h1: Vec<f64> = gov1.history().iter().map(|r| r.total).collect();
let h2: Vec<f64> = gov2.history().iter().map(|r| r.total).collect();
crate::assert_with_log!(h1.len() == h2.len(), "same length", h1.len(), h2.len());
let all_match = h1
.iter()
.zip(h2.iter())
.all(|(a, b)| (a - b).abs() < f64::EPSILON);
crate::assert_with_log!(all_match, "trajectories match", true, all_match);
crate::test_complete!("lab_cancel_drain_with_obligations_deterministic");
}
#[test]
fn lab_obligation_focused_weights_converge_with_obligations() {
init_test("lab_obligation_focused_weights_converge_with_obligations");
let weights = PotentialWeights::obligation_focused();
let (governor, is_quiescent, leak_count) =
run_cancel_drain_with_obligations(0xBD25_0B1F, 8, 3, 8, weights);
crate::assert_with_log!(is_quiescent, "quiescent", true, is_quiescent);
crate::assert_with_log!(leak_count == 0, "no leaks", 0usize, leak_count);
let verdict = governor.analyze_convergence();
tracing::info!("{verdict}");
crate::assert_with_log!(verdict.monotone, "monotone", true, verdict.monotone);
crate::assert_with_log!(verdict.converged(), "converged", true, verdict.converged());
let first = &governor.history()[0];
let obl_fraction = if first.total > 0.0 {
first.obligation_component / first.total
} else {
0.0
};
tracing::info!(
"Obligation fraction of initial V: {:.2}% ({:.4} / {:.4})",
obl_fraction * 100.0,
first.obligation_component,
first.total,
);
crate::test_complete!("lab_obligation_focused_weights_converge_with_obligations");
}
#[test]
fn lab_quiescence_snapshot_zero_with_obligations() {
init_test("lab_quiescence_snapshot_zero_with_obligations");
let (governor, is_quiescent, leak_count) =
run_cancel_drain_with_obligations(0xBD25_0520, 12, 2, 10, PotentialWeights::default());
crate::assert_with_log!(is_quiescent, "quiescent", true, is_quiescent);
crate::assert_with_log!(leak_count == 0, "no leaks", 0usize, leak_count);
let final_record = governor.history().last().expect("non-empty history");
let snap = &final_record.snapshot;
crate::assert_with_log!(snap.live_tasks == 0, "no live tasks", 0, snap.live_tasks);
crate::assert_with_log!(
snap.pending_obligations == 0,
"no obl",
0,
snap.pending_obligations
);
crate::assert_with_log!(
snap.draining_regions == 0,
"no draining",
0,
snap.draining_regions
);
crate::assert_with_log!(
snap.obligation_age_sum_ns == 0,
"age zero",
0u64,
snap.obligation_age_sum_ns
);
crate::assert_with_log!(
snap.cancel_requested_tasks == 0,
"no cr",
0,
snap.cancel_requested_tasks
);
crate::assert_with_log!(
snap.cancelling_tasks == 0,
"no cancelling",
0,
snap.cancelling_tasks
);
crate::assert_with_log!(
snap.finalizing_tasks == 0,
"no finalizing",
0,
snap.finalizing_tasks
);
crate::assert_with_log!(
snap.is_quiescent(),
"quiescent snap",
true,
snap.is_quiescent()
);
let v_zero = final_record.total.abs() < f64::EPSILON;
crate::assert_with_log!(v_zero, "V = 0", true, v_zero);
crate::test_complete!("lab_quiescence_snapshot_zero_with_obligations");
}
const GOVERNOR_STATE_SNAPSHOT_CONTRACT_PATH_ENV: &str =
"ASUPERSYNC_GOVERNOR_STATE_SNAPSHOT_CONTRACT_PATH";
const GOVERNOR_STATE_SNAPSHOT_SCENARIO_ENV: &str =
"ASUPERSYNC_GOVERNOR_STATE_SNAPSHOT_SCENARIO";
const GOVERNOR_STATE_SNAPSHOT_REPORT_PATH_ENV: &str =
"ASUPERSYNC_GOVERNOR_STATE_SNAPSHOT_REPORT_PATH";
const GOVERNOR_STATE_SNAPSHOT_REPORT_SCHEMA_VERSION: &str = "governor-state-snapshot-report-v1";
const GOVERNOR_STATE_SNAPSHOT_PROJECTION_SCHEMA_VERSION: &str =
"governor-state-snapshot-projection-v1";
const GOVERNOR_STATE_SNAPSHOT_BASELINE_SCENARIO_ID: &str =
"AA-GOVERNOR-SNAPSHOT-EQUIVALENCE-BASELINE";
const GOVERNOR_STATE_SNAPSHOT_MANUAL_FALLBACK_SCENARIO_ID: &str =
"AA-GOVERNOR-SNAPSHOT-EQUIVALENCE-MANUAL-FALLBACK";
#[derive(Debug, Clone, Deserialize)]
struct GovernorStateSnapshotSmokeContract {
smoke_scenarios: Vec<GovernorStateSnapshotScenario>,
}
#[derive(Debug, Clone, Deserialize)]
struct GovernorStateSnapshotScenario {
scenario_id: String,
description: String,
fixture: GovernorStateSnapshotFixture,
expected_report_projection: Value,
}
#[derive(Debug, Clone, Deserialize)]
struct GovernorStateSnapshotFixture {
region_count: usize,
tasks_per_region: usize,
read_biased_enabled: bool,
manual_invalidation_step: Option<usize>,
}
#[derive(Debug)]
struct GovernorStateScenarioState {
state: RuntimeState,
child_regions: Vec<crate::types::RegionId>,
task_ids: Vec<crate::types::TaskId>,
obligation_ids: Vec<crate::types::ObligationId>,
}
fn default_governor_state_snapshot_scenarios() -> Vec<GovernorStateSnapshotScenario> {
let changed_component_union = json!([
"cancel_requested_tasks",
"deadline_pressure",
"live_tasks",
"obligation_age_sum_ns",
"pending_obligations",
"pending_send_permits",
"time"
]);
vec![
GovernorStateSnapshotScenario {
scenario_id: GOVERNOR_STATE_SNAPSHOT_BASELINE_SCENARIO_ID.to_string(),
description: "Drive a deterministic cancel storm with the conservative region-scan path pinned, then prove the current O(1) summary counters remain equivalent to an authoritative full scan.".to_string(),
fixture: GovernorStateSnapshotFixture {
region_count: 4,
tasks_per_region: 2,
read_biased_enabled: false,
manual_invalidation_step: None,
},
expected_report_projection: json!({
"schema_version": GOVERNOR_STATE_SNAPSHOT_PROJECTION_SCHEMA_VERSION,
"scenario_id": GOVERNOR_STATE_SNAPSHOT_BASELINE_SCENARIO_ID,
"read_biased_enabled": false,
"step_count": 5,
"full_region_scan_steps": 5,
"cached_region_count_steps": 0,
"manual_invalidation_steps": 0,
"all_steps_equivalent": true,
"repeated_run_hash_match": true,
"changed_component_union": changed_component_union,
"fallback_reason_counts": {
"disabled_exact_baseline": 5
}
}),
},
GovernorStateSnapshotScenario {
scenario_id: GOVERNOR_STATE_SNAPSHOT_MANUAL_FALLBACK_SCENARIO_ID.to_string(),
description: "Drive the same cancel storm with the read-biased region counter enabled, then force exactly one manual invalidation to prove cached snapshots and authoritative fallback remain equivalent.".to_string(),
fixture: GovernorStateSnapshotFixture {
region_count: 4,
tasks_per_region: 2,
read_biased_enabled: true,
manual_invalidation_step: Some(4),
},
expected_report_projection: json!({
"schema_version": GOVERNOR_STATE_SNAPSHOT_PROJECTION_SCHEMA_VERSION,
"scenario_id": GOVERNOR_STATE_SNAPSHOT_MANUAL_FALLBACK_SCENARIO_ID,
"read_biased_enabled": true,
"step_count": 5,
"full_region_scan_steps": 1,
"cached_region_count_steps": 4,
"manual_invalidation_steps": 1,
"all_steps_equivalent": true,
"repeated_run_hash_match": true,
"changed_component_union": changed_component_union,
"fallback_reason_counts": {
"cached_region_count": 4,
"manual_invalidation": 1
}
}),
},
]
}
fn load_governor_state_snapshot_scenarios() -> Vec<GovernorStateSnapshotScenario> {
let Some(contract_path) = std::env::var(GOVERNOR_STATE_SNAPSHOT_CONTRACT_PATH_ENV).ok()
else {
return default_governor_state_snapshot_scenarios();
};
let contract: GovernorStateSnapshotSmokeContract = serde_json::from_str(
&fs::read_to_string(&contract_path)
.expect("read governor state snapshot smoke contract"),
)
.expect("parse governor state snapshot smoke contract");
contract.smoke_scenarios
}
fn selected_governor_state_snapshot_scenario() -> String {
std::env::var(GOVERNOR_STATE_SNAPSHOT_SCENARIO_ENV)
.unwrap_or_else(|_| GOVERNOR_STATE_SNAPSHOT_BASELINE_SCENARIO_ID.to_string())
}
fn maybe_write_governor_state_snapshot_report(path: &str, report: &Value) {
let report_path = Path::new(path);
if let Some(parent) = report_path.parent() {
fs::create_dir_all(parent).expect("create governor snapshot report directory");
}
fs::write(
report_path,
serde_json::to_string_pretty(report).expect("serialize governor snapshot report"),
)
.expect("write governor snapshot report");
}
fn round4(value: f64) -> f64 {
(value * 10_000.0).round() / 10_000.0
}
fn percentile_slice_u64(samples: &[u64], numerator: usize, denominator: usize) -> u64 {
if samples.is_empty() {
return 0;
}
let mut sorted = samples.to_vec();
sorted.sort_unstable();
let index = ((sorted.len() - 1) * numerator) / denominator;
sorted[index]
}
fn mean_u64(samples: &[u64]) -> f64 {
if samples.is_empty() {
return 0.0;
}
round4(samples.iter().map(|sample| *sample as f64).sum::<f64>() / samples.len() as f64)
}
fn latency_summary(samples: &[u64]) -> Value {
json!({
"sample_count": samples.len(),
"min_ns": samples.iter().copied().min().unwrap_or(0),
"p50_ns": percentile_slice_u64(samples, 50, 100),
"p95_ns": percentile_slice_u64(samples, 95, 100),
"p99_ns": percentile_slice_u64(samples, 99, 100),
"max_ns": samples.iter().copied().max().unwrap_or(0),
"mean_ns": mean_u64(samples),
})
}
fn hash_json_value(value: &Value) -> u64 {
let mut hasher = DefaultHasher::new();
serde_json::to_string(value)
.expect("serialize stable governor snapshot hash input")
.hash(&mut hasher);
hasher.finish()
}
fn changed_components(
previous: Option<&StateSnapshot>,
current: &StateSnapshot,
) -> Vec<String> {
let Some(previous) = previous else {
return vec!["initial_capture".to_string()];
};
let mut changed = Vec::new();
if previous.time != current.time {
changed.push("time".to_string());
}
if previous.live_tasks != current.live_tasks {
changed.push("live_tasks".to_string());
}
if previous.pending_obligations != current.pending_obligations {
changed.push("pending_obligations".to_string());
}
if previous.obligation_age_sum_ns != current.obligation_age_sum_ns {
changed.push("obligation_age_sum_ns".to_string());
}
if previous.draining_regions != current.draining_regions {
changed.push("draining_regions".to_string());
}
if (previous.deadline_pressure - current.deadline_pressure).abs() > 1e-9 {
changed.push("deadline_pressure".to_string());
}
if previous.pending_send_permits != current.pending_send_permits {
changed.push("pending_send_permits".to_string());
}
if previous.pending_acks != current.pending_acks {
changed.push("pending_acks".to_string());
}
if previous.pending_leases != current.pending_leases {
changed.push("pending_leases".to_string());
}
if previous.pending_io_ops != current.pending_io_ops {
changed.push("pending_io_ops".to_string());
}
if previous.cancel_requested_tasks != current.cancel_requested_tasks {
changed.push("cancel_requested_tasks".to_string());
}
if previous.cancelling_tasks != current.cancelling_tasks {
changed.push("cancelling_tasks".to_string());
}
if previous.finalizing_tasks != current.finalizing_tasks {
changed.push("finalizing_tasks".to_string());
}
if previous.ready_queue_depth != current.ready_queue_depth {
changed.push("ready_queue_depth".to_string());
}
changed
}
fn authoritative_state_snapshot(state: &RuntimeState) -> StateSnapshot {
const DEADLINE_PRESSURE_D0_NS: f64 = 1_000_000_000.0;
let now = state.now;
let mut live_tasks = 0u32;
let mut cancel_requested_tasks = 0u32;
let mut cancelling_tasks = 0u32;
let mut finalizing_tasks = 0u32;
let mut tasks_with_deadline = 0u64;
let mut deadline_sum_ns = 0u128;
for (_, record) in state.tasks.iter() {
let phase = record.phase();
let is_live = !phase.is_terminal();
if is_live {
live_tasks = live_tasks.saturating_add(1);
}
match phase {
TaskPhase::CancelRequested => {
cancel_requested_tasks = cancel_requested_tasks.saturating_add(1);
}
TaskPhase::Cancelling => {
cancelling_tasks = cancelling_tasks.saturating_add(1);
}
TaskPhase::Finalizing => {
finalizing_tasks = finalizing_tasks.saturating_add(1);
}
_ => {}
}
if is_live {
if let Some(deadline) = record.deadline {
tasks_with_deadline = tasks_with_deadline.saturating_add(1);
deadline_sum_ns =
deadline_sum_ns.saturating_add(u128::from(deadline.as_nanos()));
}
}
}
let deadline_pressure = if tasks_with_deadline > 0 {
let now_ns = u128::from(now.as_nanos());
#[allow(clippy::cast_precision_loss)]
let count = tasks_with_deadline as f64;
let sum_d = deadline_sum_ns as f64;
let now_f = now_ns as f64;
let p = count - (sum_d / DEADLINE_PRESSURE_D0_NS)
+ (count * now_f / DEADLINE_PRESSURE_D0_NS);
p.max(0.0)
} else {
0.0
};
let mut pending_obligations = 0u32;
let mut obligation_age_sum_ns = 0u64;
let mut pending_send_permits = 0u32;
let mut pending_acks = 0u32;
let mut pending_leases = 0u32;
let mut pending_io_ops = 0u32;
for (_, record) in state.obligations.iter() {
if !record.is_pending() {
continue;
}
pending_obligations = pending_obligations.saturating_add(1);
let age_ns = now.as_nanos().saturating_sub(record.reserved_at.as_nanos());
obligation_age_sum_ns = obligation_age_sum_ns.saturating_add(age_ns);
match record.kind {
ObligationKind::SendPermit => {
pending_send_permits = pending_send_permits.saturating_add(1);
}
ObligationKind::Ack => {
pending_acks = pending_acks.saturating_add(1);
}
ObligationKind::Lease | ObligationKind::SemaphorePermit => {
pending_leases = pending_leases.saturating_add(1);
}
ObligationKind::IoOp => {
pending_io_ops = pending_io_ops.saturating_add(1);
}
}
}
let mut draining_regions = 0u32;
for (_, region) in state.regions.iter() {
if matches!(
region.state(),
RegionState::Draining | RegionState::Finalizing
) {
draining_regions = draining_regions.saturating_add(1);
}
}
StateSnapshot {
time: now,
live_tasks,
pending_obligations,
obligation_age_sum_ns,
draining_regions,
deadline_pressure,
pending_send_permits,
pending_acks,
pending_leases,
pending_io_ops,
cancel_requested_tasks,
cancelling_tasks,
finalizing_tasks,
ready_queue_depth: 0,
}
}
fn snapshots_equivalent(expected: &StateSnapshot, actual: &StateSnapshot) -> bool {
expected.time == actual.time
&& expected.live_tasks == actual.live_tasks
&& expected.pending_obligations == actual.pending_obligations
&& expected.obligation_age_sum_ns == actual.obligation_age_sum_ns
&& expected.draining_regions == actual.draining_regions
&& (expected.deadline_pressure - actual.deadline_pressure).abs() < 1e-9
&& expected.pending_send_permits == actual.pending_send_permits
&& expected.pending_acks == actual.pending_acks
&& expected.pending_leases == actual.pending_leases
&& expected.pending_io_ops == actual.pending_io_ops
&& expected.cancel_requested_tasks == actual.cancel_requested_tasks
&& expected.cancelling_tasks == actual.cancelling_tasks
&& expected.finalizing_tasks == actual.finalizing_tasks
&& expected.ready_queue_depth == actual.ready_queue_depth
}
fn summary_snapshot_bytes_copied_estimate() -> usize {
size_of::<StateSnapshot>()
}
fn authoritative_scan_bytes_estimate(state: &RuntimeState) -> usize {
state.tasks.iter().count() * size_of::<TaskRecord>()
+ state.obligations.iter().count() * size_of::<ObligationRecord>()
+ state.regions.iter().count() * size_of::<RegionRecord>()
+ size_of::<StateSnapshot>()
}
fn governor_snapshot_fallback_reason(
state: &RuntimeState,
before: ReadBiasedRegionSnapshotStats,
after: ReadBiasedRegionSnapshotStats,
) -> &'static str {
if !state.read_biased_region_snapshot_enabled() {
return "disabled_exact_baseline";
}
if after.fallback_scans > before.fallback_scans {
if after.invalidations > before.invalidations {
return "manual_invalidation";
}
if after.write_heavy_fallbacks > before.write_heavy_fallbacks {
return "write_heavy_threshold_exceeded";
}
return "authoritative_region_scan";
}
"cached_region_count"
}
fn build_governor_state_snapshot_state(
fixture: &GovernorStateSnapshotFixture,
) -> GovernorStateScenarioState {
let mut state = RuntimeState::new();
let root = state.create_root_region(Budget::unlimited());
state.set_read_biased_region_snapshot(fixture.read_biased_enabled);
let mut child_regions = Vec::with_capacity(fixture.region_count);
for _ in 0..fixture.region_count {
let child = state
.create_child_region(root, Budget::unlimited())
.expect("child region");
let _grandchild = state
.create_child_region(child, Budget::unlimited())
.expect("grandchild region");
child_regions.push(child);
}
let mut task_ids = Vec::with_capacity(fixture.region_count * fixture.tasks_per_region);
let mut obligation_ids =
Vec::with_capacity(fixture.region_count * fixture.tasks_per_region);
for (region_index, region_id) in child_regions.iter().enumerate() {
for task_index in 0..fixture.tasks_per_region {
let deadline_ns = 1_000_000_000
+ (region_index * fixture.tasks_per_region + task_index) as u64 * 50_000_000;
let budget = Budget::INFINITE.with_deadline(Time::from_nanos(deadline_ns));
let (task_id, _handle) = state
.create_task(*region_id, budget, async {})
.expect("create task");
let obligation_id = state
.create_obligation(ObligationKind::SendPermit, task_id, *region_id, None)
.expect("create obligation");
task_ids.push(task_id);
obligation_ids.push(obligation_id);
}
}
state.now = Time::from_nanos(250_000_000);
GovernorStateScenarioState {
state,
child_regions,
task_ids,
obligation_ids,
}
}
fn scenario_step_plan() -> [(u64, usize, usize, usize); 5] {
[
(0, 0, 0, 0),
(50_000_000, 2, 0, 0),
(50_000_000, 0, 3, 0),
(50_000_000, 0, 0, 2),
(50_000_000, 0, 0, 0),
]
}
fn bump_time(state: &mut RuntimeState, delta_ns: u64) {
state.now = Time::from_nanos(state.now.as_nanos().saturating_add(delta_ns));
}
fn execute_governor_state_snapshot_scenario_once(
scenario: &GovernorStateSnapshotScenario,
) -> (Value, u64) {
let mut scenario_state = build_governor_state_snapshot_state(&scenario.fixture);
let mut previous_summary = None;
let mut step_reports = Vec::new();
let mut summary_latencies = Vec::new();
let mut authoritative_latencies = Vec::new();
let mut fallback_reason_counts = BTreeMap::<String, u64>::new();
let mut changed_component_union = BTreeSet::<String>::new();
let step_plan = scenario_step_plan();
for (step_index, (advance_ns, cancel_regions, commit_obligations, complete_tasks)) in
step_plan.into_iter().enumerate()
{
if advance_ns > 0 {
bump_time(&mut scenario_state.state, advance_ns);
}
if cancel_regions > 0 {
for region_id in scenario_state.child_regions.iter().take(cancel_regions) {
let _ = scenario_state.state.cancel_request(
*region_id,
&crate::types::CancelReason::shutdown(),
None,
);
}
}
if commit_obligations > 0 {
for obligation_id in scenario_state
.obligation_ids
.iter()
.take(commit_obligations)
.copied()
{
scenario_state
.state
.commit_obligation(obligation_id)
.expect("commit obligation");
}
}
if complete_tasks > 0 {
for task_id in scenario_state.task_ids.iter().take(complete_tasks).copied() {
let _ = scenario_state
.state
.complete_task(task_id, crate::types::Outcome::Ok(()));
}
}
let before_stats = scenario_state.state.read_biased_region_snapshot_stats();
if scenario.fixture.manual_invalidation_step == Some(step_index) {
scenario_state
.state
.invalidate_read_biased_region_snapshot_for_testing();
}
let summary_started = Instant::now();
let summary_snapshot = StateSnapshot::from_runtime_state(&scenario_state.state);
let summary_latency_ns = summary_started
.elapsed()
.as_nanos()
.min(u128::from(u64::MAX)) as u64;
let after_stats = scenario_state.state.read_biased_region_snapshot_stats();
let authoritative_started = Instant::now(); let authoritative_snapshot = authoritative_state_snapshot(&scenario_state.state);
let authoritative_latency_ns = authoritative_started
.elapsed()
.as_nanos()
.min(u128::from(u64::MAX)) as u64;
let equivalent = snapshots_equivalent(&summary_snapshot, &authoritative_snapshot);
let changed = changed_components(previous_summary.as_ref(), &summary_snapshot);
for component in changed
.iter()
.filter(|component| component.as_str() != "initial_capture")
{
changed_component_union.insert(component.clone());
}
let fallback_reason =
governor_snapshot_fallback_reason(&scenario_state.state, before_stats, after_stats);
*fallback_reason_counts
.entry(fallback_reason.to_string())
.or_insert(0) += 1;
summary_latencies.push(summary_latency_ns);
authoritative_latencies.push(authoritative_latency_ns);
step_reports.push(json!({
"step_index": step_index,
"changed_components": changed,
"fallback_reason": fallback_reason,
"summary_snapshot": {
"time_ns": summary_snapshot.time.as_nanos(),
"live_tasks": summary_snapshot.live_tasks,
"pending_obligations": summary_snapshot.pending_obligations,
"obligation_age_sum_ns": summary_snapshot.obligation_age_sum_ns,
"draining_regions": summary_snapshot.draining_regions,
"deadline_pressure": round4(summary_snapshot.deadline_pressure),
"pending_send_permits": summary_snapshot.pending_send_permits,
"cancel_requested_tasks": summary_snapshot.cancel_requested_tasks,
},
"authoritative_snapshot": {
"time_ns": authoritative_snapshot.time.as_nanos(),
"live_tasks": authoritative_snapshot.live_tasks,
"pending_obligations": authoritative_snapshot.pending_obligations,
"obligation_age_sum_ns": authoritative_snapshot.obligation_age_sum_ns,
"draining_regions": authoritative_snapshot.draining_regions,
"deadline_pressure": round4(authoritative_snapshot.deadline_pressure),
"pending_send_permits": authoritative_snapshot.pending_send_permits,
"cancel_requested_tasks": authoritative_snapshot.cancel_requested_tasks,
},
"equivalent": equivalent,
"summary_latency_ns": summary_latency_ns,
"authoritative_latency_ns": authoritative_latency_ns,
"summary_bytes_copied_estimate": summary_snapshot_bytes_copied_estimate(),
"authoritative_scan_bytes_estimate": authoritative_scan_bytes_estimate(&scenario_state.state),
}));
previous_summary = Some(summary_snapshot);
}
let full_region_scan_steps = fallback_reason_counts
.iter()
.filter(|(reason, _)| reason.as_str() != "cached_region_count")
.map(|(_, count)| *count)
.sum::<u64>();
let cached_region_count_steps = *fallback_reason_counts
.get("cached_region_count")
.unwrap_or(&0);
let manual_invalidation_steps = *fallback_reason_counts
.get("manual_invalidation")
.unwrap_or(&0);
let step_count = step_reports.len() as u64;
let all_steps_equivalent = step_reports
.iter()
.all(|step| step["equivalent"].as_bool() == Some(true));
let changed_component_union_json: Vec<Value> = changed_component_union
.into_iter()
.map(Value::String)
.collect();
let projection_without_repeat = json!({
"schema_version": GOVERNOR_STATE_SNAPSHOT_PROJECTION_SCHEMA_VERSION,
"scenario_id": scenario.scenario_id,
"read_biased_enabled": scenario.fixture.read_biased_enabled,
"step_count": step_count,
"full_region_scan_steps": full_region_scan_steps,
"cached_region_count_steps": cached_region_count_steps,
"manual_invalidation_steps": manual_invalidation_steps,
"all_steps_equivalent": all_steps_equivalent,
"changed_component_union": changed_component_union_json,
"fallback_reason_counts": fallback_reason_counts,
});
let stable_hash = hash_json_value(&json!({
"projection": projection_without_repeat,
"steps": step_reports.iter().map(|step| {
json!({
"step_index": step["step_index"],
"changed_components": step["changed_components"],
"fallback_reason": step["fallback_reason"],
"summary_snapshot": step["summary_snapshot"],
"authoritative_snapshot": step["authoritative_snapshot"],
"equivalent": step["equivalent"],
})
}).collect::<Vec<_>>(),
}));
let report = json!({
"schema_version": GOVERNOR_STATE_SNAPSHOT_REPORT_SCHEMA_VERSION,
"scenario_id": scenario.scenario_id,
"description": scenario.description,
"fixture": {
"region_count": scenario.fixture.region_count,
"tasks_per_region": scenario.fixture.tasks_per_region,
"read_biased_enabled": scenario.fixture.read_biased_enabled,
"manual_invalidation_step": scenario.fixture.manual_invalidation_step,
},
"summary_path": {
"latency_summary": latency_summary(&summary_latencies),
"bytes_copied_estimate": summary_snapshot_bytes_copied_estimate(),
},
"authoritative_full_scan": {
"latency_summary": latency_summary(&authoritative_latencies),
"bytes_copied_estimate": step_reports
.last()
.and_then(|step| step["authoritative_scan_bytes_estimate"].as_u64())
.unwrap_or(0),
},
"equivalence_verdict": {
"all_steps_equivalent": all_steps_equivalent,
"repeated_run_hash": stable_hash,
},
"step_reports": step_reports,
"report_projection": projection_without_repeat,
});
(report, stable_hash)
}
fn run_governor_state_snapshot_scenario(scenario: &GovernorStateSnapshotScenario) -> Value {
let (mut report, first_hash) = execute_governor_state_snapshot_scenario_once(scenario);
let (_, second_hash) = execute_governor_state_snapshot_scenario_once(scenario);
let repeated_run_hash_match = first_hash == second_hash;
report["equivalence_verdict"]["repeated_run_hash_match"] =
Value::Bool(repeated_run_hash_match);
let mut projection = report["report_projection"]
.as_object()
.expect("projection object")
.clone();
projection.insert(
"repeated_run_hash_match".to_string(),
Value::Bool(repeated_run_hash_match),
);
report["report_projection"] = Value::Object(projection);
report
}
#[test]
fn governor_state_snapshot_smoke_contract_emits_report() {
init_test("governor_state_snapshot_smoke_contract_emits_report");
let selected_scenario = selected_governor_state_snapshot_scenario();
let mut selected_report = None;
for scenario in load_governor_state_snapshot_scenarios() {
let report = run_governor_state_snapshot_scenario(&scenario);
let actual_projection = report["report_projection"].clone();
let assertion_actual = json!({
"projection": actual_projection,
"report": report,
});
crate::assert_with_log!(
actual_projection == scenario.expected_report_projection,
"governor snapshot smoke projection should remain stable",
scenario.expected_report_projection.to_string(),
assertion_actual.to_string()
);
if scenario.scenario_id == selected_scenario {
selected_report = Some(report);
}
}
if let Ok(report_path) = std::env::var(GOVERNOR_STATE_SNAPSHOT_REPORT_PATH_ENV) {
let report =
selected_report.expect("selected governor snapshot scenario should emit report");
maybe_write_governor_state_snapshot_report(&report_path, &report);
println!("governor_state_snapshot_report_path={report_path}");
println!("GOVERNOR_STATE_SNAPSHOT_REPORT_JSON_BEGIN");
println!(
"{}",
serde_json::to_string(&report).expect("serialize compact governor snapshot report")
);
println!("GOVERNOR_STATE_SNAPSHOT_REPORT_JSON_END");
}
crate::test_complete!("governor_state_snapshot_smoke_contract_emits_report");
}
#[test]
fn potential_weights_debug_clone_copy_default() {
let w = PotentialWeights::default();
let dbg = format!("{w:?}");
assert!(dbg.contains("PotentialWeights"));
let w2 = w;
assert!((w2.w_tasks - 1.0).abs() < f64::EPSILON);
let w3 = w;
assert!((w3.w_obligation_age - 5.0).abs() < f64::EPSILON);
}
#[test]
fn scheduling_suggestion_debug_clone_copy_eq() {
let s = SchedulingSuggestion::DrainObligations;
let dbg = format!("{s:?}");
assert!(dbg.contains("DrainObligations"));
let s2 = s;
assert_eq!(s, s2);
let s3 = s;
assert_eq!(s, s3);
assert_ne!(
SchedulingSuggestion::DrainObligations,
SchedulingSuggestion::MeetDeadlines
);
}
#[test]
fn potential_record_debug_clone() {
let snap = StateSnapshot {
time: Time::ZERO,
live_tasks: 0,
pending_obligations: 0,
obligation_age_sum_ns: 0,
draining_regions: 0,
deadline_pressure: 0.0,
pending_send_permits: 0,
pending_acks: 0,
pending_leases: 0,
pending_io_ops: 0,
cancel_requested_tasks: 0,
cancelling_tasks: 0,
finalizing_tasks: 0,
ready_queue_depth: 0,
};
let rec = PotentialRecord {
snapshot: snap,
total: 0.0,
task_component: 0.0,
obligation_component: 0.0,
region_component: 0.0,
deadline_component: 0.0,
};
let dbg = format!("{rec:?}");
assert!(dbg.contains("PotentialRecord"));
let rec2 = rec;
assert!(rec2.is_zero());
}
}