use franken_decision::{DecisionContract, FallbackPolicy, LossMatrix, Posterior};
use crate::obligation::lyapunov::StateSnapshot;
/// Indices of the scheduler health states, in the order used by the contract's state space
/// and by `SchedulerDecisionContract::snapshot_likelihoods`.
pub mod state {
    /// Index of the `healthy` state.
    pub const HEALTHY: usize = 0;
    /// Index of the `congested` state.
    pub const CONGESTED: usize = 1;
    /// Index of the `unstable` state.
    pub const UNSTABLE: usize = 2;
    /// Index of the `partitioned` state.
    pub const PARTITIONED: usize = 3;
    /// Number of states: one past the last index.
    pub const COUNT: usize = PARTITIONED + 1;
}
/// Indices of the scheduler actions, in the order used by the contract's action set.
pub mod action {
    /// Index of `aggressive_schedule`.
    pub const AGGRESSIVE: usize = 0;
    /// Index of `balanced_schedule`.
    pub const BALANCED: usize = 1;
    /// Index of `conservative_schedule` (also the contract's fallback action).
    pub const CONSERVATIVE: usize = 2;
    /// Number of actions: one past the last index.
    pub const COUNT: usize = CONSERVATIVE + 1;
}
/// Decision contract that picks a scheduling action (aggressive, balanced or conservative)
/// from a posterior over scheduler health states.
#[derive(Debug, Clone)]
pub struct SchedulerDecisionContract {
    /// State labels, positioned by the indices in `state`.
    states: Vec<String>,
    /// Action labels, positioned by the indices in `action`.
    actions: Vec<String>,
    /// Loss for every (state, action) pair.
    losses: LossMatrix,
    /// Policy handed out by `DecisionContract::fallback_policy`.
    fallback: FallbackPolicy,
}
impl SchedulerDecisionContract {
#[rustfmt::skip]
const DEFAULT_LOSSES: [f64; 12] = [
1.0, 3.0, 11.0,
22.0, 10.0, 3.0,
15.0, 8.0, 5.0,
30.0, 15.0, 8.0,
];
#[must_use]
#[inline]
pub fn new() -> Self {
Self::with_losses_and_policy(Self::DEFAULT_LOSSES.to_vec(), FallbackPolicy::default())
}
#[must_use]
#[inline]
pub fn with_losses_and_policy(losses: Vec<f64>, fallback: FallbackPolicy) -> Self {
let states = vec![
"healthy".into(),
"congested".into(),
"unstable".into(),
"partitioned".into(),
];
let actions = vec![
"aggressive_schedule".into(),
"balanced_schedule".into(),
"conservative_schedule".into(),
];
let loss_matrix = LossMatrix::new(states.clone(), actions.clone(), losses)
.expect("scheduler loss matrix should be valid");
Self {
states,
actions,
losses: loss_matrix,
fallback,
}
}
#[must_use]
#[inline]
#[allow(clippy::suboptimal_flops)] pub fn snapshot_likelihoods(snapshot: &StateSnapshot) -> [f64; 4] {
let cancel_load = f64::from(snapshot.total_cancelling_tasks());
let obligation_load = f64::from(snapshot.pending_obligations);
let ready_load = f64::from(snapshot.ready_queue_depth);
let drain_load = f64::from(snapshot.draining_regions);
let deadline_signal = snapshot.deadline_pressure.clamp(0.0, 1.0);
let healthy_score =
1.0 / (1.0 + cancel_load + obligation_load * 0.5 + deadline_signal * 2.0 + drain_load);
let congested_score = (ready_load + obligation_load) / (1.0 + ready_load + obligation_load)
* (1.0 - deadline_signal * 0.5);
let unstable_score = (cancel_load + drain_load) / (1.0 + cancel_load + drain_load)
* (1.0 - deadline_signal * 0.3);
let cancel_drain_ratio = (cancel_load + drain_load) / (1.0 + cancel_load + drain_load);
let partitioned_score = deadline_signal * (0.5 + cancel_drain_ratio);
let floor = 0.01;
let clamp_likelihood = |score: f64| score.clamp(floor, 1.0);
[
clamp_likelihood(healthy_score),
clamp_likelihood(congested_score),
clamp_likelihood(unstable_score),
clamp_likelihood(partitioned_score),
]
}
}
impl Default for SchedulerDecisionContract {
#[inline]
fn default() -> Self {
Self::new()
}
}
// `name` returns a string literal through the trait's `&str` signature; clippy's suggestion to
// return `&'static str` instead is suppressed so this impl mirrors the trait declaration.
#[allow(clippy::unnecessary_literal_bound)]
impl DecisionContract for SchedulerDecisionContract {
    /// Returns the fixed component name `"scheduler"`.
    #[inline]
    fn name(&self) -> &str {
        "scheduler"
    }

