use std::time::{Duration, Instant};
use super::action::{MaintenanceTransition, MeshOsAction};
use super::config::{LocalityConfig, MaintenanceConfig};
use super::event::{ChainId, DaemonIntent, LocalReplicaIntent, NodeId};
use super::maintenance::MaintenanceState;
use super::scheduler::{PlacementScorer, SchedulerConfig};
use super::state::{DaemonLifecycle, DaemonStatus, DesiredState, MeshOsState};
/// Grace window attached to every `StopDaemon` request: the emitted deadline is
/// the reconcile time plus this duration (30 seconds).
pub const STOP_GRACE_PERIOD: Duration = Duration::from_millis(30_000);
/// Computes the actions that move `actual` toward `desired` on `this_node`.
///
/// The reference time is `actual.last_tick`, falling back to the wall clock when
/// no tick has been recorded. If the state reports itself frozen at that time,
/// an empty list is returned.
///
/// Otherwise the sub-diffs run in a fixed order: daemons, forced evictions,
/// forced placements, replica counts, locality, maintenance, scheduler. Their
/// actions are concatenated in that order.
///
/// Chains that receive a `RequestEviction` during the pass are collected in
/// `evicted_this_tick`, and the scheduler pass skips them. `scorer` is
/// optional; without it the scheduler pass emits nothing.
pub fn reconcile(
    actual: &MeshOsState,
    desired: &DesiredState,
    this_node: NodeId,
    locality: &LocalityConfig,
    maintenance: &MaintenanceConfig,
    scheduler: &SchedulerConfig,
    scorer: Option<&dyn PlacementScorer>,
) -> Vec<MeshOsAction> {
    let now = actual.last_tick.unwrap_or_else(Instant::now);
    let mut actions: Vec<MeshOsAction> = Vec::new();
    if !actual.is_frozen(now) {
        // Chains that got a RequestEviction earlier in this pass.
        let mut evicted_this_tick = std::collections::HashSet::<ChainId>::new();

        diff_daemons(actual, desired, now, &mut actions);
        diff_forced_evictions(actual, this_node, &mut evicted_this_tick, &mut actions);
        diff_forced_placements(actual, this_node, &mut actions);
        diff_replicas(actual, desired, this_node, &mut evicted_this_tick, &mut actions);
        diff_locality(actual, now, locality, &mut actions);
        diff_maintenance(actual, this_node, now, maintenance, &mut actions);
        diff_scheduler(
            actual,
            this_node,
            scheduler,
            scorer,
            now,
            &evicted_this_tick,
            &mut actions,
        );
    }
    actions
}
/// Emits start/stop actions for every daemon named in `desired.desired_daemons`.
///
/// A daemon with no entry in `actual.daemons` is treated as
/// `DaemonLifecycle::default()`.
///
/// * `Run` + `Stopped`:
///   * emits `StartDaemon` when the daemon has no recorded status, or when its
///     restart backoff is admissible at `now`;
///   * otherwise records the backoff via `emit_backoff_record_if_needed`.
/// * `Stop` + `Running`/`Starting`: emits `StopDaemon` whose deadline is
///   `now + STOP_GRACE_PERIOD`.
/// * Every other combination emits nothing.
fn diff_daemons(
    actual: &MeshOsState,
    desired: &DesiredState,
    now: Instant,
    out: &mut Vec<MeshOsAction>,
) {
    for (daemon, intent) in &desired.desired_daemons {
        let status = actual.daemons.get(daemon);
        let lifecycle = status.map(|s| s.lifecycle).unwrap_or_default();
        match (intent, lifecycle) {
            (
                DaemonIntent::Run,
                DaemonLifecycle::Running | DaemonLifecycle::Starting | DaemonLifecycle::Stopping,
            )
            | (DaemonIntent::Stop, DaemonLifecycle::Stopped | DaemonLifecycle::Stopping) => {}
            (DaemonIntent::Run, DaemonLifecycle::Stopped) => {
                // `Some` only when a recorded status exists AND its backoff
                // currently forbids a restart.
                let gated = status.filter(|s| !s.backoff.state().is_admissible(now));
                match gated {
                    None => out.push(MeshOsAction::StartDaemon {
                        daemon: daemon.clone(),
                    }),
                    Some(s) => emit_backoff_record_if_needed(
                        daemon,
                        s,
                        actual.applied_backoffs.get(daemon).copied(),
                        out,
                    ),
                }
            }
            (DaemonIntent::Stop, DaemonLifecycle::Running | DaemonLifecycle::Starting) => {
                // Fall back to `now` if adding the grace period overflows `Instant`.
                let deadline = now.checked_add(STOP_GRACE_PERIOD).unwrap_or(now);
                out.push(MeshOsAction::StopDaemon {
                    daemon: daemon.clone(),
                    reason: "desired-state intent: Stop".to_string(),
                    deadline,
                });
            }
        }
    }
}
fn diff_replicas(
actual: &MeshOsState,
desired: &DesiredState,
this_node: NodeId,
evicted: &mut std::collections::HashSet<ChainId>,
out: &mut Vec<MeshOsAction>,
) {
let mut local_chains: Vec<ChainId> = desired.desired_local_replicas.keys().copied().collect();
local_chains.sort();
for chain in local_chains {
let intent = desired.desired_local_replicas[&chain];
let holds = actual
.replicas
.get(&chain)
.is_some_and(|hs| hs.contains(&this_node));
match (intent, holds) {
(LocalReplicaIntent::Hold, false) => {
if let Some(source) = pick_pull_source(actual, chain, this_node) {
out.push(MeshOsAction::PullReplica { chain, source });
}
}
(LocalReplicaIntent::Drop, true) => {
out.push(MeshOsAction::DropReplica { chain });
}
_ => {}
}
}
let mut count_chains: Vec<ChainId> = desired.desired_replicas.keys().copied().collect();
count_chains.sort();
for chain in count_chains {
let leader = actual.replica_leader.get(&chain).copied();
if leader != Some(this_node) {
continue;
}
let desired_count = desired.desired_replicas[&chain];
let holders = actual.replicas.get(&chain);
let actual_count = holders.map(|h| h.len()).unwrap_or(0) as u32;
if actual_count < desired_count {
out.push(MeshOsAction::RequestPlacement {
chain,
exclude: holders
.map(|hs| hs.iter().copied().collect())
.unwrap_or_default(),
target: None,
});
} else if actual_count > desired_count {
if let Some(victim) = holders.and_then(|hs| hs.iter().next()).copied() {
out.push(MeshOsAction::RequestEviction { chain, victim });
evicted.insert(chain);
}
}
}
}
/// Chooses the node to pull `chain` from: the lowest-id holder other than
/// `this_node`.
///
/// Returns `None` when the chain has no entry in `actual.replicas`, or when
/// `this_node` is its only holder.
fn pick_pull_source(actual: &MeshOsState, chain: ChainId, this_node: NodeId) -> Option<NodeId> {
    let holders = actual.replicas.get(&chain)?;
    holders
        .iter()
        .copied()
        .filter(|&node| node != this_node)
        .min()
}
/// Emits a `RequestEviction` for each entry in `actual.forced_evictions`
/// whose chain is led by `this_node`.
///
/// At most one eviction is emitted per chain: a chain already in `evicted` is
/// skipped. Every chain that does get an eviction is added to `evicted`.
fn diff_forced_evictions(
    actual: &MeshOsState,
    this_node: NodeId,
    evicted: &mut std::collections::HashSet<ChainId>,
    out: &mut Vec<MeshOsAction>,
) {
    for (chain, victim) in &actual.forced_evictions {
        let led_here = actual.replica_leader.get(chain).copied() == Some(this_node);
        // `insert` returns false when the chain is already present, so this
        // both checks and records in one step (only evaluated when led here).
        if led_here && evicted.insert(*chain) {
            out.push(MeshOsAction::RequestEviction {
                chain: *chain,
                victim: *victim,
            });
        }
    }
}
/// Emits a `RequestPlacement` pinned to the forced target for each entry in
/// `actual.forced_placements` whose chain is led by `this_node`.
///
/// The chain's current holders, minus the target itself, are passed as
/// `exclude`. The list is empty when the chain has no known holders.
fn diff_forced_placements(actual: &MeshOsState, this_node: NodeId, out: &mut Vec<MeshOsAction>) {
    for (chain, target) in &actual.forced_placements {
        let led_here = actual.replica_leader.get(chain).copied() == Some(this_node);
        if !led_here {
            continue;
        }
        let exclude: Vec<NodeId> = match actual.replicas.get(chain) {
            Some(holders) => holders
                .iter()
                .copied()
                .filter(|&node| node != *target)
                .collect(),
            None => Vec::new(),
        };
        out.push(MeshOsAction::RequestPlacement {
            chain: *chain,
            exclude,
            target: Some(*target),
        });
    }
}
/// Emits `MarkAvoid` for every peer that meets both conditions:
/// * its RTT in `actual.rtt` is strictly above
///   `locality.degraded_rtt_threshold`;
/// * it does not already have an entry in `actual.avoid_list`.
///
/// Peers are emitted in ascending `NodeId` order, each with
/// `locality.avoid_ttl` as its TTL.
///
/// `now` is currently unused. Membership in the avoid list is checked without
/// looking at an entry's expiry.
fn diff_locality(
    actual: &MeshOsState,
    now: Instant,
    locality: &LocalityConfig,
    out: &mut Vec<MeshOsAction>,
) {
    let _ = now;
    let mut degraded: Vec<(NodeId, Duration)> = actual
        .rtt
        .iter()
        .filter(|(peer, rtt)| {
            **rtt > locality.degraded_rtt_threshold && !actual.avoid_list.contains_key(*peer)
        })
        .map(|(&peer, &rtt)| (peer, rtt))
        .collect();
    degraded.sort_by_key(|&(peer, _)| peer);
    out.extend(degraded.into_iter().map(|(peer, rtt)| MeshOsAction::MarkAvoid {
        peer,
        reason: format!("rtt-degradation: {} ms", rtt.as_millis()),
        ttl: locality.avoid_ttl,
    }));
}
/// Decides whether this node's maintenance state should advance. If so, emits
/// a single `CommitMaintenanceTransition` for `this_node`.
///
/// * `EnteringMaintenance`:
///   * goes to `Maintenance` once no chain in `actual.replicas` lists this node
///     and every known daemon is `Stopped`;
///   * otherwise goes to `DrainFailed` once `now` reaches the optional
///     deadline. Drain completion wins over an elapsed deadline.
/// * `ExitingMaintenance` goes to `Recovery` once every daemon is `Running`
///   with health `Healthy` or not yet reported.
/// * `Recovery` goes to `Active` once `config.recovery_ramp_window` has
///   elapsed since `since`.
/// * `Active`, `Maintenance` and `DrainFailed` are left unchanged.
fn diff_maintenance(
    actual: &MeshOsState,
    this_node: NodeId,
    now: Instant,
    config: &MaintenanceConfig,
    out: &mut Vec<MeshOsAction>,
) {
    let target = match &actual.local_maintenance {
        MaintenanceState::Active
        | MaintenanceState::Maintenance { .. }
        | MaintenanceState::DrainFailed { .. } => None,
        MaintenanceState::EnteringMaintenance {
            since, deadline, ..
        } => {
            let drained =
                all_replicas_drained_locally(actual, this_node) && all_daemons_stopped(actual);
            let overdue = deadline.is_some_and(|d| now >= d);
            if drained {
                Some(MaintenanceTransition::Maintenance)
            } else if overdue {
                let elapsed_ms = now.saturating_duration_since(*since).as_millis();
                Some(MaintenanceTransition::DrainFailed {
                    reason: format!("drain deadline elapsed after {elapsed_ms}ms"),
                })
            } else {
                None
            }
        }
        MaintenanceState::ExitingMaintenance { .. } => {
            all_daemons_healthy(actual).then_some(MaintenanceTransition::Recovery)
        }
        MaintenanceState::Recovery { since } => {
            let ramped = now.saturating_duration_since(*since) >= config.recovery_ramp_window;
            ramped.then_some(MaintenanceTransition::Active)
        }
    };
    out.extend(target.map(|target| MeshOsAction::CommitMaintenanceTransition {
        node: this_node,
        target,
    }));
}
/// Proposes score-driven rebalancing evictions for chains this node leads.
///
/// Runs only when a `scorer` is supplied. Chains are visited in ascending
/// `ChainId` order. A chain is skipped when any of these holds:
/// * it already received an eviction earlier in this pass
///   (`evicted_this_tick`);
/// * `actual.replica_leader` does not name `this_node`;
/// * it was rebalanced less than `config.cooldown` before `now`, per
///   `actual.last_rebalance`;
/// * it has no holders.
///
/// For each remaining chain, the lowest-scoring holder is the candidate victim;
/// the first such holder wins ties. A `RequestEviction` is emitted only if the
/// candidate's score is below `config.score_floor` AND the scorer's best
/// alternative beats it by more than `config.hysteresis_gap`. The alternative
/// node itself is not placed here.
///
/// NaN scores are ignored. A NaN holder score is treated like a missing score,
/// and a NaN gain never justifies an eviction. Without this, a NaN reported for
/// the first holder would pin the candidate, since every later `score < NaN`
/// is false. It would then pass both threshold checks, since every comparison
/// with NaN is false, and evict an arbitrary holder.
fn diff_scheduler(
    actual: &MeshOsState,
    this_node: NodeId,
    config: &SchedulerConfig,
    scorer: Option<&dyn PlacementScorer>,
    now: Instant,
    evicted_this_tick: &std::collections::HashSet<ChainId>,
    out: &mut Vec<MeshOsAction>,
) {
    let Some(scorer) = scorer else {
        return;
    };
    let mut chains: Vec<_> = actual.replicas.iter().collect();
    chains.sort_by_key(|(chain, _)| **chain);
    for (&chain, holder_set) in chains {
        if evicted_this_tick.contains(&chain) {
            continue;
        }
        if actual.replica_leader.get(&chain).copied() != Some(this_node) {
            continue;
        }
        let cooling_down = actual
            .last_rebalance
            .get(&chain)
            .is_some_and(|last| now.saturating_duration_since(*last) < config.cooldown);
        if cooling_down || holder_set.is_empty() {
            continue;
        }
        let holders: Vec<NodeId> = holder_set.iter().copied().collect();

        let mut worst: Option<(NodeId, f32)> = None;
        for &holder in &holders {
            // NaN is unorderable; treat it as "no score" so it can neither
            // become the victim nor hide the real worst holder.
            let Some(score) = scorer.score(chain, holder).filter(|s| !s.is_nan()) else {
                continue;
            };
            match worst {
                // Only a strictly lower score replaces the current candidate.
                Some((_, worst_score)) if worst_score <= score => {}
                _ => worst = Some((holder, score)),
            }
        }
        let Some((victim, victim_score)) = worst else {
            continue;
        };
        if victim_score >= config.score_floor {
            continue;
        }
        let Some((_alt_node, alt_score)) = scorer.best_alternative(chain, &holders) else {
            continue;
        };
        // Hysteresis: move only when the improvement is clearly worth it. A
        // NaN gain (NaN alternative score, or inf - inf) is rejected explicitly,
        // because `NaN <= gap` is false and would otherwise let it through.
        let gain = alt_score - victim_score;
        if gain.is_nan() || gain <= config.hysteresis_gap {
            continue;
        }
        out.push(MeshOsAction::RequestEviction { chain, victim });
    }
}
/// Returns `true` when no chain in `actual.replicas` lists `this_node` as a
/// holder. This is vacuously true when no chains are known.
fn all_replicas_drained_locally(actual: &MeshOsState, this_node: NodeId) -> bool {
    let still_holding = actual
        .replicas
        .values()
        .any(|holders| holders.contains(&this_node));
    !still_holding
}
/// Returns `true` when every daemon in `actual.daemons` is `Stopped`. This is
/// vacuously true when no daemons are known.
fn all_daemons_stopped(actual: &MeshOsState) -> bool {
    !actual
        .daemons
        .values()
        .any(|status| !matches!(status.lifecycle, DaemonLifecycle::Stopped))
}
/// Returns `true` when every daemon in `actual.daemons` is `Running` and its
/// health is either `Healthy` or not yet reported (`None`). This is vacuously
/// true when no daemons are known.
fn all_daemons_healthy(actual: &MeshOsState) -> bool {
    use super::event::DaemonHealth;
    !actual.daemons.values().any(|status| {
        let running = matches!(status.lifecycle, DaemonLifecycle::Running);
        let healthy = matches!(status.health, Some(DaemonHealth::Healthy) | None);
        !(running && healthy)
    })
}
/// Emits `ApplyBackoff` for `daemon`, carrying the release time of its current
/// restart-backoff state.
///
/// Nothing is emitted when the state has no release time, or when that exact
/// release time is already recorded as applied (`last_applied`).
fn emit_backoff_record_if_needed(
    daemon: &super::event::DaemonRef,
    status: &DaemonStatus,
    last_applied: Option<Instant>,
    out: &mut Vec<MeshOsAction>,
) {
    let Some(until) = status.backoff.state().release_at() else {
        return;
    };
    // Skip re-emitting a window the loop has already recorded.
    if last_applied != Some(until) {
        out.push(MeshOsAction::ApplyBackoff {
            daemon: daemon.clone(),
            until,
        });
    }
}
#[cfg(test)]
#[allow(clippy::field_reassign_with_default)]
mod tests {
use std::time::Duration;
use super::super::event::{ChainId, DaemonRef, NodeId};
use super::super::state::{AvoidEntry, BlobObservation, DaemonStatus};
use super::super::supervision::RestartState;
use super::*;
const THIS_NODE: NodeId = 100;
fn daemon(name: &str, id: u64) -> DaemonRef {
DaemonRef {
id,
name: name.into(),
}
}
fn at(base: Instant, secs: u64) -> Instant {
base + Duration::from_secs(secs)
}
fn anchor() -> Instant {
Instant::now()
}
#[test]
fn reconcile_empty_inputs_returns_no_actions() {
let actual = MeshOsState::default();
let desired = DesiredState::default();
assert!(reconcile(
&actual,
&desired,
THIS_NODE,
&LocalityConfig::default(),
&MaintenanceConfig::default(),
&SchedulerConfig::default(),
None,
)
.is_empty());
}
#[test]
fn reconcile_is_idempotent_under_repeated_calls() {
let actual = MeshOsState::default();
let desired = DesiredState::default();
let first = reconcile(
&actual,
&desired,
THIS_NODE,
&LocalityConfig::default(),
&MaintenanceConfig::default(),
&SchedulerConfig::default(),
None,
);
let second = reconcile(
&actual,
&desired,
THIS_NODE,
&LocalityConfig::default(),
&MaintenanceConfig::default(),
&SchedulerConfig::default(),
None,
);
assert_eq!(first, second);
}
#[test]
fn reconcile_with_no_daemon_intent_emits_nothing_even_with_state() {
let mut actual = MeshOsState::default();
actual
.daemons
.insert(daemon("telemetry", 1), DaemonStatus::default());
actual.replicas.insert(
0xCAFE_BABE as ChainId,
::std::collections::BTreeSet::from([1, 2, 3]),
);
actual.blobs.insert(
42,
BlobObservation {
size_bytes: 1024,
holders: vec![1],
},
);
actual.avoid_list.insert(
7,
AvoidEntry {
reason: "rtt".into(),
until: Instant::now() + Duration::from_secs(60),
},
);
let desired = DesiredState::default();
assert!(reconcile(
&actual,
&desired,
THIS_NODE,
&LocalityConfig::default(),
&MaintenanceConfig::default(),
&SchedulerConfig::default(),
None,
)
.is_empty());
}
#[test]
fn desired_run_with_stopped_actual_emits_start_daemon() {
let mut actual = MeshOsState::default();
let d = daemon("telemetry", 1);
actual.daemons.insert(d.clone(), DaemonStatus::default()); let mut desired = DesiredState::default();
desired.desired_daemons.insert(d.clone(), DaemonIntent::Run);
let actions = reconcile(
&actual,
&desired,
THIS_NODE,
&LocalityConfig::default(),
&MaintenanceConfig::default(),
&SchedulerConfig::default(),
None,
);
assert_eq!(actions, vec![MeshOsAction::StartDaemon { daemon: d }],);
}
#[test]
fn desired_run_when_daemon_absent_emits_start_daemon() {
let actual = MeshOsState::default();
let mut desired = DesiredState::default();
let d = daemon("telemetry", 1);
desired.desired_daemons.insert(d.clone(), DaemonIntent::Run);
let actions = reconcile(
&actual,
&desired,
THIS_NODE,
&LocalityConfig::default(),
&MaintenanceConfig::default(),
&SchedulerConfig::default(),
None,
);
assert_eq!(actions, vec![MeshOsAction::StartDaemon { daemon: d }]);
}
#[test]
fn desired_run_with_running_actual_emits_nothing() {
let mut actual = MeshOsState::default();
let d = daemon("telemetry", 1);
let mut status = DaemonStatus::default();
status.lifecycle = DaemonLifecycle::Running;
actual.daemons.insert(d.clone(), status);
let mut desired = DesiredState::default();
desired.desired_daemons.insert(d, DaemonIntent::Run);
assert!(reconcile(
&actual,
&desired,
THIS_NODE,
&LocalityConfig::default(),
&MaintenanceConfig::default(),
&SchedulerConfig::default(),
None,
)
.is_empty());
}
#[test]
fn desired_stop_with_running_actual_emits_stop_daemon_with_grace_window() {
let mut actual = MeshOsState::default();
let base = anchor();
actual.last_tick = Some(base);
let d = daemon("telemetry", 1);
let mut status = DaemonStatus::default();
status.lifecycle = DaemonLifecycle::Running;
actual.daemons.insert(d.clone(), status);
let mut desired = DesiredState::default();
desired
.desired_daemons
.insert(d.clone(), DaemonIntent::Stop);
let actions = reconcile(
&actual,
&desired,
THIS_NODE,
&LocalityConfig::default(),
&MaintenanceConfig::default(),
&SchedulerConfig::default(),
None,
);
match actions.as_slice() {
[MeshOsAction::StopDaemon {
daemon: d2,
deadline,
..
}] => {
assert_eq!(d2, &d);
assert_eq!(*deadline, base + STOP_GRACE_PERIOD);
}
other => panic!("expected one StopDaemon, got {other:?}"),
}
}
#[test]
fn desired_stop_with_stopped_actual_emits_nothing() {
let mut actual = MeshOsState::default();
let d = daemon("telemetry", 1);
actual.daemons.insert(d.clone(), DaemonStatus::default());
let mut desired = DesiredState::default();
desired.desired_daemons.insert(d, DaemonIntent::Stop);
assert!(reconcile(
&actual,
&desired,
THIS_NODE,
&LocalityConfig::default(),
&MaintenanceConfig::default(),
&SchedulerConfig::default(),
None,
)
.is_empty());
}
#[test]
fn backoff_active_gates_start_daemon_emission() {
let base = anchor();
let mut actual = MeshOsState::default();
actual.last_tick = Some(base);
let d = daemon("telemetry", 1);
let mut status = DaemonStatus::default();
status.backoff.observe_crash(base);
assert!(matches!(
status.backoff.state(),
RestartState::BackingOff { .. }
));
actual.daemons.insert(d.clone(), status);
let mut desired = DesiredState::default();
desired.desired_daemons.insert(d.clone(), DaemonIntent::Run);
let actions = reconcile(
&actual,
&desired,
THIS_NODE,
&LocalityConfig::default(),
&MaintenanceConfig::default(),
&SchedulerConfig::default(),
None,
);
match actions.as_slice() {
[MeshOsAction::ApplyBackoff { daemon: d2, until }] => {
assert_eq!(d2, &d);
assert_eq!(*until, base + Duration::from_millis(500));
}
other => panic!("expected ApplyBackoff while gated, got {other:?}"),
}
}
#[test]
fn backoff_release_after_until_unblocks_start_daemon() {
let base = anchor();
let mut actual = MeshOsState::default();
actual.last_tick = Some(at(base, 60)); let d = daemon("telemetry", 1);
let mut status = DaemonStatus::default();
status.backoff.observe_crash(base);
status.backoff.maybe_release(at(base, 60));
actual.daemons.insert(d.clone(), status);
let mut desired = DesiredState::default();
desired.desired_daemons.insert(d.clone(), DaemonIntent::Run);
let actions = reconcile(
&actual,
&desired,
THIS_NODE,
&LocalityConfig::default(),
&MaintenanceConfig::default(),
&SchedulerConfig::default(),
None,
);
assert_eq!(actions, vec![MeshOsAction::StartDaemon { daemon: d }]);
}
#[test]
fn crash_loop_gate_blocks_start_daemon_emission_under_threshold_crashes() {
let base = anchor();
let mut actual = MeshOsState::default();
actual.last_tick = Some(at(base, 1)); let d = daemon("telemetry", 1);
let mut status = DaemonStatus::default();
for i in 0..5 {
status.backoff.observe_crash(at(base, i));
}
assert!(matches!(
status.backoff.state(),
RestartState::CrashLooping { .. }
));
actual.daemons.insert(d.clone(), status);
let mut desired = DesiredState::default();
desired.desired_daemons.insert(d.clone(), DaemonIntent::Run);
let actions = reconcile(
&actual,
&desired,
THIS_NODE,
&LocalityConfig::default(),
&MaintenanceConfig::default(),
&SchedulerConfig::default(),
None,
);
match actions.as_slice() {
[MeshOsAction::ApplyBackoff { daemon: d2, .. }] => assert_eq!(d2, &d),
other => panic!("expected ApplyBackoff under crash-loop gate, got {other:?}"),
}
}
#[test]
fn apply_backoff_is_not_re_emitted_after_the_loop_records_it() {
let base = anchor();
let mut actual = MeshOsState::default();
actual.last_tick = Some(at(base, 1));
let d = daemon("telemetry", 1);
let mut status = DaemonStatus::default();
for i in 0..5 {
status.backoff.observe_crash(at(base, i));
}
let until = status
.backoff
.state()
.release_at()
.expect("crash-looping state carries a release_at");
actual.daemons.insert(d.clone(), status);
let mut desired = DesiredState::default();
desired.desired_daemons.insert(d.clone(), DaemonIntent::Run);
let first = reconcile(
&actual,
&desired,
THIS_NODE,
&LocalityConfig::default(),
&MaintenanceConfig::default(),
&SchedulerConfig::default(),
None,
);
assert_eq!(first.len(), 1, "first pass should emit ApplyBackoff");
actual.applied_backoffs.insert(d.clone(), until);
let second = reconcile(
&actual,
&desired,
THIS_NODE,
&LocalityConfig::default(),
&MaintenanceConfig::default(),
&SchedulerConfig::default(),
None,
);
assert!(
second.is_empty(),
"second pass within the same backoff window must not re-emit ApplyBackoff",
);
}
#[test]
fn reconcile_emits_actions_in_a_stable_order_across_calls() {
let mut actual = MeshOsState::default();
let d1 = daemon("a", 1);
let d2 = daemon("b", 2);
actual.daemons.insert(d1.clone(), DaemonStatus::default());
actual.daemons.insert(d2.clone(), DaemonStatus::default());
let mut desired = DesiredState::default();
desired.desired_daemons.insert(d1, DaemonIntent::Run);
desired.desired_daemons.insert(d2, DaemonIntent::Run);
let a = reconcile(
&actual,
&desired,
THIS_NODE,
&LocalityConfig::default(),
&MaintenanceConfig::default(),
&SchedulerConfig::default(),
None,
);
let b = reconcile(
&actual,
&desired,
THIS_NODE,
&LocalityConfig::default(),
&MaintenanceConfig::default(),
&SchedulerConfig::default(),
None,
);
assert_eq!(a, b);
assert_eq!(a.len(), 2);
}
const CHAIN_A: ChainId = 0xAA;
const CHAIN_B: ChainId = 0xBB;
#[test]
fn local_intent_hold_when_not_a_holder_emits_pull_replica() {
let mut actual = MeshOsState::default();
actual
.replicas
.insert(CHAIN_A, ::std::collections::BTreeSet::from([1, 2, 3]));
let mut desired = DesiredState::default();
desired
.desired_local_replicas
.insert(CHAIN_A, LocalReplicaIntent::Hold);
let actions = reconcile(
&actual,
&desired,
THIS_NODE,
&LocalityConfig::default(),
&MaintenanceConfig::default(),
&SchedulerConfig::default(),
None,
);
assert_eq!(
actions,
vec![MeshOsAction::PullReplica {
chain: CHAIN_A,
source: 1, }],
);
}
#[test]
fn local_intent_hold_when_already_a_holder_emits_nothing() {
let mut actual = MeshOsState::default();
actual.replicas.insert(
CHAIN_A,
::std::collections::BTreeSet::from([1, 2, THIS_NODE]),
);
let mut desired = DesiredState::default();
desired
.desired_local_replicas
.insert(CHAIN_A, LocalReplicaIntent::Hold);
assert!(reconcile(
&actual,
&desired,
THIS_NODE,
&LocalityConfig::default(),
&MaintenanceConfig::default(),
&SchedulerConfig::default(),
None,
)
.is_empty());
}
#[test]
fn local_intent_drop_when_actually_holding_emits_drop_replica() {
let mut actual = MeshOsState::default();
actual
.replicas
.insert(CHAIN_A, ::std::collections::BTreeSet::from([1, THIS_NODE]));
let mut desired = DesiredState::default();
desired
.desired_local_replicas
.insert(CHAIN_A, LocalReplicaIntent::Drop);
let actions = reconcile(
&actual,
&desired,
THIS_NODE,
&LocalityConfig::default(),
&MaintenanceConfig::default(),
&SchedulerConfig::default(),
None,
);
assert_eq!(actions, vec![MeshOsAction::DropReplica { chain: CHAIN_A }]);
}
#[test]
fn local_intent_drop_when_not_holding_emits_nothing() {
let mut actual = MeshOsState::default();
actual
.replicas
.insert(CHAIN_A, ::std::collections::BTreeSet::from([1, 2]));
let mut desired = DesiredState::default();
desired
.desired_local_replicas
.insert(CHAIN_A, LocalReplicaIntent::Drop);
assert!(reconcile(
&actual,
&desired,
THIS_NODE,
&LocalityConfig::default(),
&MaintenanceConfig::default(),
&SchedulerConfig::default(),
None,
)
.is_empty());
}
#[test]
fn pull_replica_skipped_when_no_other_holder_known() {
let actual = MeshOsState::default();
let mut desired = DesiredState::default();
desired
.desired_local_replicas
.insert(CHAIN_A, LocalReplicaIntent::Hold);
assert!(reconcile(
&actual,
&desired,
THIS_NODE,
&LocalityConfig::default(),
&MaintenanceConfig::default(),
&SchedulerConfig::default(),
None,
)
.is_empty());
}
#[test]
fn leader_with_undercount_emits_request_placement() {
let mut actual = MeshOsState::default();
actual
.replicas
.insert(CHAIN_A, ::std::collections::BTreeSet::from([1, 2]));
actual.replica_leader.insert(CHAIN_A, THIS_NODE);
let mut desired = DesiredState::default();
desired.desired_replicas.insert(CHAIN_A, 4);
let actions = reconcile(
&actual,
&desired,
THIS_NODE,
&LocalityConfig::default(),
&MaintenanceConfig::default(),
&SchedulerConfig::default(),
None,
);
assert_eq!(
actions,
vec![MeshOsAction::RequestPlacement {
chain: CHAIN_A,
exclude: vec![1, 2],
target: None,
}],
);
}
#[test]
fn leader_with_overcount_emits_request_eviction_lex_smallest_victim() {
let mut actual = MeshOsState::default();
actual
.replicas
.insert(CHAIN_A, ::std::collections::BTreeSet::from([5, 2, 9]));
actual.replica_leader.insert(CHAIN_A, THIS_NODE);
let mut desired = DesiredState::default();
desired.desired_replicas.insert(CHAIN_A, 2);
let actions = reconcile(
&actual,
&desired,
THIS_NODE,
&LocalityConfig::default(),
&MaintenanceConfig::default(),
&SchedulerConfig::default(),
None,
);
assert_eq!(
actions,
vec![MeshOsAction::RequestEviction {
chain: CHAIN_A,
victim: 2,
}],
);
}
#[test]
fn non_leader_does_not_emit_request_placement_even_under_undercount() {
let mut actual = MeshOsState::default();
actual
.replicas
.insert(CHAIN_A, ::std::collections::BTreeSet::from([1, 2]));
actual.replica_leader.insert(CHAIN_A, 999); let mut desired = DesiredState::default();
desired.desired_replicas.insert(CHAIN_A, 4);
assert!(reconcile(
&actual,
&desired,
THIS_NODE,
&LocalityConfig::default(),
&MaintenanceConfig::default(),
&SchedulerConfig::default(),
None,
)
.is_empty());
}
#[test]
fn no_known_leader_silences_request_actions() {
let mut actual = MeshOsState::default();
actual
.replicas
.insert(CHAIN_A, ::std::collections::BTreeSet::from([1]));
let mut desired = DesiredState::default();
desired.desired_replicas.insert(CHAIN_A, 3);
assert!(reconcile(
&actual,
&desired,
THIS_NODE,
&LocalityConfig::default(),
&MaintenanceConfig::default(),
&SchedulerConfig::default(),
None,
)
.is_empty());
}
#[test]
fn leader_at_exact_count_emits_nothing() {
let mut actual = MeshOsState::default();
actual
.replicas
.insert(CHAIN_A, ::std::collections::BTreeSet::from([1, 2, 3]));
actual.replica_leader.insert(CHAIN_A, THIS_NODE);
let mut desired = DesiredState::default();
desired.desired_replicas.insert(CHAIN_A, 3);
assert!(reconcile(
&actual,
&desired,
THIS_NODE,
&LocalityConfig::default(),
&MaintenanceConfig::default(),
&SchedulerConfig::default(),
None,
)
.is_empty());
}
#[test]
fn rtt_above_threshold_emits_mark_avoid_once() {
let mut actual = MeshOsState::default();
actual.rtt.insert(42, Duration::from_millis(500));
let desired = DesiredState::default();
let actions = reconcile(
&actual,
&desired,
THIS_NODE,
&LocalityConfig::default(),
&MaintenanceConfig::default(),
&SchedulerConfig::default(),
None,
);
match actions.as_slice() {
[MeshOsAction::MarkAvoid { peer, ttl, .. }] => {
assert_eq!(*peer, 42);
assert_eq!(*ttl, LocalityConfig::default().avoid_ttl);
}
other => panic!("expected one MarkAvoid, got {other:?}"),
}
}
#[test]
fn rtt_below_threshold_emits_nothing() {
let mut actual = MeshOsState::default();
actual.rtt.insert(42, Duration::from_millis(100));
let desired = DesiredState::default();
let actions = reconcile(
&actual,
&desired,
THIS_NODE,
&LocalityConfig::default(),
&MaintenanceConfig::default(),
&SchedulerConfig::default(),
None,
);
assert!(actions.is_empty());
}
#[test]
fn mark_avoid_is_idempotent_when_peer_already_on_avoid_list() {
let mut actual = MeshOsState::default();
actual.rtt.insert(42, Duration::from_millis(500));
actual.avoid_list.insert(
42,
AvoidEntry {
reason: "earlier".into(),
until: Instant::now() + Duration::from_secs(60),
},
);
let desired = DesiredState::default();
let actions = reconcile(
&actual,
&desired,
THIS_NODE,
&LocalityConfig::default(),
&MaintenanceConfig::default(),
&SchedulerConfig::default(),
None,
);
assert!(
actions.is_empty(),
"MarkAvoid duplicated for already-avoided peer: {actions:?}"
);
}
#[test]
fn mark_avoid_emission_is_sorted_by_peer_id_for_stability() {
let mut actual = MeshOsState::default();
actual.rtt.insert(7, Duration::from_millis(500));
actual.rtt.insert(3, Duration::from_millis(500));
actual.rtt.insert(11, Duration::from_millis(500));
let desired = DesiredState::default();
let actions = reconcile(
&actual,
&desired,
THIS_NODE,
&LocalityConfig::default(),
&MaintenanceConfig::default(),
&SchedulerConfig::default(),
None,
);
let peers: Vec<NodeId> = actions
.iter()
.map(|a| match a {
MeshOsAction::MarkAvoid { peer, .. } => *peer,
_ => unreachable!(),
})
.collect();
assert_eq!(peers, vec![3, 7, 11]);
}
#[test]
fn drop_replicas_admin_event_projects_into_drop_intent() {
let mut actual = MeshOsState::default();
actual
.replicas
.insert(CHAIN_A, ::std::collections::BTreeSet::from([THIS_NODE, 1]));
actual
.replicas
.insert(CHAIN_B, ::std::collections::BTreeSet::from([THIS_NODE]));
let mut desired = DesiredState::default();
desired.apply_admin(
&super::super::event::AdminEvent::DropReplicas {
node: THIS_NODE,
chains: vec![CHAIN_A, CHAIN_B],
},
THIS_NODE,
);
let actions = reconcile(
&actual,
&desired,
THIS_NODE,
&LocalityConfig::default(),
&MaintenanceConfig::default(),
&SchedulerConfig::default(),
None,
);
assert_eq!(
actions,
vec![
MeshOsAction::DropReplica { chain: CHAIN_A },
MeshOsAction::DropReplica { chain: CHAIN_B },
],
);
}
#[test]
fn drop_replicas_admin_event_targeted_at_other_node_is_a_noop_locally() {
let mut actual = MeshOsState::default();
actual
.replicas
.insert(CHAIN_A, ::std::collections::BTreeSet::from([THIS_NODE, 1]));
let mut desired = DesiredState::default();
desired.apply_admin(
&super::super::event::AdminEvent::DropReplicas {
node: 999, chains: vec![CHAIN_A],
},
THIS_NODE,
);
assert!(reconcile(
&actual,
&desired,
THIS_NODE,
&LocalityConfig::default(),
&MaintenanceConfig::default(),
&SchedulerConfig::default(),
None,
)
.is_empty());
}
#[test]
fn custom_locality_threshold_overrides_default() {
let mut actual = MeshOsState::default();
actual.rtt.insert(42, Duration::from_millis(150));
let locality = LocalityConfig {
degraded_rtt_threshold: Duration::from_millis(100),
avoid_ttl: Duration::from_secs(60),
};
let desired = DesiredState::default();
let actions = reconcile(
&actual,
&desired,
THIS_NODE,
&locality,
&MaintenanceConfig::default(),
&SchedulerConfig::default(),
None,
);
match actions.as_slice() {
[MeshOsAction::MarkAvoid { peer, ttl, .. }] => {
assert_eq!(*peer, 42);
assert_eq!(*ttl, Duration::from_secs(60));
}
other => panic!("expected one MarkAvoid under tightened threshold, got {other:?}"),
}
}
#[test]
fn active_state_emits_no_maintenance_transition() {
let actual = MeshOsState::default(); let desired = DesiredState::default();
let actions = reconcile(
&actual,
&desired,
THIS_NODE,
&LocalityConfig::default(),
&MaintenanceConfig::default(),
&SchedulerConfig::default(),
None,
);
assert!(actions.is_empty());
}
#[test]
fn entering_with_drained_replicas_and_stopped_daemons_emits_maintenance_transition() {
let base = anchor();
let mut actual = MeshOsState::default();
actual.last_tick = Some(base);
actual.local_maintenance = MaintenanceState::EnteringMaintenance {
since: base,
deadline: None,
};
let desired = DesiredState::default();
let actions = reconcile(
&actual,
&desired,
THIS_NODE,
&LocalityConfig::default(),
&MaintenanceConfig::default(),
&SchedulerConfig::default(),
None,
);
assert_eq!(
actions,
vec![MeshOsAction::CommitMaintenanceTransition {
node: THIS_NODE,
target: MaintenanceTransition::Maintenance,
}],
);
}
#[test]
fn entering_with_remaining_replicas_does_not_transition_to_maintenance() {
let base = anchor();
let mut actual = MeshOsState::default();
actual.last_tick = Some(base);
actual.local_maintenance = MaintenanceState::EnteringMaintenance {
since: base,
deadline: None,
};
actual
.replicas
.insert(CHAIN_A, ::std::collections::BTreeSet::from([THIS_NODE]));
let desired = DesiredState::default();
let actions = reconcile(
&actual,
&desired,
THIS_NODE,
&LocalityConfig::default(),
&MaintenanceConfig::default(),
&SchedulerConfig::default(),
None,
);
assert!(actions.is_empty());
}
#[test]
fn entering_with_running_daemon_does_not_transition_to_maintenance() {
let base = anchor();
let mut actual = MeshOsState::default();
actual.last_tick = Some(base);
actual.local_maintenance = MaintenanceState::EnteringMaintenance {
since: base,
deadline: None,
};
let mut status = DaemonStatus::default();
status.lifecycle = DaemonLifecycle::Running;
actual.daemons.insert(daemon("telemetry", 1), status);
let desired = DesiredState::default();
let actions = reconcile(
&actual,
&desired,
THIS_NODE,
&LocalityConfig::default(),
&MaintenanceConfig::default(),
&SchedulerConfig::default(),
None,
);
assert!(actions.is_empty());
}
#[test]
fn entering_past_deadline_with_conditions_unmet_transitions_to_drain_failed() {
let base = anchor();
let deadline = base + Duration::from_secs(60);
let mut actual = MeshOsState::default();
actual.last_tick = Some(deadline + Duration::from_secs(1)); actual.local_maintenance = MaintenanceState::EnteringMaintenance {
since: base,
deadline: Some(deadline),
};
actual
.replicas
.insert(CHAIN_A, ::std::collections::BTreeSet::from([THIS_NODE]));
let desired = DesiredState::default();
let actions = reconcile(
&actual,
&desired,
THIS_NODE,
&LocalityConfig::default(),
&MaintenanceConfig::default(),
&SchedulerConfig::default(),
None,
);
assert_eq!(
actions.len(),
1,
"expected exactly one action; got {actions:?}"
);
match &actions[0] {
MeshOsAction::CommitMaintenanceTransition {
node,
target: MaintenanceTransition::DrainFailed { reason },
} => {
assert_eq!(*node, THIS_NODE);
assert!(
reason.contains("drain deadline elapsed"),
"expected drain deadline reason; got {reason:?}",
);
}
other => panic!("expected DrainFailed transition; got {other:?}"),
}
}
#[test]
fn entering_past_deadline_with_conditions_met_prefers_maintenance_over_drain_failed() {
let base = anchor();
let deadline = base + Duration::from_secs(60);
let mut actual = MeshOsState::default();
actual.last_tick = Some(deadline + Duration::from_secs(1));
actual.local_maintenance = MaintenanceState::EnteringMaintenance {
since: base,
deadline: Some(deadline),
};
let desired = DesiredState::default();
let actions = reconcile(
&actual,
&desired,
THIS_NODE,
&LocalityConfig::default(),
&MaintenanceConfig::default(),
&SchedulerConfig::default(),
None,
);
assert_eq!(
actions,
vec![MeshOsAction::CommitMaintenanceTransition {
node: THIS_NODE,
target: MaintenanceTransition::Maintenance,
}],
);
}
#[test]
fn maintenance_steady_state_emits_no_transition() {
let base = anchor();
let mut actual = MeshOsState::default();
actual.last_tick = Some(base);
actual.local_maintenance = MaintenanceState::Maintenance { since: base };
let desired = DesiredState::default();
let actions = reconcile(
&actual,
&desired,
THIS_NODE,
&LocalityConfig::default(),
&MaintenanceConfig::default(),
&SchedulerConfig::default(),
None,
);
assert!(actions.is_empty());
}
/// While `ExitingMaintenance`, a node whose only daemon reports `Running`
/// must commit the `Recovery` transition.
#[test]
fn exiting_with_healthy_daemons_emits_recovery_transition() {
    let since = anchor();

    let mut running = DaemonStatus::default();
    running.lifecycle = DaemonLifecycle::Running;

    let mut actual = MeshOsState::default();
    actual.last_tick = Some(since);
    actual.local_maintenance = MaintenanceState::ExitingMaintenance { since };
    actual.daemons.insert(daemon("telemetry", 1), running);

    let expected = vec![MeshOsAction::CommitMaintenanceTransition {
        node: THIS_NODE,
        target: MaintenanceTransition::Recovery,
    }];
    let actions = reconcile(
        &actual,
        &DesiredState::default(),
        THIS_NODE,
        &LocalityConfig::default(),
        &MaintenanceConfig::default(),
        &SchedulerConfig::default(),
        None,
    );
    assert_eq!(actions, expected);
}
/// A node that has been in `Recovery` for only 60 seconds must not yet be
/// promoted, so reconcile emits nothing.
///
/// NOTE(review): this relies on the ramp window in `MaintenanceConfig::default()`
/// being longer than 60s — confirm if that default changes.
#[test]
fn recovery_before_ramp_window_elapses_emits_nothing() {
    let base = anchor();
    let mut actual = MeshOsState::default();
    actual.last_tick = Some(base + Duration::from_secs(60));
    actual.local_maintenance = MaintenanceState::Recovery { since: base };
    let desired = DesiredState::default();
    let actions = reconcile(
        &actual,
        &desired,
        THIS_NODE,
        &LocalityConfig::default(),
        &MaintenanceConfig::default(),
        &SchedulerConfig::default(),
        None,
    );
    assert!(
        actions.is_empty(),
        "recovery inside the ramp window should emit nothing, got {actions:?}",
    );
}
/// Six minutes after entering `Recovery`, the node must commit the `Active`
/// transition.
///
/// NOTE(review): assumes the ramp window in `MaintenanceConfig::default()` is
/// at most six minutes — confirm if that default changes.
#[test]
fn recovery_after_ramp_window_elapses_emits_active_transition() {
    let entered = anchor();
    let six_minutes = Duration::from_secs(6 * 60);

    let mut actual = MeshOsState::default();
    actual.local_maintenance = MaintenanceState::Recovery { since: entered };
    actual.last_tick = Some(entered + six_minutes);

    let actions = reconcile(
        &actual,
        &DesiredState::default(),
        THIS_NODE,
        &LocalityConfig::default(),
        &MaintenanceConfig::default(),
        &SchedulerConfig::default(),
        None,
    );
    let expected = vec![MeshOsAction::CommitMaintenanceTransition {
        node: THIS_NODE,
        target: MaintenanceTransition::Active,
    }];
    assert_eq!(actions, expected);
}
/// Reconcile emits no transition out of `DrainFailed`. Leaving that state is
/// presumably an operator action outside reconcile.
#[test]
fn drain_failed_emits_no_transition_until_operator_intervention() {
    let base = anchor();
    let mut actual = MeshOsState::default();
    actual.last_tick = Some(base);
    actual.local_maintenance = MaintenanceState::DrainFailed {
        since: base,
        reason: "deadline elapsed".into(),
    };
    let desired = DesiredState::default();
    let actions = reconcile(
        &actual,
        &desired,
        THIS_NODE,
        &LocalityConfig::default(),
        &MaintenanceConfig::default(),
        &SchedulerConfig::default(),
        None,
    );
    assert!(
        actions.is_empty(),
        "DrainFailed should not auto-transition, got {actions:?}",
    );
}
/// Test double for `PlacementScorer` that answers from fixed lookup tables.
struct FixedScorer {
    /// Score per `(chain, node)` pair; pairs not listed score `None`.
    scores: ::std::collections::HashMap<(ChainId, NodeId), f32>,
    /// The single alternative node, and its score, offered for each chain.
    alternatives: ::std::collections::HashMap<ChainId, (NodeId, f32)>,
}
impl super::super::scheduler::PlacementScorer for FixedScorer {
    fn score(&self, chain: ChainId, node: NodeId) -> Option<f32> {
        let key = (chain, node);
        self.scores.get(&key).copied()
    }
    /// Returns the configured alternative for `chain`, unless that node is
    /// listed in `exclude`.
    fn best_alternative(&self, chain: ChainId, exclude: &[NodeId]) -> Option<(NodeId, f32)> {
        self.alternatives
            .get(&chain)
            .copied()
            .filter(|(candidate, _)| !exclude.contains(candidate))
    }
}
/// Shared harness for the scheduler tests.
///
/// Runs `reconcile` as `THIS_NODE` with an empty `DesiredState` and default
/// configs, so only `actual` and `scorer` vary between tests.
fn scheduler_call(
    actual: &MeshOsState,
    scorer: Option<&dyn super::super::scheduler::PlacementScorer>,
) -> Vec<MeshOsAction> {
    let desired = DesiredState::default();
    let locality = LocalityConfig::default();
    let maintenance = MaintenanceConfig::default();
    let scheduler = SchedulerConfig::default();
    reconcile(
        actual,
        &desired,
        THIS_NODE,
        &locality,
        &maintenance,
        &scheduler,
        scorer,
    )
}
/// With no `PlacementScorer` supplied, a single-holder chain led by this node
/// must produce no actions.
#[test]
fn no_scorer_yields_no_scheduler_actions() {
    let mut actual = MeshOsState::default();
    actual
        .replicas
        .insert(CHAIN_A, ::std::collections::BTreeSet::from([THIS_NODE]));
    actual.replica_leader.insert(CHAIN_A, THIS_NODE);
    let actions = scheduler_call(&actual, None);
    assert!(
        actions.is_empty(),
        "no scorer should mean no scheduler actions, got {actions:?}",
    );
}
/// A holder scoring 0.9 is left in place even though an alternative scores
/// 0.95.
///
/// NOTE(review): presumably the holder is above the rebalance threshold or
/// the 0.05 gain is below hysteresis — confirm against `SchedulerConfig::default()`.
#[test]
fn high_score_holder_is_not_rebalanced() {
    let scorer = FixedScorer {
        scores: std::iter::once(((CHAIN_A, THIS_NODE), 0.9)).collect(),
        alternatives: std::iter::once((CHAIN_A, (5, 0.95))).collect(),
    };

    let mut actual = MeshOsState::default();
    actual.replica_leader.insert(CHAIN_A, THIS_NODE);
    actual
        .replicas
        .insert(CHAIN_A, ::std::collections::BTreeSet::from([THIS_NODE]));

    let actions = scheduler_call(&actual, Some(&scorer));
    assert!(
        actions.is_empty(),
        "high-scoring holder should not rebalance"
    );
}
/// A leader whose own replica scores 0.3, while an alternative node scores
/// 0.9, must request its own eviction from `CHAIN_A`.
#[test]
fn under_scoring_holder_emits_request_eviction_when_alternative_exceeds_hysteresis() {
    let scorer = FixedScorer {
        scores: std::iter::once(((CHAIN_A, THIS_NODE), 0.3)).collect(),
        alternatives: std::iter::once((CHAIN_A, (5, 0.9))).collect(),
    };

    let mut actual = MeshOsState::default();
    actual.replica_leader.insert(CHAIN_A, THIS_NODE);
    actual
        .replicas
        .insert(CHAIN_A, ::std::collections::BTreeSet::from([THIS_NODE]));

    let expected = vec![MeshOsAction::RequestEviction {
        chain: CHAIN_A,
        victim: THIS_NODE,
    }];
    assert_eq!(scheduler_call(&actual, Some(&scorer)), expected);
}
/// An under-scoring holder (0.3) is kept when the scorer offers no
/// alternative node for the chain.
#[test]
fn under_scoring_holder_with_no_alternative_does_not_emit() {
    let mut actual = MeshOsState::default();
    actual
        .replicas
        .insert(CHAIN_A, ::std::collections::BTreeSet::from([THIS_NODE]));
    actual.replica_leader.insert(CHAIN_A, THIS_NODE);
    let scorer = FixedScorer {
        scores: [((CHAIN_A, THIS_NODE), 0.3)].into_iter().collect(),
        // Empty map: `FixedScorer::best_alternative` returns `None` for every chain.
        alternatives: Default::default(),
    };
    let actions = scheduler_call(&actual, Some(&scorer));
    assert!(
        actions.is_empty(),
        "no alternative should mean no eviction, got {actions:?}",
    );
}
/// Moving from a 0.3 holder to a 0.4 alternative is too small a gain, so no
/// eviction is emitted.
///
/// NOTE(review): assumes the default hysteresis in `SchedulerConfig::default()`
/// exceeds 0.1 — confirm if that default changes.
#[test]
fn under_scoring_holder_with_small_improvement_is_blocked_by_hysteresis() {
    let mut actual = MeshOsState::default();
    actual
        .replicas
        .insert(CHAIN_A, ::std::collections::BTreeSet::from([THIS_NODE]));
    actual.replica_leader.insert(CHAIN_A, THIS_NODE);
    let scorer = FixedScorer {
        scores: [((CHAIN_A, THIS_NODE), 0.3)].into_iter().collect(),
        alternatives: [(CHAIN_A, (5, 0.4))].into_iter().collect(),
    };
    let actions = scheduler_call(&actual, Some(&scorer));
    assert!(
        actions.is_empty(),
        "small improvement should be blocked by hysteresis, got {actions:?}",
    );
}
/// Only the chain leader runs the scheduler: when another node leads
/// `CHAIN_A`, this node emits nothing even though its replica under-scores.
#[test]
fn non_leader_does_not_emit_eviction_even_for_under_scoring_chain() {
    let mut actual = MeshOsState::default();
    actual
        .replicas
        .insert(CHAIN_A, ::std::collections::BTreeSet::from([THIS_NODE]));
    // Node 999 leads the chain; assumed != THIS_NODE.
    actual.replica_leader.insert(CHAIN_A, 999);
    let scorer = FixedScorer {
        scores: [((CHAIN_A, THIS_NODE), 0.3)].into_iter().collect(),
        alternatives: [(CHAIN_A, (5, 0.9))].into_iter().collect(),
    };
    let actions = scheduler_call(&actual, Some(&scorer));
    assert!(
        actions.is_empty(),
        "non-leader should not emit scheduler evictions, got {actions:?}",
    );
}
/// A chain rebalanced 60 seconds ago is still cooling down, so an otherwise
/// eligible eviction (0.3 holder vs 0.9 alternative) is suppressed.
///
/// NOTE(review): assumes the cooldown in `SchedulerConfig::default()` is longer
/// than 60s — confirm if that default changes.
#[test]
fn cooldown_blocks_rebalance_within_window() {
    let base = anchor();
    let mut actual = MeshOsState::default();
    actual.last_tick = Some(base + Duration::from_secs(60));
    actual
        .replicas
        .insert(CHAIN_A, ::std::collections::BTreeSet::from([THIS_NODE]));
    actual.replica_leader.insert(CHAIN_A, THIS_NODE);
    actual.last_rebalance.insert(CHAIN_A, base);
    let scorer = FixedScorer {
        scores: [((CHAIN_A, THIS_NODE), 0.3)].into_iter().collect(),
        alternatives: [(CHAIN_A, (5, 0.9))].into_iter().collect(),
    };
    let actions = scheduler_call(&actual, Some(&scorer));
    assert!(
        actions.is_empty(),
        "rebalance within cooldown should be suppressed, got {actions:?}",
    );
}
/// Six minutes after `CHAIN_A`'s last rebalance, the under-scoring holder
/// (0.3 vs a 0.9 alternative) is eligible again, and exactly that eviction is
/// emitted.
///
/// NOTE(review): assumes the cooldown in `SchedulerConfig::default()` is at most
/// six minutes — confirm if that default changes.
#[test]
fn cooldown_releases_after_window_elapses() {
    let base = anchor();
    let mut actual = MeshOsState::default();
    actual.last_tick = Some(base + Duration::from_secs(6 * 60));
    actual
        .replicas
        .insert(CHAIN_A, ::std::collections::BTreeSet::from([THIS_NODE]));
    actual.replica_leader.insert(CHAIN_A, THIS_NODE);
    actual.last_rebalance.insert(CHAIN_A, base);
    let scorer = FixedScorer {
        scores: [((CHAIN_A, THIS_NODE), 0.3)].into_iter().collect(),
        alternatives: [(CHAIN_A, (5, 0.9))].into_iter().collect(),
    };
    let actions = scheduler_call(&actual, Some(&scorer));
    // Pin the exact action, not just the count, so that a wrong chain or
    // victim also fails the test.
    assert_eq!(
        actions,
        vec![MeshOsAction::RequestEviction {
            chain: CHAIN_A,
            victim: THIS_NODE,
        }],
    );
}
/// Checks that only one eviction is emitted for an over-replicated chain.
///
/// `CHAIN_A` has two holders but a desired count of 1, which is an over-count
/// eviction from the replica diff. Both holders also score 0.1 against a 0.9
/// alternative, so the scheduler would want to evict too. `reconcile` threads
/// `evicted_this_tick` from `diff_replicas` into `diff_scheduler`, so exactly
/// one eviction for `CHAIN_A` may come out of the tick.
#[test]
fn phase_c_overcount_eviction_suppresses_phase_d1_eviction_for_same_chain() {
    let base = anchor();
    let mut actual = MeshOsState::default();
    actual.last_tick = Some(base);
    actual
        .replicas
        .insert(CHAIN_A, ::std::collections::BTreeSet::from([THIS_NODE, 99]));
    actual.replica_leader.insert(CHAIN_A, THIS_NODE);
    let mut desired = DesiredState::default();
    desired.desired_replicas.insert(CHAIN_A, 1);
    let scorer = FixedScorer {
        scores: [((CHAIN_A, THIS_NODE), 0.1), ((CHAIN_A, 99), 0.1)]
            .into_iter()
            .collect(),
        alternatives: [(CHAIN_A, (5, 0.9))].into_iter().collect(),
    };
    let actions = reconcile(
        &actual,
        &desired,
        THIS_NODE,
        &LocalityConfig::default(),
        &MaintenanceConfig::default(),
        &SchedulerConfig::default(),
        Some(&scorer),
    );
    let evictions: Vec<_> = actions
        .iter()
        .filter(|a| matches!(a, MeshOsAction::RequestEviction { .. }))
        .collect();
    assert_eq!(
        evictions.len(),
        1,
        "phase C and phase D-1 must not both evict the same chain in one tick",
    );
    // Also pin which chain the surviving eviction targets.
    assert!(
        matches!(
            evictions[0],
            MeshOsAction::RequestEviction { chain, .. } if *chain == CHAIN_A
        ),
        "the single eviction must target CHAIN_A, got {evictions:?}",
    );
}
/// After a first reconcile emits an eviction, recording `last_rebalance` at
/// the same tick stops an identical second reconcile from re-emitting it.
/// The recording stands in for what the control loop presumably writes back.
#[test]
fn scheduler_eviction_is_idempotent_when_loop_writes_back_last_rebalance() {
    let tick = anchor();
    let scorer = FixedScorer {
        scores: [((CHAIN_A, THIS_NODE), 0.3)].into_iter().collect(),
        alternatives: [(CHAIN_A, (5, 0.9))].into_iter().collect(),
    };

    let mut actual = MeshOsState::default();
    actual.last_tick = Some(tick);
    actual
        .replicas
        .insert(CHAIN_A, ::std::collections::BTreeSet::from([THIS_NODE]));
    actual.replica_leader.insert(CHAIN_A, THIS_NODE);

    let expected_first = vec![MeshOsAction::RequestEviction {
        chain: CHAIN_A,
        victim: THIS_NODE,
    }];
    assert_eq!(scheduler_call(&actual, Some(&scorer)), expected_first);

    // Simulate the loop recording the rebalance it just acted on.
    actual.last_rebalance.insert(CHAIN_A, tick);

    let second = scheduler_call(&actual, Some(&scorer));
    assert!(
        second.is_empty(),
        "second reconcile within cooldown must not re-emit",
    );
}
/// Among three holders scoring 0.6 / 0.3 / 0.7, the lowest-scoring holder
/// (node 11) must be chosen as the single eviction victim.
#[test]
fn worst_holder_is_picked_as_victim() {
    let holders = ::std::collections::BTreeSet::from([THIS_NODE, 11, 12]);
    let mut actual = MeshOsState::default();
    actual.replicas.insert(CHAIN_A, holders);
    actual.replica_leader.insert(CHAIN_A, THIS_NODE);

    let per_holder = [
        ((CHAIN_A, THIS_NODE), 0.6),
        ((CHAIN_A, 11), 0.3),
        ((CHAIN_A, 12), 0.7),
    ];
    let scorer = FixedScorer {
        scores: per_holder.into_iter().collect(),
        alternatives: [(CHAIN_A, (5, 0.9))].into_iter().collect(),
    };

    let actions = scheduler_call(&actual, Some(&scorer));
    match actions.as_slice() {
        [MeshOsAction::RequestEviction { chain, victim }] => {
            assert_eq!(*chain, CHAIN_A);
            assert_eq!(*victim, 11);
        }
        other => panic!("expected one RequestEviction(11), got {other:?}"),
    }
}
/// Two under-scoring chains, inserted in reverse order, must produce their
/// evictions in chain-id order.
#[test]
fn scheduler_emits_actions_in_chain_id_sorted_order() {
    let mut actual = MeshOsState::default();
    actual
        .replicas
        .insert(CHAIN_B, ::std::collections::BTreeSet::from([THIS_NODE]));
    actual
        .replicas
        .insert(CHAIN_A, ::std::collections::BTreeSet::from([THIS_NODE]));
    actual.replica_leader.insert(CHAIN_A, THIS_NODE);
    actual.replica_leader.insert(CHAIN_B, THIS_NODE);
    let scorer = FixedScorer {
        scores: [((CHAIN_A, THIS_NODE), 0.3), ((CHAIN_B, THIS_NODE), 0.3)]
            .into_iter()
            .collect(),
        alternatives: [(CHAIN_A, (5, 0.9)), (CHAIN_B, (6, 0.9))]
            .into_iter()
            .collect(),
    };
    let actions = scheduler_call(&actual, Some(&scorer));
    let chains: Vec<ChainId> = actions
        .iter()
        .map(|a| match a {
            MeshOsAction::RequestEviction { chain, .. } => *chain,
            // This arm is reachable if reconcile emits any other action kind,
            // so fail with the offending action instead of `unreachable!()`.
            other => panic!("expected only RequestEviction actions, got {other:?}"),
        })
        .collect();
    assert_eq!(chains, vec![CHAIN_A, CHAIN_B]);
}
/// Two chains that each hold one replica but want three must yield their
/// `RequestPlacement` actions in chain-id order.
#[test]
fn reconcile_replica_actions_are_sorted_by_chain_id_for_stability() {
    let mut actual = MeshOsState::default();
    let mut desired = DesiredState::default();
    for chain in [CHAIN_A, CHAIN_B] {
        actual
            .replicas
            .insert(chain, ::std::collections::BTreeSet::from([1]));
        actual.replica_leader.insert(chain, THIS_NODE);
        desired.desired_replicas.insert(chain, 3);
    }

    let actions = reconcile(
        &actual,
        &desired,
        THIS_NODE,
        &LocalityConfig::default(),
        &MaintenanceConfig::default(),
        &SchedulerConfig::default(),
        None,
    );
    assert_eq!(actions.len(), 2);

    let (first, second) = (&actions[0], &actions[1]);
    match (first, second) {
        (
            MeshOsAction::RequestPlacement { chain: c1, .. },
            MeshOsAction::RequestPlacement { chain: c2, .. },
        ) => {
            assert_eq!(*c1, CHAIN_A);
            assert_eq!(*c2, CHAIN_B);
        }
        other => panic!("expected two RequestPlacement actions in chain order, got {other:?}"),
    }
}
/// An unfrozen state emits a pending `StartDaemon`. The same state with
/// `freeze_until` 30 seconds past the tick must emit nothing.
#[test]
fn freeze_suppresses_action_emission_that_would_otherwise_fire() {
    let tick = Instant::now();
    let d = daemon("frozen-daemon", 1);

    let mut actual = MeshOsState::default();
    actual.last_tick = Some(tick);
    actual.daemons.insert(d.clone(), DaemonStatus::default());

    let mut desired = DesiredState::default();
    desired.desired_daemons.insert(d.clone(), DaemonIntent::Run);

    // Both passes share every input except `actual`.
    let run = |state: &MeshOsState| {
        reconcile(
            state,
            &desired,
            THIS_NODE,
            &LocalityConfig::default(),
            &MaintenanceConfig::default(),
            &SchedulerConfig::default(),
            None,
        )
    };

    let baseline = run(&actual);
    assert_eq!(baseline, vec![MeshOsAction::StartDaemon { daemon: d }]);

    actual.freeze_until = Some(tick + Duration::from_secs(30));
    let frozen = run(&actual);
    assert!(
        frozen.is_empty(),
        "freeze should suppress reconcile output, got {frozen:?}",
    );
}
/// On the chain leader, a forced placement onto node 42 must produce exactly
/// one `RequestPlacement`. It targets 42 and excludes the current holders,
/// but not 42 itself.
#[test]
fn forced_placement_emits_request_placement_with_target_pinned_on_leader() {
    const CHAIN: ChainId = 0xC0FFEE;
    let mut actual = MeshOsState::default();
    actual.last_tick = Some(Instant::now());
    actual
        .replicas
        .insert(CHAIN, ::std::collections::BTreeSet::from([THIS_NODE, 99]));
    actual.replica_leader.insert(CHAIN, THIS_NODE);
    actual.forced_placements.push((CHAIN, 42));

    let actions = reconcile(
        &actual,
        &DesiredState::default(),
        THIS_NODE,
        &LocalityConfig::default(),
        &MaintenanceConfig::default(),
        &SchedulerConfig::default(),
        None,
    );
    let placements: Vec<&MeshOsAction> = actions
        .iter()
        .filter(|a| matches!(a, MeshOsAction::RequestPlacement { .. }))
        .collect();
    assert_eq!(placements.len(), 1);

    match placements[0] {
        MeshOsAction::RequestPlacement {
            chain,
            exclude,
            target,
        } => {
            assert_eq!(*chain, CHAIN);
            assert_eq!(*target, Some(42));
            // Current holders are excluded; the forced target is not.
            assert!(exclude.contains(&THIS_NODE));
            assert!(exclude.contains(&99));
            assert!(!exclude.contains(&42));
        }
        other => panic!("expected RequestPlacement, got {other:?}"),
    }
}
/// A forced placement for a chain led by another node (999) must not produce
/// a `RequestPlacement` on this node.
#[test]
fn forced_placement_does_not_emit_on_non_leader_nodes() {
    const CHAIN: ChainId = 0xCAFE;
    let mut actual = MeshOsState::default();
    actual.last_tick = Some(Instant::now());
    actual.replica_leader.insert(CHAIN, 999);
    actual.forced_placements.push((CHAIN, 7));

    let actions = reconcile(
        &actual,
        &DesiredState::default(),
        THIS_NODE,
        &LocalityConfig::default(),
        &MaintenanceConfig::default(),
        &SchedulerConfig::default(),
        None,
    );
    let emitted_placement = actions
        .iter()
        .any(|a| matches!(a, MeshOsAction::RequestPlacement { .. }));
    assert!(
        !emitted_placement,
        "non-leader should not emit forced placement, got {actions:?}",
    );
}
/// On the chain leader, a forced eviction of holder 99 must produce exactly
/// that `RequestEviction` and nothing else.
#[test]
fn forced_eviction_emits_request_eviction_on_the_leader() {
    const CHAIN: ChainId = 0xC0FFEE;
    let holders = ::std::collections::BTreeSet::from([THIS_NODE, 99]);

    let mut actual = MeshOsState::default();
    actual.last_tick = Some(Instant::now());
    actual.replicas.insert(CHAIN, holders);
    actual.replica_leader.insert(CHAIN, THIS_NODE);
    actual.forced_evictions.push((CHAIN, 99));

    let expected = vec![MeshOsAction::RequestEviction {
        chain: CHAIN,
        victim: 99,
    }];
    let actions = reconcile(
        &actual,
        &DesiredState::default(),
        THIS_NODE,
        &LocalityConfig::default(),
        &MaintenanceConfig::default(),
        &SchedulerConfig::default(),
        None,
    );
    assert_eq!(actions, expected);
}
/// A forced eviction for a chain led by another node (999) must produce no
/// actions on this node.
#[test]
fn forced_eviction_does_not_emit_on_non_leader_nodes() {
    const CHAIN: ChainId = 0xCAFE;
    let holders = ::std::collections::BTreeSet::from([THIS_NODE, 7]);

    let mut actual = MeshOsState::default();
    actual.last_tick = Some(Instant::now());
    actual.replicas.insert(CHAIN, holders);
    actual.replica_leader.insert(CHAIN, 999);
    actual.forced_evictions.push((CHAIN, 7));

    let actions = reconcile(
        &actual,
        &DesiredState::default(),
        THIS_NODE,
        &LocalityConfig::default(),
        &MaintenanceConfig::default(),
        &SchedulerConfig::default(),
        None,
    );
    assert!(
        actions.is_empty(),
        "non-leader should not emit, got {actions:?}"
    );
}
/// A forced eviction is emitted even though `CHAIN` was rebalanced at this
/// very tick, which would otherwise be inside the scheduler cooldown.
#[test]
fn forced_eviction_bypasses_scheduler_cooldown() {
    const CHAIN: ChainId = 0xDEAD;
    let tick = Instant::now();

    let mut actual = MeshOsState::default();
    actual.last_tick = Some(tick);
    actual
        .replicas
        .insert(CHAIN, ::std::collections::BTreeSet::from([7]));
    actual.replica_leader.insert(CHAIN, THIS_NODE);
    // Cooldown anchor at the current tick.
    actual.last_rebalance.insert(CHAIN, tick);
    actual.forced_evictions.push((CHAIN, 7));

    let expected = vec![MeshOsAction::RequestEviction {
        chain: CHAIN,
        victim: 7,
    }];
    let actions = reconcile(
        &actual,
        &DesiredState::default(),
        THIS_NODE,
        &LocalityConfig::default(),
        &MaintenanceConfig::default(),
        &SchedulerConfig::default(),
        None,
    );
    assert_eq!(actions, expected);
}
/// A freeze that ended one second before the current tick must not suppress
/// output: the pending `StartDaemon` is still emitted.
#[test]
fn freeze_expired_does_not_suppress_action_emission() {
    let mut actual = MeshOsState::default();
    // Place the tick one second *after* the freeze boundary rather than
    // computing `Instant::now() - 1s`. Subtracting from an `Instant` panics
    // when the result is not representable, e.g. shortly after the platform's
    // monotonic-clock origin. Adding keeps the same 1s relation safely.
    let thaw = Instant::now();
    actual.freeze_until = Some(thaw);
    actual.last_tick = Some(thaw + Duration::from_secs(1));
    let d = daemon("post-thaw-daemon", 1);
    actual.daemons.insert(d.clone(), DaemonStatus::default());
    let mut desired = DesiredState::default();
    desired.desired_daemons.insert(d.clone(), DaemonIntent::Run);
    let actions = reconcile(
        &actual,
        &desired,
        THIS_NODE,
        &LocalityConfig::default(),
        &MaintenanceConfig::default(),
        &SchedulerConfig::default(),
        None,
    );
    assert_eq!(actions, vec![MeshOsAction::StartDaemon { daemon: d }]);
}
}