use crate::types::Time;
use std::fmt;
#[derive(Debug, Clone, Copy)]
pub struct PotentialWeights {
pub w_tasks: f64,
pub w_obligation_age: f64,
pub w_draining_regions: f64,
pub w_deadline_pressure: f64,
}
impl PotentialWeights {
#[must_use]
pub const fn uniform(w: f64) -> Self {
Self {
w_tasks: w,
w_obligation_age: w,
w_draining_regions: w,
w_deadline_pressure: w,
}
}
#[must_use]
pub const fn obligation_focused() -> Self {
Self {
w_tasks: 1.0,
w_obligation_age: 10.0,
w_draining_regions: 5.0,
w_deadline_pressure: 2.0,
}
}
#[must_use]
pub const fn deadline_focused() -> Self {
Self {
w_tasks: 1.0,
w_obligation_age: 2.0,
w_draining_regions: 3.0,
w_deadline_pressure: 10.0,
}
}
#[must_use]
pub fn is_valid(&self) -> bool {
self.w_tasks >= 0.0
&& self.w_tasks.is_finite()
&& self.w_obligation_age >= 0.0
&& self.w_obligation_age.is_finite()
&& self.w_draining_regions >= 0.0
&& self.w_draining_regions.is_finite()
&& self.w_deadline_pressure >= 0.0
&& self.w_deadline_pressure.is_finite()
}
}
impl Default for PotentialWeights {
fn default() -> Self {
Self {
w_tasks: 1.0,
w_obligation_age: 5.0,
w_draining_regions: 3.0,
w_deadline_pressure: 2.0,
}
}
}
#[derive(Debug, Clone, Default)]
pub struct StateSnapshot {
pub time: Time,
pub live_tasks: u32,
pub pending_obligations: u32,
pub obligation_age_sum_ns: u64,
pub draining_regions: u32,
pub deadline_pressure: f64,
pub pending_send_permits: u32,
pub pending_acks: u32,
pub pending_leases: u32,
pub pending_io_ops: u32,
pub cancel_requested_tasks: u32,
pub cancelling_tasks: u32,
pub finalizing_tasks: u32,
pub ready_queue_depth: u32,
}
impl StateSnapshot {
#[inline]
fn accumulate_cancel_phase_counts(
task_state: &crate::record::task::TaskState,
cancel_requested_tasks: &mut u32,
cancelling_tasks: &mut u32,
finalizing_tasks: &mut u32,
) {
match task_state {
crate::record::task::TaskState::CancelRequested { .. } => {
*cancel_requested_tasks = cancel_requested_tasks.saturating_add(1);
}
crate::record::task::TaskState::Cancelling { .. } => {
*cancelling_tasks = cancelling_tasks.saturating_add(1);
}
crate::record::task::TaskState::Finalizing { .. } => {
*finalizing_tasks = finalizing_tasks.saturating_add(1);
}
_ => {}
}
}
#[must_use]
pub fn from_runtime_state(state: &crate::runtime::RuntimeState) -> Self {
use crate::record::obligation::ObligationKind;
const DEADLINE_PRESSURE_D0_NS: u64 = 1_000_000_000;
let now = state.now;
let mut live_tasks: u32 = 0;
let mut cancel_requested_tasks: u32 = 0;
let mut cancelling_tasks: u32 = 0;
let mut finalizing_tasks: u32 = 0;
let mut deadline_pressure = 0.0_f64;
for (_, task) in state.tasks_iter() {
if task.state.is_terminal() {
continue;
}
live_tasks = live_tasks.saturating_add(1);
Self::accumulate_cancel_phase_counts(
&task.state,
&mut cancel_requested_tasks,
&mut cancelling_tasks,
&mut finalizing_tasks,
);
let Some(cx_inner) = task.cx_inner.as_ref() else {
continue;
};
let deadline = {
let inner = cx_inner.read();
inner.budget.deadline
};
let Some(deadline) = deadline else {
continue;
};
let deadline_ns = i128::from(deadline.as_nanos());
let now_ns = i128::from(now.as_nanos());
let slack_ns = deadline_ns - now_ns;
#[allow(clippy::cast_precision_loss)]
let slack = slack_ns as f64;
#[allow(clippy::cast_precision_loss)]
let d0 = DEADLINE_PRESSURE_D0_NS as f64;
let term = 1.0 - (slack / d0);
if term > 0.0 {
deadline_pressure += term;
}
}
let mut pending_obligations: u32 = 0;
let mut obligation_age_sum_ns: u64 = 0;
let mut pending_send_permits: u32 = 0;
let mut pending_acks: u32 = 0;
let mut pending_leases: u32 = 0;
let mut pending_io_ops: u32 = 0;
for (_, obligation) in state.obligations_iter() {
if !obligation.is_pending() {
continue;
}
pending_obligations = pending_obligations.saturating_add(1);
obligation_age_sum_ns =
obligation_age_sum_ns.saturating_add(now.duration_since(obligation.reserved_at));
match obligation.kind {
ObligationKind::SendPermit => {
pending_send_permits = pending_send_permits.saturating_add(1);
}
ObligationKind::Ack => {
pending_acks = pending_acks.saturating_add(1);
}
ObligationKind::Lease => {
pending_leases = pending_leases.saturating_add(1);
}
ObligationKind::IoOp => {
pending_io_ops = pending_io_ops.saturating_add(1);
}
ObligationKind::SemaphorePermit => {
pending_leases = pending_leases.saturating_add(1);
}
}
}
let mut draining_regions: u32 = 0;
for (_, region) in state.regions_iter() {
match region.state() {
crate::record::region::RegionState::Draining
| crate::record::region::RegionState::Finalizing => {
draining_regions = draining_regions.saturating_add(1);
}
_ => {}
}
}
Self {
time: now,
live_tasks,
pending_obligations,
obligation_age_sum_ns,
draining_regions,
deadline_pressure,
pending_send_permits,
pending_acks,
pending_leases,
pending_io_ops,
cancel_requested_tasks,
cancelling_tasks,
finalizing_tasks,
ready_queue_depth: 0, }
}
#[must_use]
pub fn is_quiescent(&self) -> bool {
self.live_tasks == 0
&& self.pending_obligations == 0
&& self.draining_regions == 0
&& self.deadline_pressure.abs() < f64::EPSILON
}
#[must_use]
pub fn with_ready_queue_depth(mut self, depth: u32) -> Self {
self.ready_queue_depth = depth;
self
}
#[must_use]
pub fn total_cancelling_tasks(&self) -> u32 {
self.cancel_requested_tasks
.saturating_add(self.cancelling_tasks)
.saturating_add(self.finalizing_tasks)
}
}
impl fmt::Display for StateSnapshot {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(
f,
"Σ(t={}, tasks={}, obligations={}[sp={},ack={},lease={},io={}], \
age_sum={}ns, draining={}, cancel={}/{}/{}, queue={}, deadline_p={:.2})",
self.time,
self.live_tasks,
self.pending_obligations,
self.pending_send_permits,
self.pending_acks,
self.pending_leases,
self.pending_io_ops,
self.obligation_age_sum_ns,
self.draining_regions,
self.cancel_requested_tasks,
self.cancelling_tasks,
self.finalizing_tasks,
self.ready_queue_depth,
self.deadline_pressure,
)
}
}
#[derive(Debug, Clone)]
pub struct PotentialRecord {
pub snapshot: StateSnapshot,
pub total: f64,
pub task_component: f64,
pub obligation_component: f64,
pub region_component: f64,
pub deadline_component: f64,
}
impl PotentialRecord {
#[must_use]
pub fn is_zero(&self) -> bool {
self.total.abs() < f64::EPSILON
}
}
impl fmt::Display for PotentialRecord {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(
f,
"V={:.2} [tasks={:.2}, obligations={:.2}, regions={:.2}, deadlines={:.2}]",
self.total,
self.task_component,
self.obligation_component,
self.region_component,
self.deadline_component,
)
}
}
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum SchedulingSuggestion {
DrainObligations,
DrainRegions,
MeetDeadlines,
NoPreference,
}
impl SchedulingSuggestion {
#[must_use]
pub const fn description(self) -> &'static str {
match self {
Self::DrainObligations => "prioritize obligation holders",
Self::DrainRegions => "prioritize draining region tasks",
Self::MeetDeadlines => "prioritize deadline-critical tasks",
Self::NoPreference => "no scheduling preference",
}
}
}
impl fmt::Display for SchedulingSuggestion {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.write_str(self.description())
}
}
#[derive(Debug, Clone)]
pub struct ConvergenceVerdict {
pub monotone: bool,
pub reached_quiescence: bool,
pub v_max: f64,
pub v_final: f64,
pub increase_count: usize,
pub max_increase: f64,
pub steps: usize,
}
impl ConvergenceVerdict {
#[must_use]
pub fn converged(&self) -> bool {
self.monotone && self.reached_quiescence
}
}
impl fmt::Display for ConvergenceVerdict {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
writeln!(f, "Convergence Verdict")?;
writeln!(f, "===================")?;
writeln!(f, "Steps: {}", self.steps)?;
writeln!(f, "Monotone: {}", self.monotone)?;
writeln!(f, "Quiescent: {}", self.reached_quiescence)?;
writeln!(f, "Converged: {}", self.converged())?;
writeln!(f, "V_max: {:.4}", self.v_max)?;
writeln!(f, "V_final: {:.4}", self.v_final)?;
if !self.monotone {
writeln!(f, "Violations: {}", self.increase_count)?;
writeln!(f, "Max increase: {:.4}", self.max_increase)?;
}
Ok(())
}
}
#[derive(Debug)]
pub struct LyapunovGovernor {
weights: PotentialWeights,
history: Vec<PotentialRecord>,
}
impl LyapunovGovernor {
const MAX_HISTORY: usize = 8192;
#[must_use]
pub fn new(weights: PotentialWeights) -> Self {
assert!(weights.is_valid(), "weights must be non-negative");
Self {
weights,
history: Vec::new(),
}
}
#[must_use]
pub fn with_defaults() -> Self {
Self::new(PotentialWeights::default())
}
pub fn compute_potential(&mut self, snapshot: &StateSnapshot) -> f64 {
let record = self.compute(snapshot);
let total = record.total;
self.history.push(record);
if self.history.len() > Self::MAX_HISTORY {
let drain_count = Self::MAX_HISTORY / 2;
self.history.drain(..drain_count);
}
total
}
#[must_use]
pub fn compute_record(&self, snapshot: &StateSnapshot) -> PotentialRecord {
self.compute(snapshot)
}
#[must_use]
pub fn suggest(&self, snapshot: &StateSnapshot) -> SchedulingSuggestion {
if snapshot.is_quiescent() {
return SchedulingSuggestion::NoPreference;
}
let record = self.compute(snapshot);
let components = [
(
record.obligation_component,
SchedulingSuggestion::DrainObligations,
),
(record.region_component, SchedulingSuggestion::DrainRegions),
(
record.deadline_component,
SchedulingSuggestion::MeetDeadlines,
),
];
components
.iter()
.max_by(|a, b| a.0.partial_cmp(&b.0).unwrap_or(std::cmp::Ordering::Equal))
.filter(|(v, _)| *v > 0.0)
.map_or(SchedulingSuggestion::NoPreference, |(_, s)| *s)
}
#[must_use]
pub fn analyze_convergence(&self) -> ConvergenceVerdict {
if self.history.is_empty() {
return ConvergenceVerdict {
monotone: true,
reached_quiescence: false,
v_max: 0.0,
v_final: 0.0,
increase_count: 0,
max_increase: 0.0,
steps: 0,
};
}
let mut monotone = true;
let mut increase_count = 0;
let mut max_increase = 0.0_f64;
let mut v_max = 0.0_f64;
for window in self.history.windows(2) {
let prev = window[0].total;
let curr = window[1].total;
v_max = v_max.max(prev).max(curr);
let delta = curr - prev;
if delta > f64::EPSILON {
monotone = false;
increase_count += 1;
max_increase = max_increase.max(delta);
}
}
v_max = v_max.max(self.history.first().map_or(0.0, |r| r.total));
let v_final = self.history.last().map_or(0.0, |r| r.total);
let reached_quiescence = v_final.abs() < f64::EPSILON;
ConvergenceVerdict {
monotone,
reached_quiescence,
v_max,
v_final,
increase_count,
max_increase,
steps: self.history.len(),
}
}
#[must_use]
pub fn history(&self) -> &[PotentialRecord] {
&self.history
}
pub fn clear_history(&mut self) {
self.history.clear();
}
#[must_use]
pub const fn weights(&self) -> &PotentialWeights {
&self.weights
}
fn compute(&self, snapshot: &StateSnapshot) -> PotentialRecord {
let task_component = self.weights.w_tasks * f64::from(snapshot.live_tasks);
#[allow(clippy::cast_precision_loss)]
let age_seconds = snapshot.obligation_age_sum_ns as f64 / 1_000_000_000.0;
let obligation_component = self.weights.w_obligation_age * age_seconds;
let region_component =
self.weights.w_draining_regions * f64::from(snapshot.draining_regions);
let deadline_component = self.weights.w_deadline_pressure * snapshot.deadline_pressure;
let total = task_component + obligation_component + region_component + deadline_component;
PotentialRecord {
snapshot: snapshot.clone(),
total,
task_component,
obligation_component,
region_component,
deadline_component,
}
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::lab::runtime::InvariantViolation;
use crate::record::ObligationKind;
use crate::runtime::RuntimeState;
use crate::types::Budget;
use proptest::prelude::*;
fn init_test(name: &str) {
crate::test_utils::init_test_logging();
crate::test_phase!(name);
}
fn quiescent_snapshot() -> StateSnapshot {
StateSnapshot {
time: Time::ZERO,
live_tasks: 0,
pending_obligations: 0,
obligation_age_sum_ns: 0,
draining_regions: 0,
deadline_pressure: 0.0,
pending_send_permits: 0,
pending_acks: 0,
pending_leases: 0,
pending_io_ops: 0,
cancel_requested_tasks: 0,
cancelling_tasks: 0,
finalizing_tasks: 0,
ready_queue_depth: 0,
}
}
fn active_snapshot(tasks: u32, obligations: u32, age_ns: u64, draining: u32) -> StateSnapshot {
StateSnapshot {
time: Time::from_nanos(age_ns),
live_tasks: tasks,
pending_obligations: obligations,
obligation_age_sum_ns: age_ns,
draining_regions: draining,
deadline_pressure: 0.0,
pending_send_permits: obligations, pending_acks: 0,
pending_leases: 0,
pending_io_ops: 0,
cancel_requested_tasks: 0,
cancelling_tasks: 0,
finalizing_tasks: 0,
ready_queue_depth: 0,
}
}
fn snapshot_with_components(
tasks: u32,
send_permits: u32,
age_ns: u64,
draining: u32,
deadline_pressure: f64,
) -> StateSnapshot {
StateSnapshot {
time: Time::from_nanos(age_ns),
live_tasks: tasks,
pending_obligations: send_permits,
obligation_age_sum_ns: age_ns,
draining_regions: draining,
deadline_pressure,
pending_send_permits: send_permits,
pending_acks: 0,
pending_leases: 0,
pending_io_ops: 0,
cancel_requested_tasks: 0,
cancelling_tasks: 0,
finalizing_tasks: 0,
ready_queue_depth: 0,
}
}
#[test]
fn snapshot_from_runtime_counts_tasks_obligations_and_regions() {
init_test("snapshot_from_runtime_counts_tasks_obligations_and_regions");
let mut state = RuntimeState::new();
let root = state.create_root_region(Budget::unlimited());
let (task_id, _handle) = state
.create_task(root, Budget::unlimited(), async {})
.expect("create_task must succeed");
let obligation_id = state
.create_obligation(ObligationKind::SendPermit, task_id, root, None)
.expect("create_obligation must succeed");
state.now = Time::from_nanos(100);
let snap = StateSnapshot::from_runtime_state(&state);
crate::assert_with_log!(snap.time == state.now, "time", state.now, snap.time);
crate::assert_with_log!(snap.live_tasks == 1, "live_tasks", 1, snap.live_tasks);
crate::assert_with_log!(
snap.pending_obligations == 1,
"pending_obligations",
1,
snap.pending_obligations
);
crate::assert_with_log!(
snap.obligation_age_sum_ns == 100,
"obligation_age_sum_ns",
100,
snap.obligation_age_sum_ns
);
crate::assert_with_log!(
snap.draining_regions == 0,
"draining_regions",
0,
snap.draining_regions
);
crate::assert_with_log!(
snap.pending_send_permits == 1,
"pending_send_permits",
1,
snap.pending_send_permits
);
crate::assert_with_log!(snap.pending_acks == 0, "pending_acks", 0, snap.pending_acks);
crate::assert_with_log!(
snap.pending_leases == 0,
"pending_leases",
0,
snap.pending_leases
);
crate::assert_with_log!(
snap.pending_io_ops == 0,
"pending_io_ops",
0,
snap.pending_io_ops
);
{
let region = state.region(root).expect("root region exists");
let ok = region.begin_close(None);
crate::assert_with_log!(ok, "begin_close", true, ok);
let ok = region.begin_drain();
crate::assert_with_log!(ok, "begin_drain", true, ok);
}
let snap2 = StateSnapshot::from_runtime_state(&state);
crate::assert_with_log!(
snap2.draining_regions == 1,
"draining_regions after begin_drain",
1,
snap2.draining_regions
);
state
.commit_obligation(obligation_id)
.expect("commit_obligation must succeed");
let snap3 = StateSnapshot::from_runtime_state(&state);
crate::assert_with_log!(
snap3.pending_obligations == 0,
"pending_obligations after commit",
0,
snap3.pending_obligations
);
crate::assert_with_log!(
snap3.pending_send_permits == 0,
"pending_send_permits after commit",
0,
snap3.pending_send_permits
);
crate::test_complete!("snapshot_from_runtime_counts_tasks_obligations_and_regions");
}
#[test]
fn snapshot_from_runtime_computes_deadline_pressure() {
init_test("snapshot_from_runtime_computes_deadline_pressure");
let mut state = RuntimeState::new();
let root = state.create_root_region(Budget::unlimited());
let (_task_id, _handle) = state
.create_task(root, Budget::with_deadline_ns(500_000_000), async {})
.expect("create_task must succeed");
state.now = Time::ZERO;
let snap = StateSnapshot::from_runtime_state(&state);
let expected = 0.5_f64;
let ok = (snap.deadline_pressure - expected).abs() < 1e-9;
crate::assert_with_log!(
ok,
"deadline_pressure at t=0",
expected,
snap.deadline_pressure
);
state.now = Time::from_nanos(600_000_000);
let snap2 = StateSnapshot::from_runtime_state(&state);
let expected_overdue = 1.1_f64;
let ok2 = (snap2.deadline_pressure - expected_overdue).abs() < 1e-9;
crate::assert_with_log!(
ok2,
"deadline_pressure overdue",
expected_overdue,
snap2.deadline_pressure
);
crate::test_complete!("snapshot_from_runtime_computes_deadline_pressure");
}
#[test]
fn with_ready_queue_depth_sets_field() {
init_test("with_ready_queue_depth_sets_field");
let snap = quiescent_snapshot().with_ready_queue_depth(42);
crate::assert_with_log!(
snap.ready_queue_depth == 42,
"ready_queue_depth",
42,
snap.ready_queue_depth
);
crate::test_complete!("with_ready_queue_depth_sets_field");
}
#[test]
fn total_cancelling_tasks_sums_phases() {
init_test("total_cancelling_tasks_sums_phases");
let mut snap = quiescent_snapshot();
snap.cancel_requested_tasks = 3;
snap.cancelling_tasks = 2;
snap.finalizing_tasks = 1;
let total = snap.total_cancelling_tasks();
crate::assert_with_log!(total == 6, "total_cancelling", 6, total);
crate::test_complete!("total_cancelling_tasks_sums_phases");
}
#[test]
fn per_kind_obligation_breakdown_sums_to_total() {
init_test("per_kind_obligation_breakdown_sums_to_total");
let snap = StateSnapshot {
time: Time::ZERO,
live_tasks: 4,
pending_obligations: 7,
obligation_age_sum_ns: 0,
draining_regions: 0,
deadline_pressure: 0.0,
pending_send_permits: 2,
pending_acks: 1,
pending_leases: 3,
pending_io_ops: 1,
cancel_requested_tasks: 0,
cancelling_tasks: 0,
finalizing_tasks: 0,
ready_queue_depth: 0,
};
let sum = snap.pending_send_permits
+ snap.pending_acks
+ snap.pending_leases
+ snap.pending_io_ops;
crate::assert_with_log!(
sum == snap.pending_obligations,
"per-kind sums to total",
snap.pending_obligations,
sum
);
crate::test_complete!("per_kind_obligation_breakdown_sums_to_total");
}
#[test]
fn display_includes_extended_fields() {
init_test("display_includes_extended_fields");
let mut snap = active_snapshot(3, 2, 100_000_000, 1);
snap.cancel_requested_tasks = 1;
snap.cancelling_tasks = 1;
snap.ready_queue_depth = 5;
let s = format!("{snap}");
let has_cancel = s.contains("cancel=1/1/0");
crate::assert_with_log!(has_cancel, "display shows cancel phases", true, has_cancel);
let has_queue = s.contains("queue=5");
crate::assert_with_log!(has_queue, "display shows queue depth", true, has_queue);
let has_kind = s.contains("sp=2");
crate::assert_with_log!(has_kind, "display shows per-kind", true, has_kind);
crate::test_complete!("display_includes_extended_fields");
}
#[test]
fn potential_zero_iff_quiescent() {
init_test("potential_zero_iff_quiescent");
let governor = LyapunovGovernor::with_defaults();
let v = governor.compute_record(&quiescent_snapshot());
let is_zero = v.is_zero();
crate::assert_with_log!(is_zero, "quiescent is zero", true, is_zero);
let v_active = governor.compute_record(&active_snapshot(1, 0, 0, 0));
let not_zero = !v_active.is_zero();
crate::assert_with_log!(not_zero, "active is not zero", true, not_zero);
crate::test_complete!("potential_zero_iff_quiescent");
}
#[test]
fn potential_non_negative() {
init_test("potential_non_negative");
let governor = LyapunovGovernor::with_defaults();
let configs = [
(0, 0, 0, 0),
(1, 0, 0, 0),
(0, 1, 100, 0),
(5, 3, 1000, 2),
(100, 50, 1_000_000_000, 10),
];
for (tasks, obligations, age, draining) in configs {
let snap = active_snapshot(tasks, obligations, age, draining);
let v = governor.compute_record(&snap);
let non_neg = v.total >= 0.0;
crate::assert_with_log!(non_neg, format!("non-negative for {snap}"), true, non_neg);
}
crate::test_complete!("potential_non_negative");
}
#[test]
fn potential_increases_with_more_tasks() {
init_test("potential_increases_with_more_tasks");
let governor = LyapunovGovernor::with_defaults();
let v1 = governor.compute_record(&active_snapshot(1, 0, 0, 0));
let v2 = governor.compute_record(&active_snapshot(5, 0, 0, 0));
let v3 = governor.compute_record(&active_snapshot(10, 0, 0, 0));
let inc1 = v2.total > v1.total;
crate::assert_with_log!(inc1, "more tasks = higher V", true, inc1);
let inc2 = v3.total > v2.total;
crate::assert_with_log!(inc2, "even more tasks", true, inc2);
crate::test_complete!("potential_increases_with_more_tasks");
}
#[test]
fn potential_increases_with_obligation_age() {
init_test("potential_increases_with_obligation_age");
let governor = LyapunovGovernor::with_defaults();
let v1 = governor.compute_record(&active_snapshot(1, 1, 100, 0));
let v2 = governor.compute_record(&active_snapshot(1, 1, 1_000_000_000, 0));
let inc = v2.total > v1.total;
crate::assert_with_log!(inc, "older obligations = higher V", true, inc);
crate::test_complete!("potential_increases_with_obligation_age");
}
#[test]
fn potential_increases_with_draining_regions() {
init_test("potential_increases_with_draining_regions");
let governor = LyapunovGovernor::with_defaults();
let v1 = governor.compute_record(&active_snapshot(1, 0, 0, 0));
let v2 = governor.compute_record(&active_snapshot(1, 0, 0, 3));
let inc = v2.total > v1.total;
crate::assert_with_log!(inc, "draining regions increase V", true, inc);
crate::test_complete!("potential_increases_with_draining_regions");
}
#[test]
fn potential_deadline_pressure() {
init_test("potential_deadline_pressure");
let governor = LyapunovGovernor::with_defaults();
let snap_no_pressure = StateSnapshot {
time: Time::ZERO,
live_tasks: 1,
pending_obligations: 0,
obligation_age_sum_ns: 0,
draining_regions: 0,
deadline_pressure: 0.0,
pending_send_permits: 0,
pending_acks: 0,
pending_leases: 0,
pending_io_ops: 0,
cancel_requested_tasks: 0,
cancelling_tasks: 0,
finalizing_tasks: 0,
ready_queue_depth: 0,
};
let v1 = governor.compute_record(&snap_no_pressure);
let snap_high_pressure = StateSnapshot {
deadline_pressure: 5.0,
..snap_no_pressure
};
let v2 = governor.compute_record(&snap_high_pressure);
let inc = v2.total > v1.total;
crate::assert_with_log!(inc, "deadline pressure increases V", true, inc);
crate::test_complete!("potential_deadline_pressure");
}
proptest! {
#[test]
fn metamorphic_componentwise_reduction_never_increases_potential(
tasks in 0u32..40,
obligations in 0u32..40,
age_ns in 0u64..2_000_000_000,
draining in 0u32..20,
deadline_millis in 0u32..20_000,
task_reduction in 0u32..40,
obligation_reduction in 0u32..40,
age_reduction in 0u64..2_000_000_000,
draining_reduction in 0u32..20,
deadline_reduction_millis in 0u32..20_000,
) {
let reduced_tasks = tasks.saturating_sub(task_reduction);
let reduced_obligations = obligations.saturating_sub(obligation_reduction);
let reduced_age_ns = age_ns.saturating_sub(age_reduction);
let reduced_draining = draining.saturating_sub(draining_reduction);
let deadline_pressure = f64::from(deadline_millis) / 1000.0;
let reduced_deadline_pressure =
f64::from(deadline_millis.saturating_sub(deadline_reduction_millis)) / 1000.0;
let fuller = snapshot_with_components(
tasks,
obligations,
age_ns,
draining,
deadline_pressure,
);
let reduced = snapshot_with_components(
reduced_tasks,
reduced_obligations,
reduced_age_ns,
reduced_draining,
reduced_deadline_pressure,
);
let weights = [
PotentialWeights::default(),
PotentialWeights::uniform(1.0),
PotentialWeights::obligation_focused(),
PotentialWeights::deadline_focused(),
];
for weight_set in weights {
let governor = LyapunovGovernor::new(weight_set);
let fuller_record = governor.compute_record(&fuller);
let reduced_record = governor.compute_record(&reduced);
prop_assert!(
reduced_record.total <= fuller_record.total + f64::EPSILON,
"component-wise reduction increased total potential: full={fuller_record:?}, reduced={reduced_record:?}, weights={weight_set:?}"
);
prop_assert!(
reduced_record.task_component <= fuller_record.task_component + f64::EPSILON,
"task component increased under task reduction"
);
prop_assert!(
reduced_record.obligation_component <= fuller_record.obligation_component + f64::EPSILON,
"obligation component increased under age reduction"
);
prop_assert!(
reduced_record.region_component <= fuller_record.region_component + f64::EPSILON,
"region component increased under draining reduction"
);
prop_assert!(
reduced_record.deadline_component <= fuller_record.deadline_component + f64::EPSILON,
"deadline component increased under deadline-pressure reduction"
);
}
}
}
#[test]
fn convergence_monotone_drain() {
init_test("convergence_monotone_drain");
let mut governor = LyapunovGovernor::with_defaults();
let trajectory = vec![
active_snapshot(10, 5, 500_000_000, 3),
active_snapshot(8, 4, 400_000_000, 3),
active_snapshot(6, 3, 250_000_000, 2),
active_snapshot(4, 2, 100_000_000, 1),
active_snapshot(2, 1, 30_000_000, 1),
active_snapshot(1, 0, 0, 0),
quiescent_snapshot(),
];
for snap in &trajectory {
governor.compute_potential(snap);
}
let verdict = governor.analyze_convergence();
let mono = verdict.monotone;
crate::assert_with_log!(mono, "monotone", true, mono);
let converged = verdict.converged();
crate::assert_with_log!(converged, "converged", true, converged);
let v_final = verdict.v_final;
crate::assert_with_log!(v_final.abs() < f64::EPSILON, "v_final", 0.0, v_final);
crate::test_complete!("convergence_monotone_drain");
}
#[test]
fn convergence_non_monotone_detected() {
init_test("convergence_non_monotone_detected");
let mut governor = LyapunovGovernor::with_defaults();
let trajectory = vec![
active_snapshot(5, 2, 100_000_000, 1),
active_snapshot(3, 1, 50_000_000, 1),
active_snapshot(6, 3, 200_000_000, 2), active_snapshot(4, 2, 100_000_000, 1),
active_snapshot(1, 0, 0, 0),
quiescent_snapshot(),
];
for snap in &trajectory {
governor.compute_potential(snap);
}
let verdict = governor.analyze_convergence();
let not_mono = !verdict.monotone;
crate::assert_with_log!(not_mono, "not monotone", true, not_mono);
let violations = verdict.increase_count;
crate::assert_with_log!(violations >= 1, "has violations", true, violations >= 1);
let quiescent = verdict.reached_quiescence;
crate::assert_with_log!(quiescent, "reached quiescence", true, quiescent);
crate::test_complete!("convergence_non_monotone_detected");
}
#[test]
fn convergence_stuck_not_quiescent() {
init_test("convergence_stuck_not_quiescent");
let mut governor = LyapunovGovernor::with_defaults();
let trajectory = vec![
active_snapshot(5, 3, 300_000_000, 2),
active_snapshot(3, 2, 200_000_000, 1),
active_snapshot(2, 2, 200_000_000, 1),
active_snapshot(2, 2, 200_000_000, 1), ];
for snap in &trajectory {
governor.compute_potential(snap);
}
let verdict = governor.analyze_convergence();
let not_converged = !verdict.converged();
crate::assert_with_log!(not_converged, "not converged", true, not_converged);
let not_quiescent = !verdict.reached_quiescence;
crate::assert_with_log!(not_quiescent, "not quiescent", true, not_quiescent);
crate::test_complete!("convergence_stuck_not_quiescent");
}
#[test]
fn suggest_no_preference_when_quiescent() {
init_test("suggest_no_preference_when_quiescent");
let governor = LyapunovGovernor::with_defaults();
let suggestion = governor.suggest(&quiescent_snapshot());
let is_no_pref = suggestion == SchedulingSuggestion::NoPreference;
crate::assert_with_log!(is_no_pref, "no preference when quiescent", true, is_no_pref);
crate::test_complete!("suggest_no_preference_when_quiescent");
}
#[test]
fn suggest_drain_obligations_when_dominant() {
init_test("suggest_drain_obligations_when_dominant");
let governor = LyapunovGovernor::new(PotentialWeights::obligation_focused());
let snap = StateSnapshot {
time: Time::from_nanos(1_000_000_000),
live_tasks: 1,
pending_obligations: 10,
obligation_age_sum_ns: 5_000_000_000, draining_regions: 0,
deadline_pressure: 0.0,
pending_send_permits: 10,
pending_acks: 0,
pending_leases: 0,
pending_io_ops: 0,
cancel_requested_tasks: 0,
cancelling_tasks: 0,
finalizing_tasks: 0,
ready_queue_depth: 0,
};
let suggestion = governor.suggest(&snap);
let is_obligations = suggestion == SchedulingSuggestion::DrainObligations;
crate::assert_with_log!(
is_obligations,
"suggests draining obligations",
true,
is_obligations
);
crate::test_complete!("suggest_drain_obligations_when_dominant");
}
#[test]
fn suggest_drain_regions_when_dominant() {
init_test("suggest_drain_regions_when_dominant");
let governor = LyapunovGovernor::with_defaults();
let snap = StateSnapshot {
time: Time::ZERO,
live_tasks: 1,
pending_obligations: 0,
obligation_age_sum_ns: 0,
draining_regions: 10, deadline_pressure: 0.0,
pending_send_permits: 0,
pending_acks: 0,
pending_leases: 0,
pending_io_ops: 0,
cancel_requested_tasks: 0,
cancelling_tasks: 0,
finalizing_tasks: 0,
ready_queue_depth: 0,
};
let suggestion = governor.suggest(&snap);
let is_regions = suggestion == SchedulingSuggestion::DrainRegions;
crate::assert_with_log!(is_regions, "suggests draining regions", true, is_regions);
crate::test_complete!("suggest_drain_regions_when_dominant");
}
#[test]
fn suggest_meet_deadlines_when_dominant() {
init_test("suggest_meet_deadlines_when_dominant");
let governor = LyapunovGovernor::new(PotentialWeights::deadline_focused());
let snap = StateSnapshot {
time: Time::ZERO,
live_tasks: 1,
pending_obligations: 0,
obligation_age_sum_ns: 0,
draining_regions: 0,
deadline_pressure: 10.0, pending_send_permits: 0,
pending_acks: 0,
pending_leases: 0,
pending_io_ops: 0,
cancel_requested_tasks: 0,
cancelling_tasks: 0,
finalizing_tasks: 0,
ready_queue_depth: 0,
};
let suggestion = governor.suggest(&snap);
let is_deadlines = suggestion == SchedulingSuggestion::MeetDeadlines;
crate::assert_with_log!(
is_deadlines,
"suggests meeting deadlines",
true,
is_deadlines
);
crate::test_complete!("suggest_meet_deadlines_when_dominant");
}
#[test]
fn weights_uniform() {
init_test("weights_uniform");
let w = PotentialWeights::uniform(1.0);
let valid = w.is_valid();
crate::assert_with_log!(valid, "uniform valid", true, valid);
let eps = f64::EPSILON;
let all_eq = (w.w_tasks - w.w_obligation_age).abs() < eps
&& (w.w_obligation_age - w.w_draining_regions).abs() < eps
&& (w.w_draining_regions - w.w_deadline_pressure).abs() < eps;
crate::assert_with_log!(all_eq, "all equal", true, all_eq);
crate::test_complete!("weights_uniform");
}
#[test]
fn weights_obligation_focused() {
init_test("weights_obligation_focused");
let w = PotentialWeights::obligation_focused();
let valid = w.is_valid();
crate::assert_with_log!(valid, "obligation focused valid", true, valid);
let ob_dominant = w.w_obligation_age > w.w_tasks;
crate::assert_with_log!(
ob_dominant,
"obligations weighted higher",
true,
ob_dominant
);
crate::test_complete!("weights_obligation_focused");
}
#[test]
fn weights_deadline_focused() {
init_test("weights_deadline_focused");
let w = PotentialWeights::deadline_focused();
let valid = w.is_valid();
crate::assert_with_log!(valid, "deadline focused valid", true, valid);
let dl_dominant = w.w_deadline_pressure > w.w_tasks;
crate::assert_with_log!(dl_dominant, "deadlines weighted higher", true, dl_dominant);
crate::test_complete!("weights_deadline_focused");
}
#[test]
fn component_isolation_tasks_only() {
init_test("component_isolation_tasks_only");
let governor = LyapunovGovernor::new(PotentialWeights {
w_tasks: 1.0,
w_obligation_age: 0.0,
w_draining_regions: 0.0,
w_deadline_pressure: 0.0,
});
let snap = active_snapshot(5, 3, 1_000_000_000, 2);
let record = governor.compute_record(&snap);
let only_tasks = record.obligation_component.abs() < f64::EPSILON
&& record.region_component.abs() < f64::EPSILON
&& record.deadline_component.abs() < f64::EPSILON;
crate::assert_with_log!(only_tasks, "only task component", true, only_tasks);
let expected = 5.0;
let close = (record.total - expected).abs() < f64::EPSILON;
crate::assert_with_log!(close, "total = 5.0", true, close);
crate::test_complete!("component_isolation_tasks_only");
}
#[test]
fn governor_reuse_and_clear() {
init_test("governor_reuse_and_clear");
let mut governor = LyapunovGovernor::with_defaults();
governor.compute_potential(&active_snapshot(5, 3, 100_000_000, 1));
governor.compute_potential(&quiescent_snapshot());
let len = governor.history().len();
crate::assert_with_log!(len == 2, "history has 2 entries", 2, len);
governor.clear_history();
let len = governor.history().len();
crate::assert_with_log!(len == 0, "cleared", 0, len);
crate::test_complete!("governor_reuse_and_clear");
}
#[test]
#[allow(clippy::too_many_lines)]
fn experiment_cancel_drain_converges() {
init_test("experiment_cancel_drain_converges");
let mut governor = LyapunovGovernor::new(PotentialWeights::obligation_focused());
governor.compute_potential(&StateSnapshot {
time: Time::ZERO,
live_tasks: 5,
pending_obligations: 5,
obligation_age_sum_ns: 500_000_000, draining_regions: 1,
deadline_pressure: 0.0,
pending_send_permits: 5,
pending_acks: 0,
pending_leases: 0,
pending_io_ops: 0,
cancel_requested_tasks: 5,
cancelling_tasks: 0,
finalizing_tasks: 0,
ready_queue_depth: 0,
});
governor.compute_potential(&StateSnapshot {
time: Time::from_nanos(100_000_000),
live_tasks: 4,
pending_obligations: 4,
obligation_age_sum_ns: 480_000_000,
draining_regions: 1,
deadline_pressure: 0.0,
pending_send_permits: 4,
pending_acks: 0,
pending_leases: 0,
pending_io_ops: 0,
cancel_requested_tasks: 4,
cancelling_tasks: 0,
finalizing_tasks: 0,
ready_queue_depth: 0,
});
governor.compute_potential(&StateSnapshot {
time: Time::from_nanos(200_000_000),
live_tasks: 3,
pending_obligations: 3,
obligation_age_sum_ns: 360_000_000,
draining_regions: 1,
deadline_pressure: 0.0,
pending_send_permits: 3,
pending_acks: 0,
pending_leases: 0,
pending_io_ops: 0,
cancel_requested_tasks: 3,
cancelling_tasks: 0,
finalizing_tasks: 0,
ready_queue_depth: 0,
});
governor.compute_potential(&StateSnapshot {
time: Time::from_nanos(300_000_000),
live_tasks: 2,
pending_obligations: 2,
obligation_age_sum_ns: 220_000_000,
draining_regions: 1,
deadline_pressure: 0.0,
pending_send_permits: 2,
pending_acks: 0,
pending_leases: 0,
pending_io_ops: 0,
cancel_requested_tasks: 2,
cancelling_tasks: 0,
finalizing_tasks: 0,
ready_queue_depth: 0,
});
governor.compute_potential(&StateSnapshot {
time: Time::from_nanos(400_000_000),
live_tasks: 1,
pending_obligations: 1,
obligation_age_sum_ns: 80_000_000,
draining_regions: 1,
deadline_pressure: 0.0,
pending_send_permits: 1,
pending_acks: 0,
pending_leases: 0,
pending_io_ops: 0,
cancel_requested_tasks: 1,
cancelling_tasks: 0,
finalizing_tasks: 0,
ready_queue_depth: 0,
});
governor.compute_potential(&StateSnapshot {
time: Time::from_nanos(500_000_000),
live_tasks: 0,
pending_obligations: 0,
obligation_age_sum_ns: 0,
draining_regions: 0,
deadline_pressure: 0.0,
pending_send_permits: 0,
pending_acks: 0,
pending_leases: 0,
pending_io_ops: 0,
cancel_requested_tasks: 0,
cancelling_tasks: 0,
finalizing_tasks: 0,
ready_queue_depth: 0,
});
let verdict = governor.analyze_convergence();
let converged = verdict.converged();
crate::assert_with_log!(converged, "cancel drain converges", true, converged);
let mono = verdict.monotone;
crate::assert_with_log!(mono, "monotone decrease", true, mono);
let v_max = verdict.v_max;
let has_max = v_max > 0.0;
crate::assert_with_log!(has_max, "had nonzero peak", true, has_max);
for (i, record) in governor.history().iter().enumerate() {
tracing::info!("Step {i}: {record}");
}
crate::test_complete!("experiment_cancel_drain_converges");
}
#[test]
fn experiment_deadline_aware_drain() {
init_test("experiment_deadline_aware_drain");
let governor = LyapunovGovernor::new(PotentialWeights::deadline_focused());
let snap = StateSnapshot {
time: Time::from_nanos(900_000_000), live_tasks: 3,
pending_obligations: 2,
obligation_age_sum_ns: 200_000_000,
draining_regions: 1,
deadline_pressure: 8.5, pending_send_permits: 2,
pending_acks: 0,
pending_leases: 0,
pending_io_ops: 0,
cancel_requested_tasks: 0,
cancelling_tasks: 0,
finalizing_tasks: 0,
ready_queue_depth: 0,
};
let suggestion = governor.suggest(&snap);
let is_deadlines = suggestion == SchedulingSuggestion::MeetDeadlines;
crate::assert_with_log!(
is_deadlines,
"deadline-focused governor meets deadlines",
true,
is_deadlines
);
let record = governor.compute_record(&snap);
let dl_dominant = record.deadline_component > record.obligation_component
&& record.deadline_component > record.region_component;
crate::assert_with_log!(
dl_dominant,
"deadline component dominates",
true,
dl_dominant
);
crate::test_complete!("experiment_deadline_aware_drain");
}
#[test]
fn display_impls() {
init_test("lyapunov_display_impls");
let snap = active_snapshot(3, 2, 100_000_000, 1);
let s = format!("{snap}");
let has_sigma = s.contains("Σ(");
crate::assert_with_log!(has_sigma, "snapshot display", true, has_sigma);
let governor = LyapunovGovernor::with_defaults();
let record = governor.compute_record(&snap);
let s = format!("{record}");
let has_v = s.contains("V=");
crate::assert_with_log!(has_v, "record display", true, has_v);
let suggestion = SchedulingSuggestion::DrainObligations;
let s = format!("{suggestion}");
let has_priority = s.contains("prioritize");
crate::assert_with_log!(has_priority, "suggestion display", true, has_priority);
let verdict = ConvergenceVerdict {
monotone: true,
reached_quiescence: true,
v_max: 10.0,
v_final: 0.0,
increase_count: 0,
max_increase: 0.0,
steps: 5,
};
let s = format!("{verdict}");
let has_converged = s.contains("Converged");
crate::assert_with_log!(has_converged, "verdict display", true, has_converged);
crate::test_complete!("lyapunov_display_impls");
}
async fn yield_once() {
use std::future::Future;
use std::pin::Pin;
use std::task::{Context, Poll};
struct YieldOnce {
yielded: bool,
}
impl Future for YieldOnce {
type Output = ();
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
if self.yielded {
Poll::Ready(())
} else {
self.yielded = true;
cx.waker().wake_by_ref();
Poll::Pending
}
}
}
YieldOnce { yielded: false }.await;
}
fn run_cancel_drain_potential_trajectory(
seed: u64,
task_count: usize,
warmup_steps: usize,
) -> (LyapunovGovernor, bool) {
run_cancel_drain_with_weights(seed, task_count, warmup_steps, PotentialWeights::default())
}
fn run_cancel_drain_with_weights(
seed: u64,
task_count: usize,
warmup_steps: usize,
weights: PotentialWeights,
) -> (LyapunovGovernor, bool) {
use crate::lab::{LabConfig, LabRuntime};
use crate::types::CancelReason;
let mut runtime = LabRuntime::new(LabConfig::new(seed));
let region = runtime.state.create_root_region(Budget::unlimited());
for _ in 0..task_count {
let (task_id, _handle) = runtime
.state
.create_task(region, Budget::unlimited(), async {
for _ in 0..20 {
let Some(cx) = crate::cx::Cx::current() else {
return;
};
if cx.checkpoint().is_err() {
return;
}
yield_once().await;
}
})
.expect("create task");
runtime.scheduler.lock().schedule(task_id, 0);
}
for _ in 0..warmup_steps {
runtime.step_for_test();
}
let cancel_reason = CancelReason::shutdown();
let tasks_to_cancel = runtime.state.cancel_request(region, &cancel_reason, None);
{
let mut scheduler = runtime.scheduler.lock();
for (task_id, priority) in tasks_to_cancel {
scheduler.schedule_cancel(task_id, priority);
}
}
let mut governor = LyapunovGovernor::new(weights);
governor.compute_potential(&StateSnapshot::from_runtime_state(&runtime.state));
let max_drain_steps = 10_000_u64;
let mut drain_steps = 0_u64;
while !runtime.is_quiescent() && drain_steps < max_drain_steps {
runtime.step_for_test();
drain_steps += 1;
governor.compute_potential(&StateSnapshot::from_runtime_state(&runtime.state));
}
(governor, runtime.is_quiescent())
}
#[test]
fn lab_cancel_drain_monotone_potential_decrease() {
init_test("lab_cancel_drain_monotone_potential_decrease");
let (governor, is_quiescent) = run_cancel_drain_potential_trajectory(0xBD25_0201, 8, 16);
crate::assert_with_log!(is_quiescent, "quiescent", true, is_quiescent);
let verdict = governor.analyze_convergence();
for (i, record) in governor.history().iter().enumerate() {
tracing::info!("Step {i}: {record}");
}
tracing::info!("{verdict}");
crate::assert_with_log!(verdict.monotone, "monotone", true, verdict.monotone);
crate::assert_with_log!(
verdict.reached_quiescence,
"V=0",
true,
verdict.reached_quiescence
);
crate::assert_with_log!(verdict.converged(), "converged", true, verdict.converged());
let had_activity = verdict.v_max > 0.0;
crate::assert_with_log!(had_activity, "peak V > 0", true, had_activity);
crate::test_complete!("lab_cancel_drain_monotone_potential_decrease");
}
#[test]
fn lab_cancel_drain_deterministic_potential_trajectory() {
init_test("lab_cancel_drain_deterministic_potential_trajectory");
let seed = 0xBD25_DEAD;
let (gov1, q1) = run_cancel_drain_potential_trajectory(seed, 8, 16);
let (gov2, q2) = run_cancel_drain_potential_trajectory(seed, 8, 16);
crate::assert_with_log!(q1 && q2, "both quiescent", true, q1 && q2);
let h1: Vec<f64> = gov1.history().iter().map(|r| r.total).collect();
let h2: Vec<f64> = gov2.history().iter().map(|r| r.total).collect();
crate::assert_with_log!(h1.len() == h2.len(), "same length", h1.len(), h2.len());
let all_match = h1
.iter()
.zip(h2.iter())
.all(|(a, b)| (a - b).abs() < f64::EPSILON);
crate::assert_with_log!(all_match, "trajectories match", true, all_match);
crate::test_complete!("lab_cancel_drain_deterministic_potential_trajectory");
}
#[test]
fn lab_quiescence_invariants_after_cancel_drain() {
init_test("lab_quiescence_invariants_after_cancel_drain");
let (governor, is_quiescent) = run_cancel_drain_potential_trajectory(0xBD25_CAFE, 12, 8);
crate::assert_with_log!(is_quiescent, "quiescent", true, is_quiescent);
let final_record = governor.history().last().expect("non-empty history");
let snap = &final_record.snapshot;
crate::assert_with_log!(snap.live_tasks == 0, "no live tasks", 0, snap.live_tasks);
crate::assert_with_log!(
snap.pending_obligations == 0,
"no obligations",
0,
snap.pending_obligations
);
crate::assert_with_log!(
snap.draining_regions == 0,
"no draining regions",
0,
snap.draining_regions
);
crate::assert_with_log!(
snap.is_quiescent(),
"snapshot quiescent",
true,
snap.is_quiescent()
);
crate::assert_with_log!(
snap.pending_send_permits == 0,
"no sp",
0,
snap.pending_send_permits
);
crate::assert_with_log!(snap.pending_acks == 0, "no ack", 0, snap.pending_acks);
crate::assert_with_log!(snap.pending_leases == 0, "no lease", 0, snap.pending_leases);
crate::assert_with_log!(snap.pending_io_ops == 0, "no io", 0, snap.pending_io_ops);
crate::assert_with_log!(
snap.cancel_requested_tasks == 0,
"no cancel_requested",
0,
snap.cancel_requested_tasks
);
crate::assert_with_log!(
snap.cancelling_tasks == 0,
"no cancelling",
0,
snap.cancelling_tasks
);
crate::assert_with_log!(
snap.finalizing_tasks == 0,
"no finalizing",
0,
snap.finalizing_tasks
);
let v_zero = final_record.total.abs() < f64::EPSILON;
crate::assert_with_log!(v_zero, "V = 0", true, v_zero);
crate::test_complete!("lab_quiescence_invariants_after_cancel_drain");
}
#[test]
fn lab_cancel_drain_with_many_tasks_converges() {
init_test("lab_cancel_drain_with_many_tasks_converges");
let (governor, is_quiescent) = run_cancel_drain_potential_trajectory(0xBD25_A1B0, 12, 24);
crate::assert_with_log!(is_quiescent, "quiescent", true, is_quiescent);
let verdict = governor.analyze_convergence();
for (i, record) in governor.history().iter().enumerate() {
tracing::info!("Step {i}: {record}");
}
tracing::info!("{verdict}");
crate::assert_with_log!(verdict.monotone, "monotone", true, verdict.monotone);
crate::assert_with_log!(verdict.converged(), "converged", true, verdict.converged());
crate::test_complete!("lab_cancel_drain_with_many_tasks_converges");
}
#[test]
fn lab_potential_decreases_across_weight_configurations() {
init_test("lab_potential_decreases_across_weight_configurations");
let weight_configs = [
("default", PotentialWeights::default()),
("uniform", PotentialWeights::uniform(1.0)),
("obligation_focused", PotentialWeights::obligation_focused()),
("deadline_focused", PotentialWeights::deadline_focused()),
];
for (label, weights) in &weight_configs {
let (governor, is_quiescent) =
run_cancel_drain_with_weights(0xBD25_0815, 6, 8, *weights);
crate::assert_with_log!(
is_quiescent,
format!("{label}: quiescent"),
true,
is_quiescent
);
let verdict = governor.analyze_convergence();
tracing::info!("Weights={label}: {verdict}");
crate::assert_with_log!(
verdict.monotone,
format!("{label}: monotone"),
true,
verdict.monotone
);
crate::assert_with_log!(
verdict.converged(),
format!("{label}: converged"),
true,
verdict.converged()
);
}
crate::test_complete!("lab_potential_decreases_across_weight_configurations");
}
fn run_cancel_drain_with_obligations(
seed: u64,
task_count: usize,
obligations_per_task: usize,
warmup_steps: usize,
weights: PotentialWeights,
) -> (LyapunovGovernor, bool, usize) {
use crate::lab::{LabConfig, LabRuntime};
use crate::record::ObligationKind;
use crate::types::CancelReason;
let mut runtime = LabRuntime::new(LabConfig::new(seed).panic_on_leak(false));
let region = runtime.state.create_root_region(Budget::unlimited());
let obligation_kinds = [
ObligationKind::SendPermit,
ObligationKind::Ack,
ObligationKind::Lease,
ObligationKind::IoOp,
];
let mut obligation_ids = Vec::new();
for t_idx in 0..task_count {
let (task_id, _handle) = runtime
.state
.create_task(region, Budget::unlimited(), async {
for _ in 0..1_000 {
let Some(cx) = crate::cx::Cx::current() else {
return;
};
if cx.checkpoint().is_err() {
return;
}
yield_once().await;
}
})
.expect("create task");
for o_idx in 0..obligations_per_task {
let kind = obligation_kinds[(t_idx + o_idx) % obligation_kinds.len()];
if let Ok(obl_id) = runtime.state.create_obligation(
kind,
task_id,
region,
Some(format!("test-obl-t{t_idx}-o{o_idx}")),
) {
obligation_ids.push(obl_id);
}
}
runtime.scheduler.lock().schedule(task_id, 0);
}
for _ in 0..warmup_steps {
runtime.step_for_test();
}
runtime.advance_time(1_000_000_000);
let mut governor = LyapunovGovernor::new(weights);
governor.compute_potential(&StateSnapshot::from_runtime_state(&runtime.state));
let cancel_reason = CancelReason::shutdown();
let tasks_to_cancel = runtime.state.cancel_request(region, &cancel_reason, None);
{
let mut scheduler = runtime.scheduler.lock();
for (task_id, priority) in tasks_to_cancel {
scheduler.schedule_cancel(task_id, priority);
}
}
for obl_id in &obligation_ids {
let _ = runtime
.state
.abort_obligation(*obl_id, crate::record::ObligationAbortReason::Cancel);
}
governor.compute_potential(&StateSnapshot::from_runtime_state(&runtime.state));
let mut drain_steps = 0_u64;
while !runtime.is_quiescent() && drain_steps < 10_000 {
runtime.step_for_test();
drain_steps += 1;
governor.compute_potential(&StateSnapshot::from_runtime_state(&runtime.state));
}
let violations = runtime.check_invariants();
let leak_count = violations
.iter()
.filter(|v| matches!(v, InvariantViolation::ObligationLeak { .. }))
.count();
(governor, runtime.is_quiescent(), leak_count)
}
#[test]
fn lab_cancel_drain_with_obligations_monotone_decrease() {
init_test("lab_cancel_drain_with_obligations_monotone_decrease");
let (governor, is_quiescent, leak_count) =
run_cancel_drain_with_obligations(0xBD25_0B01, 8, 2, 16, PotentialWeights::default());
crate::assert_with_log!(is_quiescent, "quiescent", true, is_quiescent);
crate::assert_with_log!(leak_count == 0, "no obligation leaks", 0usize, leak_count);
let verdict = governor.analyze_convergence();
for (i, record) in governor.history().iter().enumerate() {
tracing::info!("Step {i}: {record}");
}
tracing::info!("{verdict}");
crate::assert_with_log!(verdict.monotone, "monotone", true, verdict.monotone);
crate::assert_with_log!(
verdict.reached_quiescence,
"V=0",
true,
verdict.reached_quiescence
);
crate::assert_with_log!(verdict.converged(), "converged", true, verdict.converged());
let first = &governor.history()[0];
crate::assert_with_log!(
first.snapshot.pending_obligations > 0,
"initial pending obligations > 0",
true,
first.snapshot.pending_obligations > 0
);
crate::test_complete!("lab_cancel_drain_with_obligations_monotone_decrease");
}
#[test]
fn lab_obligation_leak_oracle_clean_after_drain() {
init_test("lab_obligation_leak_oracle_clean_after_drain");
let (governor, is_quiescent, leak_count) =
run_cancel_drain_with_obligations(0xBD25_1EAC, 10, 3, 8, PotentialWeights::default());
crate::assert_with_log!(is_quiescent, "quiescent", true, is_quiescent);
crate::assert_with_log!(leak_count == 0, "zero obligation leaks", 0usize, leak_count);
let final_record = governor.history().last().expect("non-empty history");
let snap = &final_record.snapshot;
crate::assert_with_log!(
snap.pending_obligations == 0,
"no pending",
0,
snap.pending_obligations
);
crate::assert_with_log!(
snap.pending_send_permits == 0,
"no sp",
0,
snap.pending_send_permits
);
crate::assert_with_log!(snap.pending_acks == 0, "no acks", 0, snap.pending_acks);
crate::assert_with_log!(
snap.pending_leases == 0,
"no leases",
0,
snap.pending_leases
);
crate::assert_with_log!(
snap.pending_io_ops == 0,
"no io_ops",
0,
snap.pending_io_ops
);
crate::test_complete!("lab_obligation_leak_oracle_clean_after_drain");
}
#[test]
fn lab_cancel_drain_with_obligations_deterministic() {
init_test("lab_cancel_drain_with_obligations_deterministic");
let seed = 0xBD25_DE70;
let w = PotentialWeights::default();
let (gov1, q1, l1) = run_cancel_drain_with_obligations(seed, 6, 2, 12, w);
let (gov2, q2, l2) = run_cancel_drain_with_obligations(seed, 6, 2, 12, w);
crate::assert_with_log!(q1 && q2, "both quiescent", true, q1 && q2);
crate::assert_with_log!(l1 == 0 && l2 == 0, "no leaks", true, l1 == 0 && l2 == 0);
let h1: Vec<f64> = gov1.history().iter().map(|r| r.total).collect();
let h2: Vec<f64> = gov2.history().iter().map(|r| r.total).collect();
crate::assert_with_log!(h1.len() == h2.len(), "same length", h1.len(), h2.len());
let all_match = h1
.iter()
.zip(h2.iter())
.all(|(a, b)| (a - b).abs() < f64::EPSILON);
crate::assert_with_log!(all_match, "trajectories match", true, all_match);
crate::test_complete!("lab_cancel_drain_with_obligations_deterministic");
}
#[test]
fn lab_obligation_focused_weights_converge_with_obligations() {
init_test("lab_obligation_focused_weights_converge_with_obligations");
let weights = PotentialWeights::obligation_focused();
let (governor, is_quiescent, leak_count) =
run_cancel_drain_with_obligations(0xBD25_0B1F, 8, 3, 8, weights);
crate::assert_with_log!(is_quiescent, "quiescent", true, is_quiescent);
crate::assert_with_log!(leak_count == 0, "no leaks", 0usize, leak_count);
let verdict = governor.analyze_convergence();
tracing::info!("{verdict}");
crate::assert_with_log!(verdict.monotone, "monotone", true, verdict.monotone);
crate::assert_with_log!(verdict.converged(), "converged", true, verdict.converged());
let first = &governor.history()[0];
let obl_fraction = if first.total > 0.0 {
first.obligation_component / first.total
} else {
0.0
};
tracing::info!(
"Obligation fraction of initial V: {:.2}% ({:.4} / {:.4})",
obl_fraction * 100.0,
first.obligation_component,
first.total,
);
crate::test_complete!("lab_obligation_focused_weights_converge_with_obligations");
}
#[test]
fn lab_quiescence_snapshot_zero_with_obligations() {
init_test("lab_quiescence_snapshot_zero_with_obligations");
let (governor, is_quiescent, leak_count) =
run_cancel_drain_with_obligations(0xBD25_0520, 12, 2, 10, PotentialWeights::default());
crate::assert_with_log!(is_quiescent, "quiescent", true, is_quiescent);
crate::assert_with_log!(leak_count == 0, "no leaks", 0usize, leak_count);
let final_record = governor.history().last().expect("non-empty history");
let snap = &final_record.snapshot;
crate::assert_with_log!(snap.live_tasks == 0, "no live tasks", 0, snap.live_tasks);
crate::assert_with_log!(
snap.pending_obligations == 0,
"no obl",
0,
snap.pending_obligations
);
crate::assert_with_log!(
snap.draining_regions == 0,
"no draining",
0,
snap.draining_regions
);
crate::assert_with_log!(
snap.obligation_age_sum_ns == 0,
"age zero",
0u64,
snap.obligation_age_sum_ns
);
crate::assert_with_log!(
snap.cancel_requested_tasks == 0,
"no cr",
0,
snap.cancel_requested_tasks
);
crate::assert_with_log!(
snap.cancelling_tasks == 0,
"no cancelling",
0,
snap.cancelling_tasks
);
crate::assert_with_log!(
snap.finalizing_tasks == 0,
"no finalizing",
0,
snap.finalizing_tasks
);
crate::assert_with_log!(
snap.is_quiescent(),
"quiescent snap",
true,
snap.is_quiescent()
);
let v_zero = final_record.total.abs() < f64::EPSILON;
crate::assert_with_log!(v_zero, "V = 0", true, v_zero);
crate::test_complete!("lab_quiescence_snapshot_zero_with_obligations");
}
#[test]
fn potential_weights_debug_clone_copy_default() {
let w = PotentialWeights::default();
let dbg = format!("{w:?}");
assert!(dbg.contains("PotentialWeights"));
let w2 = w;
assert!((w2.w_tasks - 1.0).abs() < f64::EPSILON);
let w3 = w;
assert!((w3.w_obligation_age - 5.0).abs() < f64::EPSILON);
}
#[test]
fn scheduling_suggestion_debug_clone_copy_eq() {
let s = SchedulingSuggestion::DrainObligations;
let dbg = format!("{s:?}");
assert!(dbg.contains("DrainObligations"));
let s2 = s;
assert_eq!(s, s2);
let s3 = s;
assert_eq!(s, s3);
assert_ne!(
SchedulingSuggestion::DrainObligations,
SchedulingSuggestion::MeetDeadlines
);
}
#[test]
fn potential_record_debug_clone() {
let snap = StateSnapshot {
time: Time::ZERO,
live_tasks: 0,
pending_obligations: 0,
obligation_age_sum_ns: 0,
draining_regions: 0,
deadline_pressure: 0.0,
pending_send_permits: 0,
pending_acks: 0,
pending_leases: 0,
pending_io_ops: 0,
cancel_requested_tasks: 0,
cancelling_tasks: 0,
finalizing_tasks: 0,
ready_queue_depth: 0,
};
let rec = PotentialRecord {
snapshot: snap,
total: 0.0,
task_component: 0.0,
obligation_component: 0.0,
region_component: 0.0,
deadline_component: 0.0,
};
let dbg = format!("{rec:?}");
assert!(dbg.contains("PotentialRecord"));
let rec2 = rec;
assert!(rec2.is_zero());
}
}