    /// Returns the state labels, positioned by the indices in `state`.
    #[inline]
    fn state_space(&self) -> &[String] {
        &self.states
    }

    /// Returns the action labels, positioned by the indices in `action`.
    #[inline]
    fn action_set(&self) -> &[String] {
        &self.actions
    }

    /// Returns the (state, action) loss matrix this contract was built with.
    #[inline]
    fn loss_matrix(&self) -> &LossMatrix {
        &self.losses
    }

    /// Conditions `posterior` on a discrete observation of the scheduler state.
    ///
    /// The observed state receives likelihood 0.9 and every other state 0.1. The posterior is
    /// left untouched when its dimension is not `state::COUNT` or when `observation` is not a
    /// valid state index.
    ///
    /// Returning early, rather than applying an all-equal likelihood vector, guarantees a
    /// bit-for-bit no-op instead of relying on renormalisation being exact.
    #[inline]
    fn update_posterior(&self, posterior: &mut Posterior, observation: usize) {
        const OBSERVED_LIKELIHOOD: f64 = 0.9;
        const UNOBSERVED_LIKELIHOOD: f64 = 0.1;

        if posterior.len() != state::COUNT {
            return;
        }
        let mut likelihoods = [UNOBSERVED_LIKELIHOOD; state::COUNT];
        let Some(observed) = likelihoods.get_mut(observation) else {
            // Unknown observation: it carries no evidence, so leave the posterior as-is.
            return;
        };
        *observed = OBSERVED_LIKELIHOOD;
        posterior.bayesian_update(&likelihoods);
    }

    /// Delegates to `LossMatrix::bayes_action`. If the posterior's dimension does not match
    /// the state space, it returns the fallback action instead.
    #[inline]
    fn choose_action(&self, posterior: &Posterior) -> usize {
        if posterior.len() != state::COUNT {
            return self.fallback_action();
        }
        self.losses.bayes_action(posterior)
    }

    /// Always `action::CONSERVATIVE`.
    #[inline]
    fn fallback_action(&self) -> usize {
        action::CONSERVATIVE
    }

    /// Returns the policy supplied at construction.
    #[inline]
    fn fallback_policy(&self) -> &FallbackPolicy {
        &self.fallback
    }
}
#[cfg(test)]
mod tests {
    //! Unit tests for `SchedulerDecisionContract`. They cover:
    //! - decisions under the default loss table for each dominant state;
    //! - fallback behaviour;
    //! - audit/evidence output, including a scrubbed `insta` JSON snapshot;
    //! - the `snapshot_likelihoods` heuristics;
    //! - the dimension and range guards on `update_posterior` / `choose_action`.
    use super::*;
    use crate::types::Time;
    use franken_decision::{EvalContext, Posterior, evaluate};
    use franken_kernel::{DecisionId, TraceId};
    use serde_json::{Value, json};

    /// Builds an evaluation context with the given calibration score and fixed ids/timestamp.
    #[inline]
    fn test_ctx(cal: f64) -> EvalContext {
        EvalContext {
            calibration_score: cal,
            e_process: 1.0,
            ci_width: 0.1,
            decision_id: DecisionId::from_parts(1_700_000_000_000, 42),
            trace_id: TraceId::from_parts(1_700_000_000_000, 1),
            ts_unix_ms: 1_700_000_000_000,
        }
    }

    /// A snapshot with every counter at zero and no deadline pressure (an idle runtime).
    #[inline]
    fn zero_snapshot() -> StateSnapshot {
        StateSnapshot {
            time: Time::ZERO,
            live_tasks: 0,
            pending_obligations: 0,
            obligation_age_sum_ns: 0,
            draining_regions: 0,
            deadline_pressure: 0.0,
            pending_send_permits: 0,
            pending_acks: 0,
            pending_leases: 0,
            pending_io_ops: 0,
            cancel_requested_tasks: 0,
            cancelling_tasks: 0,
            finalizing_tasks: 0,
            ready_queue_depth: 0,
        }
    }

    /// Replaces `decision_id`, `trace_id` and `ts_unix_ms` inside `audit_entry` with
    /// placeholder strings before snapshotting; other fields are left as-is.
    fn scrub_decision_output(value: Value) -> Value {
        let mut scrubbed = value;
        if let Some(audit) = scrubbed
            .get_mut("audit_entry")
            .and_then(Value::as_object_mut)
        {
            if let Some(decision_id) = audit.get_mut("decision_id") {
                *decision_id = Value::String("[DECISION_ID]".into());
            }
            if let Some(trace_id) = audit.get_mut("trace_id") {
                *trace_id = Value::String("[TRACE_ID]".into());
            }
            if let Some(ts_unix_ms) = audit.get_mut("ts_unix_ms") {
                *ts_unix_ms = Value::String("[TS_MS]".into());
            }
        }
        scrubbed
    }

    #[test]
    fn contract_creation() {
        let c = SchedulerDecisionContract::new();
        assert_eq!(c.name(), "scheduler");
        assert_eq!(c.state_space().len(), 4);
        assert_eq!(c.action_set().len(), 3);
    }

    #[test]
    fn healthy_state_prefers_aggressive() {
        let c = SchedulerDecisionContract::new();
        let posterior = Posterior::new(vec![0.9, 0.03, 0.03, 0.04]).unwrap();
        let outcome = evaluate(&c, &posterior, &test_ctx(0.95));
        assert_eq!(outcome.action_index, action::AGGRESSIVE);
        // High calibration (0.95) must not trip the fallback.
        assert!(!outcome.fallback_active);
    }

    #[test]
    fn congested_state_prefers_conservative() {
        let c = SchedulerDecisionContract::new();
        let posterior = Posterior::new(vec![0.05, 0.85, 0.05, 0.05]).unwrap();
        let outcome = evaluate(&c, &posterior, &test_ctx(0.95));
        assert_eq!(outcome.action_index, action::CONSERVATIVE);
    }

    #[test]
    fn partitioned_state_prefers_conservative() {
        let c = SchedulerDecisionContract::new();
        let posterior = Posterior::new(vec![0.05, 0.05, 0.05, 0.85]).unwrap();
        let outcome = evaluate(&c, &posterior, &test_ctx(0.95));
        assert_eq!(outcome.action_index, action::CONSERVATIVE);
    }

    #[test]
    fn unstable_state_prefers_conservative() {
        let c = SchedulerDecisionContract::new();
        let posterior = Posterior::new(vec![0.05, 0.05, 0.85, 0.05]).unwrap();
        let outcome = evaluate(&c, &posterior, &test_ctx(0.95));
        assert_eq!(outcome.action_index, action::CONSERVATIVE);
    }

    #[test]
    fn fallback_chooses_conservative() {
        let c = SchedulerDecisionContract::new();
        let posterior = Posterior::uniform(4);
        // Low calibration (0.3) is expected to activate the default fallback policy.
        let outcome = evaluate(&c, &posterior, &test_ctx(0.3));
        assert!(outcome.fallback_active);
        assert_eq!(outcome.action_index, action::CONSERVATIVE);
    }

    #[test]
    fn uniform_posterior_prefers_conservative() {
        let c = SchedulerDecisionContract::new();
        let posterior = Posterior::uniform(4);
        // With the default losses the mean loss per action is 17 / 9 / 6.75, so conservative wins.
        let outcome = evaluate(&c, &posterior, &test_ctx(0.95));
        assert_eq!(outcome.action_index, action::CONSERVATIVE);
    }

    #[test]
    fn audit_entry_produces_valid_evidence() {
        let c = SchedulerDecisionContract::new();
        let posterior = Posterior::new(vec![0.7, 0.1, 0.1, 0.1]).unwrap();
        let outcome = evaluate(&c, &posterior, &test_ctx(0.92));
        let evidence = outcome.audit_entry.to_evidence_ledger();
        assert_eq!(evidence.component, "scheduler");
        assert!(evidence.is_valid());
    }

    // Snapshot name and JSON shape are pinned by the stored `insta` snapshot file.
    #[test]
    fn decision_output_snapshot_scrubbed() {
        let c = SchedulerDecisionContract::new();
        let posterior = Posterior::new(vec![0.12, 0.18, 0.2, 0.5]).unwrap();
        let outcome = evaluate(&c, &posterior, &test_ctx(0.91));
        insta::assert_json_snapshot!(
            "decision_output_scrubbed",
            scrub_decision_output(json!({
                "action_index": outcome.action_index,
                "action_name": outcome.action_name,
                "expected_loss": outcome.expected_loss,
                "fallback_active": outcome.fallback_active,
                "audit_entry": outcome.audit_entry,
            }))
        );
    }

    #[test]
    fn snapshot_likelihoods_quiescent() {
        let snapshot = zero_snapshot();
        let likelihoods = SchedulerDecisionContract::snapshot_likelihoods(&snapshot);
        assert!(likelihoods[state::HEALTHY] > likelihoods[state::CONGESTED]);
        assert!(likelihoods[state::HEALTHY] > likelihoods[state::UNSTABLE]);
        assert!(likelihoods[state::HEALTHY] > likelihoods[state::PARTITIONED]);
    }

    #[test]
    fn snapshot_likelihoods_high_cancel_load() {
        let mut snapshot = zero_snapshot();
        snapshot.cancel_requested_tasks = 50;
        snapshot.cancelling_tasks = 20;
        let likelihoods = SchedulerDecisionContract::snapshot_likelihoods(&snapshot);
        assert!(likelihoods[state::UNSTABLE] > likelihoods[state::HEALTHY]);
    }

    #[test]
    fn snapshot_likelihoods_high_queue_depth() {
        let mut snapshot = zero_snapshot();
        snapshot.ready_queue_depth = 100;
        snapshot.pending_obligations = 30;
        let likelihoods = SchedulerDecisionContract::snapshot_likelihoods(&snapshot);
        assert!(likelihoods[state::CONGESTED] > likelihoods[state::HEALTHY]);
    }

    #[test]
    fn snapshot_likelihoods_high_deadline_pressure() {
        let mut snapshot = zero_snapshot();
        snapshot.deadline_pressure = 1.0;
        let likelihoods = SchedulerDecisionContract::snapshot_likelihoods(&snapshot);
        assert!(likelihoods[state::PARTITIONED] > likelihoods[state::HEALTHY]);
    }

    // Deadline pressure damps `unstable` while amplifying `partitioned`, so combined
    // deadline + cancel/drain load should favour partitioned.
    #[test]
    fn snapshot_likelihoods_deadline_plus_cancel_pressure_prefers_partitioned() {
        let mut snapshot = zero_snapshot();
        snapshot.deadline_pressure = 1.0;
        snapshot.cancel_requested_tasks = 40;
        snapshot.cancelling_tasks = 30;
        snapshot.draining_regions = 20;
        let likelihoods = SchedulerDecisionContract::snapshot_likelihoods(&snapshot);
        assert!(likelihoods[state::PARTITIONED] > likelihoods[state::UNSTABLE]);
    }

    // Repeated healthy-leaning evidence should concentrate the posterior on HEALTHY.
    #[test]
    fn end_to_end_posterior_update_and_decide() {
        let c = SchedulerDecisionContract::new();
        let mut posterior = Posterior::uniform(4);
        let healthy_likelihoods = [0.8, 0.05, 0.05, 0.1];
        for _ in 0..10 {
            posterior.bayesian_update(&healthy_likelihoods);
        }
        let outcome = evaluate(&c, &posterior, &test_ctx(0.95));
        assert_eq!(outcome.action_index, action::AGGRESSIVE);
    }

    // The conservative column costs 50 in every state, so it must never be the Bayes action.
    #[test]
    fn custom_loss_matrix() {
        let losses = vec![
            1.0, 2.0, 50.0, // healthy
            5.0, 3.0, 50.0, // congested
            8.0, 6.0, 50.0, // unstable
            10.0, 8.0, 50.0, // partitioned
        ];
        let c =
            SchedulerDecisionContract::with_losses_and_policy(losses, FallbackPolicy::default());
        let posterior = Posterior::uniform(4);
        let outcome = evaluate(&c, &posterior, &test_ctx(0.95));
        assert_ne!(outcome.action_index, action::CONSERVATIVE);
    }

    #[test]
    fn update_posterior_out_of_range_is_noop() {
        let c = SchedulerDecisionContract::new();
        let mut posterior = Posterior::uniform(state::COUNT);
        let before = posterior.probs().to_vec();
        c.update_posterior(&mut posterior, state::COUNT + 5);
        assert_eq!(posterior.probs(), before.as_slice());
    }

    #[test]
    fn update_posterior_wrong_dimension_is_noop() {
        let c = SchedulerDecisionContract::new();
        let mut posterior = Posterior::uniform(state::COUNT - 1);
        let before = posterior.probs().to_vec();
        c.update_posterior(&mut posterior, state::HEALTHY);
        assert_eq!(posterior.probs(), before.as_slice());
    }

    #[test]
    fn choose_action_wrong_dimension_uses_fallback() {
        let c = SchedulerDecisionContract::new();
        let posterior = Posterior::uniform(state::COUNT - 1);
        assert_eq!(c.choose_action(&posterior), action::CONSERVATIVE);
    }
}