use std::time::Duration;
use serde::{Deserialize, Serialize};
use super::event::{AdminEvent, AvoidScope, ChainId, DaemonRef, MigrationId, NodeId};
use super::snapshot::MeshOsSnapshot;
use crate::adapter::net::identity::{EntityId, EntityKeypair};
#[derive(Clone, Debug, Eq, PartialEq, Serialize, Deserialize)]
#[non_exhaustive]
pub enum IceActionProposal {
FreezeCluster {
ttl: Duration,
},
ThawCluster,
FlushAvoidLists {
scope: AvoidScope,
},
ForceEvictReplica {
chain: ChainId,
victim: NodeId,
},
ForceRestartDaemon {
daemon: DaemonRef,
},
ForceCutover {
chain: ChainId,
target: NodeId,
},
KillMigration {
migration: MigrationId,
},
}
impl IceActionProposal {
pub fn kind(&self) -> &'static str {
match self {
IceActionProposal::FreezeCluster { .. } => "freeze_cluster",
IceActionProposal::ThawCluster => "thaw_cluster",
IceActionProposal::FlushAvoidLists { .. } => "flush_avoid_lists",
IceActionProposal::ForceEvictReplica { .. } => "force_evict_replica",
IceActionProposal::ForceRestartDaemon { .. } => "force_restart_daemon",
IceActionProposal::ForceCutover { .. } => "force_cutover",
IceActionProposal::KillMigration { .. } => "kill_migration",
}
}
pub fn to_admin_event(&self) -> AdminEvent {
match self {
IceActionProposal::FreezeCluster { ttl } => AdminEvent::FreezeCluster { ttl: *ttl },
IceActionProposal::ThawCluster => AdminEvent::ThawCluster,
IceActionProposal::FlushAvoidLists { scope } => {
AdminEvent::FlushAvoidLists { scope: *scope }
}
IceActionProposal::ForceEvictReplica { chain, victim } => {
AdminEvent::ForceEvictReplica {
chain: *chain,
victim: *victim,
}
}
IceActionProposal::ForceRestartDaemon { daemon } => AdminEvent::ForceRestartDaemon {
daemon: daemon.clone(),
},
IceActionProposal::ForceCutover { chain, target } => AdminEvent::ForceCutover {
chain: *chain,
target: *target,
},
IceActionProposal::KillMigration { migration } => AdminEvent::KillMigration {
migration: *migration,
},
}
}
}
#[derive(Clone, Debug, Eq, PartialEq, Serialize, Deserialize)]
pub struct OperatorSignature {
pub operator_id: u64,
pub signature: Vec<u8>,
}
pub const ICE_SIGNING_DOMAIN: &[u8] = b"net.meshos.ice.v1\0";
pub const ADMIN_SIGNING_DOMAIN: &[u8] = b"net.meshos.admin.v1\0";
pub const DEFAULT_SIGNING_FRESHNESS_WINDOW: Duration = Duration::from_secs(300);
pub const DEFAULT_SIGNING_FUTURE_SKEW: Duration = Duration::from_secs(30);
pub const DEFAULT_ICE_COOLDOWN_WINDOW: Duration = Duration::from_secs(300);
pub fn now_ms_since_unix_epoch() -> u64 {
std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.map(|d| d.as_millis() as u64)
.unwrap_or(0)
}
pub const BLAST_RADIUS_HASH_LEN: usize = 32;
pub type BlastRadiusHash = [u8; BLAST_RADIUS_HASH_LEN];
pub const SIMULATION_REQUIRED_SENTINEL: BlastRadiusHash = [0u8; BLAST_RADIUS_HASH_LEN];
#[expect(
clippy::expect_used,
reason = "BlastRadius is composed of types whose postcard encoding is infallible (see module docs)"
)]
pub fn blast_radius_hash(blast: &BlastRadius) -> BlastRadiusHash {
let bytes =
postcard::to_allocvec(blast).expect("postcard encoding of BlastRadius is infallible");
blake3::hash(&bytes).into()
}
#[expect(
clippy::expect_used,
reason = "IceActionProposal is composed of types whose postcard encoding is infallible"
)]
pub fn ice_proposal_signing_payload(
proposal: &IceActionProposal,
issued_at_ms: u64,
blast_hash: &BlastRadiusHash,
) -> Vec<u8> {
let inner = postcard::to_allocvec(proposal)
.expect("postcard encoding of IceActionProposal is infallible");
let mut buf =
Vec::with_capacity(ICE_SIGNING_DOMAIN.len() + 8 + BLAST_RADIUS_HASH_LEN + inner.len());
buf.extend_from_slice(ICE_SIGNING_DOMAIN);
buf.extend_from_slice(&issued_at_ms.to_le_bytes());
buf.extend_from_slice(blast_hash);
buf.extend_from_slice(&inner);
buf
}
#[expect(
clippy::expect_used,
reason = "AdminEvent is composed of types whose postcard encoding is infallible"
)]
pub fn admin_event_signing_payload(event: &AdminEvent, issued_at_ms: u64) -> Vec<u8> {
let inner =
postcard::to_allocvec(event).expect("postcard encoding of AdminEvent is infallible");
let mut buf = Vec::with_capacity(ADMIN_SIGNING_DOMAIN.len() + 8 + inner.len());
buf.extend_from_slice(ADMIN_SIGNING_DOMAIN);
buf.extend_from_slice(&issued_at_ms.to_le_bytes());
buf.extend_from_slice(&inner);
buf
}
impl OperatorSignature {
pub fn sign(
keypair: &EntityKeypair,
proposal: &IceActionProposal,
issued_at_ms: u64,
blast_hash: &BlastRadiusHash,
) -> Self {
let payload = ice_proposal_signing_payload(proposal, issued_at_ms, blast_hash);
let sig = keypair.sign(&payload);
Self {
operator_id: keypair.origin_hash(),
signature: sig.to_bytes().to_vec(),
}
}
pub fn sign_admin(keypair: &EntityKeypair, event: &AdminEvent, issued_at_ms: u64) -> Self {
let payload = admin_event_signing_payload(event, issued_at_ms);
let sig = keypair.sign(&payload);
Self {
operator_id: keypair.origin_hash(),
signature: sig.to_bytes().to_vec(),
}
}
pub fn try_sign(
keypair: &EntityKeypair,
proposal: &IceActionProposal,
issued_at_ms: u64,
blast_hash: &BlastRadiusHash,
) -> Option<Self> {
if keypair.is_read_only() {
return None;
}
Some(Self::sign(keypair, proposal, issued_at_ms, blast_hash))
}
pub fn try_sign_admin(
keypair: &EntityKeypair,
event: &AdminEvent,
issued_at_ms: u64,
) -> Option<Self> {
if keypair.is_read_only() {
return None;
}
Some(Self::sign_admin(keypair, event, issued_at_ms))
}
}
#[derive(Clone, Debug, Default)]
pub struct OperatorRegistry {
keys: std::collections::BTreeMap<u64, EntityId>,
}
impl OperatorRegistry {
pub fn new() -> Self {
Self::default()
}
pub fn insert(&mut self, operator_id: u64, public_key: EntityId) {
self.keys.insert(operator_id, public_key);
}
pub fn register(&mut self, keypair: &EntityKeypair) {
self.insert(keypair.origin_hash(), keypair.entity_id().clone());
}
pub fn contains(&self, operator_id: u64) -> bool {
self.keys.contains_key(&operator_id)
}
pub fn len(&self) -> usize {
self.keys.len()
}
pub fn is_empty(&self) -> bool {
self.keys.is_empty()
}
pub fn verify(&self, signature: &OperatorSignature, payload: &[u8]) -> Result<(), VerifyError> {
let entity_id =
self.keys
.get(&signature.operator_id)
.ok_or(VerifyError::NotAuthorized {
operator_id: signature.operator_id,
})?;
let sig_bytes: &[u8; 64] = signature.signature.as_slice().try_into().map_err(|_| {
VerifyError::InvalidSignature {
operator_id: signature.operator_id,
reason: format!(
"signature is not 64 bytes (got {})",
signature.signature.len()
),
}
})?;
let ed_sig = ed25519_dalek::Signature::from_bytes(sig_bytes);
entity_id
.verify(payload, &ed_sig)
.map_err(|_| VerifyError::InvalidSignature {
operator_id: signature.operator_id,
reason: "signature failed verification against the registered public key".into(),
})
}
pub fn verify_bundle(
&self,
signatures: &[OperatorSignature],
payload: &[u8],
threshold: usize,
) -> Result<(), VerifyError> {
const MAX_SIGNATURES_PER_BUNDLE: usize = 64;
let max_signatures = MAX_SIGNATURES_PER_BUNDLE.max(threshold);
if signatures.len() > max_signatures {
return Err(VerifyError::InsufficientSignatures {
got: signatures.len(),
required: threshold,
});
}
let mut unique_operators: std::collections::BTreeSet<u64> =
std::collections::BTreeSet::new();
for sig in signatures {
self.verify(sig, payload)?;
unique_operators.insert(sig.operator_id);
}
if unique_operators.len() < threshold {
return Err(VerifyError::InsufficientSignatures {
got: unique_operators.len(),
required: threshold,
});
}
Ok(())
}
}
#[derive(Clone, Debug, thiserror::Error)]
pub enum VerifyError {
#[error("operator {operator_id} is not registered in the cluster's operator policy")]
NotAuthorized {
operator_id: u64,
},
#[error("operator {operator_id} signature invalid: {reason}")]
InvalidSignature {
operator_id: u64,
reason: String,
},
#[error("insufficient signatures: got {got}, required {required}")]
InsufficientSignatures {
got: usize,
required: usize,
},
#[error(
"signed envelope expired — issued {issued_at_ms} ms, verified at {now_ms} ms, max age {max_age_ms} ms"
)]
EnvelopeExpired {
issued_at_ms: u64,
now_ms: u64,
max_age_ms: u64,
},
#[error(
"signed envelope is from the future — issued {issued_at_ms} ms, verified at {now_ms} ms, future-skew tolerance {future_skew_ms} ms"
)]
EnvelopeFromFuture {
issued_at_ms: u64,
now_ms: u64,
future_skew_ms: u64,
},
#[error("ICE commit rejected: simulate() must precede commit() (blast-radius hash is the simulation-required sentinel)")]
SimulationRequired,
#[error(
"ICE cooldown active: target {node:?} blocked until {expires_at_ms} ms (verifier observed {now_ms} ms)"
)]
IceCooldownActive {
node: Option<NodeId>,
expires_at_ms: u64,
now_ms: u64,
},
}
impl VerifyError {
pub fn kind(&self) -> &'static str {
match self {
VerifyError::NotAuthorized { .. } => "not_authorized",
VerifyError::InvalidSignature { .. } => "signature_invalid",
VerifyError::InsufficientSignatures { .. } => "insufficient_signatures",
VerifyError::EnvelopeExpired { .. } => "envelope_expired",
VerifyError::EnvelopeFromFuture { .. } => "envelope_from_future",
VerifyError::SimulationRequired => "simulation_required",
VerifyError::IceCooldownActive { .. } => "ice_cooldown_active",
}
}
}
#[derive(Debug, Default)]
struct IceCooldownState {
per_node: std::collections::BTreeMap<NodeId, u64>,
cluster_wide_until_ms: Option<u64>,
}
fn cooldown_targets(proposal: &IceActionProposal) -> CooldownTargets {
match proposal {
IceActionProposal::FreezeCluster { .. } | IceActionProposal::ThawCluster => {
CooldownTargets::ClusterWide
}
IceActionProposal::FlushAvoidLists { scope } => match scope {
AvoidScope::Local { node } => CooldownTargets::PerNode(vec![*node]),
AvoidScope::OnPeer { peer } => CooldownTargets::PerNode(vec![*peer]),
AvoidScope::Global => CooldownTargets::ClusterWide,
},
IceActionProposal::ForceEvictReplica { victim, .. } => {
CooldownTargets::PerNode(vec![*victim])
}
IceActionProposal::ForceCutover { target, .. } => CooldownTargets::PerNode(vec![*target]),
IceActionProposal::ForceRestartDaemon { .. } | IceActionProposal::KillMigration { .. } => {
CooldownTargets::ClusterWide
}
}
}
#[derive(Debug)]
enum CooldownTargets {
ClusterWide,
PerNode(Vec<NodeId>),
}
pub const DEFAULT_MAX_ADMIN_AUDIT_RECORDS: usize = 256;
#[derive(Clone, Debug, Eq, PartialEq, Serialize, Deserialize)]
#[non_exhaustive]
pub enum VerificationOutcome {
Accepted,
Rejected {
kind: String,
message: String,
},
Unverified,
}
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
pub struct AdminAuditRecord {
pub seq: u64,
pub committed_at_ms: u64,
pub event: super::event::AdminEvent,
pub operator_ids: Vec<u64>,
pub outcome: VerificationOutcome,
#[serde(default)]
pub chain_pending: bool,
}
#[derive(Clone, Debug)]
pub struct AdminVerifier {
registry: std::sync::Arc<OperatorRegistry>,
threshold: usize,
freshness_window: Duration,
future_skew: Duration,
ice_cooldown: Duration,
ice_state: std::sync::Arc<parking_lot::Mutex<IceCooldownState>>,
}
impl AdminVerifier {
pub fn new(registry: std::sync::Arc<OperatorRegistry>, threshold: usize) -> Self {
Self::with_freshness(
registry,
threshold,
DEFAULT_SIGNING_FRESHNESS_WINDOW,
DEFAULT_SIGNING_FUTURE_SKEW,
)
}
pub fn with_freshness(
registry: std::sync::Arc<OperatorRegistry>,
threshold: usize,
freshness_window: Duration,
future_skew: Duration,
) -> Self {
Self::with_full_policy(
registry,
threshold,
freshness_window,
future_skew,
DEFAULT_ICE_COOLDOWN_WINDOW,
)
}
pub fn with_full_policy(
registry: std::sync::Arc<OperatorRegistry>,
threshold: usize,
freshness_window: Duration,
future_skew: Duration,
ice_cooldown: Duration,
) -> Self {
Self {
registry,
threshold: threshold.max(1),
freshness_window,
future_skew,
ice_cooldown,
ice_state: std::sync::Arc::new(parking_lot::Mutex::new(IceCooldownState::default())),
}
}
pub fn registry(&self) -> &OperatorRegistry {
&self.registry
}
pub fn threshold(&self) -> usize {
self.threshold
}
pub fn freshness_window(&self) -> Duration {
self.freshness_window
}
pub fn future_skew(&self) -> Duration {
self.future_skew
}
pub fn ice_cooldown(&self) -> Duration {
self.ice_cooldown
}
pub fn verify_commit(
&self,
proposal: &IceActionProposal,
signatures: &[OperatorSignature],
issued_at_ms: u64,
blast_hash: &BlastRadiusHash,
now_ms: u64,
) -> Result<(), VerifyError> {
if *blast_hash == SIMULATION_REQUIRED_SENTINEL {
return Err(VerifyError::SimulationRequired);
}
self.check_freshness(issued_at_ms, now_ms)?;
let targets = cooldown_targets(proposal);
self.check_ice_cooldown(&targets, now_ms)?;
let payload = ice_proposal_signing_payload(proposal, issued_at_ms, blast_hash);
self.registry
.verify_bundle(signatures, &payload, self.threshold)?;
self.record_ice_cooldown(&targets, now_ms);
Ok(())
}
fn check_ice_cooldown(
&self,
targets: &CooldownTargets,
now_ms: u64,
) -> Result<(), VerifyError> {
let state = self.ice_state.lock();
match targets {
CooldownTargets::ClusterWide => {
if let Some(expires_at_ms) = state.cluster_wide_until_ms {
if expires_at_ms > now_ms {
return Err(VerifyError::IceCooldownActive {
node: None,
expires_at_ms,
now_ms,
});
}
}
}
CooldownTargets::PerNode(nodes) => {
for node in nodes {
if let Some(expires_at_ms) = state.per_node.get(node) {
if *expires_at_ms > now_ms {
return Err(VerifyError::IceCooldownActive {
node: Some(*node),
expires_at_ms: *expires_at_ms,
now_ms,
});
}
}
}
}
}
Ok(())
}
fn record_ice_cooldown(&self, targets: &CooldownTargets, now_ms: u64) {
let expires_at_ms = now_ms.saturating_add(self.ice_cooldown.as_millis() as u64);
let mut state = self.ice_state.lock();
match targets {
CooldownTargets::ClusterWide => {
state.cluster_wide_until_ms = Some(expires_at_ms);
}
CooldownTargets::PerNode(nodes) => {
for node in nodes {
state.per_node.insert(*node, expires_at_ms);
}
}
}
state
.per_node
.retain(|_, expires_at_ms| *expires_at_ms > now_ms);
if let Some(cluster_expires) = state.cluster_wide_until_ms {
if cluster_expires <= now_ms {
state.cluster_wide_until_ms = None;
}
}
}
pub fn verify_admin_commit(
&self,
event: &AdminEvent,
signature: &OperatorSignature,
issued_at_ms: u64,
now_ms: u64,
) -> Result<(), VerifyError> {
self.check_freshness(issued_at_ms, now_ms)?;
let payload = admin_event_signing_payload(event, issued_at_ms);
self.registry.verify(signature, &payload)
}
fn check_freshness(&self, issued_at_ms: u64, now_ms: u64) -> Result<(), VerifyError> {
if issued_at_ms > now_ms {
let drift_ms = issued_at_ms - now_ms;
if drift_ms > self.future_skew.as_millis() as u64 {
return Err(VerifyError::EnvelopeFromFuture {
issued_at_ms,
now_ms,
future_skew_ms: self.future_skew.as_millis() as u64,
});
}
return Ok(());
}
let age_ms = now_ms - issued_at_ms;
if age_ms > self.freshness_window.as_millis() as u64 {
return Err(VerifyError::EnvelopeExpired {
issued_at_ms,
now_ms,
max_age_ms: self.freshness_window.as_millis() as u64,
});
}
Ok(())
}
}
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq)]
pub struct BlastRadius {
pub affected_nodes: Vec<NodeId>,
pub affected_replicas: Vec<ChainId>,
pub affected_daemons: Vec<DaemonRef>,
pub estimated_drain_delay: Option<Duration>,
pub placement_stability_delta: f32,
pub warnings: Vec<BlastWarning>,
}
#[derive(Clone, Debug, Eq, PartialEq, Serialize, Deserialize)]
#[non_exhaustive]
pub enum BlastWarning {
ClusterFreezeBlocksOperatorActions,
ThawResumesPendingReconciles,
ThawHasNoFreezeToCancel,
GlobalAvoidFlushMayReEmit,
AvoidFlushLocalToTargetNodeOnly,
AvoidFlushRecoversPeer {
peer: NodeId,
},
ForcedEvictionBypassesCooldown {
chain: ChainId,
victim: NodeId,
},
ForcedEvictionTargetsUnknownChain {
chain: ChainId,
},
ForcedEvictionTargetsNonHolder {
chain: ChainId,
victim: NodeId,
},
ForcedRestartBypassesBackoff {
daemon_id: u64,
},
ForcedRestartTargetsUnknownDaemon {
daemon_id: u64,
},
ForcedRestartDaemonNotInBackoff {
daemon_id: u64,
},
ForcedCutoverBypassesPlacementScorer {
chain: ChainId,
target: NodeId,
},
ForcedCutoverTargetsUnknownChain {
chain: ChainId,
},
ForcedCutoverTargetAlreadyHolder {
chain: ChainId,
target: NodeId,
},
KillMigrationDispatcherIntegrationPending {
migration: MigrationId,
},
}
pub fn simulate(snapshot: &MeshOsSnapshot, proposal: &IceActionProposal) -> BlastRadius {
match proposal {
IceActionProposal::FreezeCluster { ttl } => simulate_freeze(snapshot, *ttl),
IceActionProposal::ThawCluster => simulate_thaw(snapshot),
IceActionProposal::FlushAvoidLists { scope } => {
simulate_flush_avoid_lists(snapshot, *scope)
}
IceActionProposal::ForceEvictReplica { chain, victim } => {
simulate_force_evict_replica(snapshot, *chain, *victim)
}
IceActionProposal::ForceRestartDaemon { daemon } => {
simulate_force_restart_daemon(snapshot, daemon)
}
IceActionProposal::ForceCutover { chain, target } => {
simulate_force_cutover(snapshot, *chain, *target)
}
IceActionProposal::KillMigration { migration } => {
simulate_kill_migration(snapshot, *migration)
}
}
}
fn simulate_freeze(snapshot: &MeshOsSnapshot, ttl: Duration) -> BlastRadius {
let mut affected_nodes: Vec<NodeId> = snapshot.peers.keys().copied().collect();
affected_nodes.sort();
BlastRadius {
affected_nodes,
affected_replicas: Vec::new(),
affected_daemons: Vec::new(),
estimated_drain_delay: Some(ttl),
placement_stability_delta: 0.0,
warnings: vec![BlastWarning::ClusterFreezeBlocksOperatorActions],
}
}
fn simulate_flush_avoid_lists(snapshot: &MeshOsSnapshot, scope: AvoidScope) -> BlastRadius {
let mut affected_nodes: Vec<NodeId> = snapshot.peers.keys().copied().collect();
affected_nodes.sort();
match scope {
AvoidScope::Local { node } => BlastRadius {
affected_nodes: vec![node],
affected_replicas: Vec::new(),
affected_daemons: Vec::new(),
estimated_drain_delay: None,
placement_stability_delta: 0.0,
warnings: vec![BlastWarning::AvoidFlushLocalToTargetNodeOnly],
},
AvoidScope::OnPeer { peer } => BlastRadius {
affected_nodes,
affected_replicas: Vec::new(),
affected_daemons: Vec::new(),
estimated_drain_delay: None,
placement_stability_delta: 0.05,
warnings: vec![BlastWarning::AvoidFlushRecoversPeer { peer }],
},
AvoidScope::Global => BlastRadius {
affected_nodes,
affected_replicas: Vec::new(),
affected_daemons: Vec::new(),
estimated_drain_delay: None,
placement_stability_delta: 0.1,
warnings: vec![BlastWarning::GlobalAvoidFlushMayReEmit],
},
}
}
fn simulate_force_evict_replica(
snapshot: &MeshOsSnapshot,
chain: ChainId,
victim: NodeId,
) -> BlastRadius {
let mut warnings = vec![BlastWarning::ForcedEvictionBypassesCooldown { chain, victim }];
let replica = snapshot.replicas.get(&chain);
if replica.is_none() {
warnings.push(BlastWarning::ForcedEvictionTargetsUnknownChain { chain });
} else if let Some(snap) = replica {
if !snap.holders.contains(&victim) {
warnings.push(BlastWarning::ForcedEvictionTargetsNonHolder { chain, victim });
}
}
BlastRadius {
affected_nodes: vec![victim],
affected_replicas: vec![chain],
affected_daemons: Vec::new(),
estimated_drain_delay: None,
placement_stability_delta: 0.15,
warnings,
}
}
fn simulate_force_restart_daemon(snapshot: &MeshOsSnapshot, daemon: &DaemonRef) -> BlastRadius {
let mut warnings = vec![BlastWarning::ForcedRestartBypassesBackoff {
daemon_id: daemon.id,
}];
match snapshot.daemons.get(&daemon.id) {
None => warnings.push(BlastWarning::ForcedRestartTargetsUnknownDaemon {
daemon_id: daemon.id,
}),
Some(snap) => {
if matches!(
snap.restart_state,
super::snapshot::RestartStateSnapshot::Idle
) {
warnings.push(BlastWarning::ForcedRestartDaemonNotInBackoff {
daemon_id: daemon.id,
});
}
}
}
BlastRadius {
affected_nodes: Vec::new(),
affected_replicas: Vec::new(),
affected_daemons: vec![daemon.clone()],
estimated_drain_delay: None,
placement_stability_delta: 0.0,
warnings,
}
}
fn simulate_force_cutover(
snapshot: &MeshOsSnapshot,
chain: ChainId,
target: NodeId,
) -> BlastRadius {
let mut warnings = vec![BlastWarning::ForcedCutoverBypassesPlacementScorer { chain, target }];
match snapshot.replicas.get(&chain) {
None => warnings.push(BlastWarning::ForcedCutoverTargetsUnknownChain { chain }),
Some(snap) => {
if snap.holders.contains(&target) {
warnings.push(BlastWarning::ForcedCutoverTargetAlreadyHolder { chain, target });
}
}
}
BlastRadius {
affected_nodes: vec![target],
affected_replicas: vec![chain],
affected_daemons: Vec::new(),
estimated_drain_delay: None,
placement_stability_delta: 0.15,
warnings,
}
}
fn simulate_kill_migration(snapshot: &MeshOsSnapshot, migration: MigrationId) -> BlastRadius {
let affected_daemons = match snapshot
.in_flight_migrations
.iter()
.find(|m| m.daemon_origin == migration)
{
Some(_) => vec![super::event::DaemonRef {
id: migration,
name: String::new(),
}],
None => Vec::new(),
};
BlastRadius {
affected_nodes: Vec::new(),
affected_replicas: Vec::new(),
affected_daemons,
estimated_drain_delay: None,
placement_stability_delta: 0.0,
warnings: vec![BlastWarning::KillMigrationDispatcherIntegrationPending { migration }],
}
}
fn simulate_thaw(snapshot: &MeshOsSnapshot) -> BlastRadius {
let warning = if snapshot.freeze_remaining_ms.is_some() {
BlastWarning::ThawResumesPendingReconciles
} else {
BlastWarning::ThawHasNoFreezeToCancel
};
BlastRadius {
affected_nodes: Vec::new(),
affected_replicas: Vec::new(),
affected_daemons: Vec::new(),
estimated_drain_delay: None,
placement_stability_delta: 0.0,
warnings: vec![warning],
}
}
#[cfg(test)]
#[allow(clippy::field_reassign_with_default)]
mod tests {
use super::*;
use crate::adapter::net::behavior::meshos::snapshot::PeerSnapshot;
fn snapshot_with_peers(peers: &[NodeId]) -> MeshOsSnapshot {
let mut snap = MeshOsSnapshot::default();
for peer in peers {
snap.peers.insert(*peer, PeerSnapshot::default());
}
snap
}
#[test]
fn freeze_against_empty_snapshot_reports_no_affected_nodes() {
let snap = MeshOsSnapshot::default();
let blast = simulate(
&snap,
&IceActionProposal::FreezeCluster {
ttl: Duration::from_secs(30),
},
);
assert!(blast.affected_nodes.is_empty());
assert_eq!(blast.estimated_drain_delay, Some(Duration::from_secs(30)));
assert_eq!(
blast.warnings,
vec![BlastWarning::ClusterFreezeBlocksOperatorActions]
);
}
#[test]
fn freeze_against_three_peers_reports_all_three_sorted() {
let snap = snapshot_with_peers(&[30, 10, 20]);
let blast = simulate(
&snap,
&IceActionProposal::FreezeCluster {
ttl: Duration::from_secs(60),
},
);
assert_eq!(blast.affected_nodes, vec![10, 20, 30]);
assert_eq!(blast.estimated_drain_delay, Some(Duration::from_secs(60)));
assert_eq!(blast.placement_stability_delta, 0.0);
assert!(blast.affected_replicas.is_empty());
assert!(blast.affected_daemons.is_empty());
}
#[test]
fn thaw_against_frozen_snapshot_warns_pending_reconciles_resume() {
let mut snap = MeshOsSnapshot::default();
snap.freeze_remaining_ms = Some(15_000);
let blast = simulate(&snap, &IceActionProposal::ThawCluster);
assert_eq!(
blast.warnings,
vec![BlastWarning::ThawResumesPendingReconciles]
);
assert!(blast.affected_nodes.is_empty());
assert_eq!(blast.estimated_drain_delay, None);
}
#[test]
fn thaw_against_unfrozen_snapshot_warns_no_op() {
let snap = MeshOsSnapshot::default();
let blast = simulate(&snap, &IceActionProposal::ThawCluster);
assert_eq!(blast.warnings, vec![BlastWarning::ThawHasNoFreezeToCancel]);
}
#[test]
fn blast_radius_postcard_round_trip_preserves_every_field() {
let blast = BlastRadius {
affected_nodes: vec![1, 2, 3],
affected_replicas: vec![100, 200],
affected_daemons: vec![DaemonRef {
id: 7,
name: "telemetry".into(),
}],
estimated_drain_delay: Some(Duration::from_secs(45)),
placement_stability_delta: 0.25,
warnings: vec![
BlastWarning::ClusterFreezeBlocksOperatorActions,
BlastWarning::ThawResumesPendingReconciles,
],
};
let bytes = postcard::to_allocvec(&blast).expect("encode");
let decoded: BlastRadius = postcard::from_bytes(&bytes).expect("decode");
assert_eq!(decoded, blast);
}
#[test]
fn blast_radius_json_round_trip_preserves_every_field() {
let blast = BlastRadius {
affected_nodes: vec![42],
affected_replicas: Vec::new(),
affected_daemons: Vec::new(),
estimated_drain_delay: Some(Duration::from_millis(2_500)),
placement_stability_delta: 0.0,
warnings: vec![BlastWarning::ClusterFreezeBlocksOperatorActions],
};
let json = serde_json::to_string(&blast).expect("encode");
let decoded: BlastRadius = serde_json::from_str(&json).expect("decode");
assert_eq!(decoded, blast);
}
#[test]
fn ice_action_proposal_postcard_round_trips_both_variants() {
for proposal in [
IceActionProposal::FreezeCluster {
ttl: Duration::from_secs(90),
},
IceActionProposal::ThawCluster,
] {
let bytes = postcard::to_allocvec(&proposal).expect("encode");
let decoded: IceActionProposal = postcard::from_bytes(&bytes).expect("decode");
assert_eq!(decoded, proposal);
}
}
#[test]
fn operator_signature_signs_and_verifies_round_trip() {
let kp = EntityKeypair::generate();
let mut registry = OperatorRegistry::new();
registry.register(&kp);
let proposal = IceActionProposal::FreezeCluster {
ttl: Duration::from_secs(30),
};
let issued_at_ms = now_ms_since_unix_epoch();
let blast = simulate(&MeshOsSnapshot::default(), &proposal);
let blast_hash = blast_radius_hash(&blast);
let sig = OperatorSignature::sign(&kp, &proposal, issued_at_ms, &blast_hash);
let payload = ice_proposal_signing_payload(&proposal, issued_at_ms, &blast_hash);
registry.verify(&sig, &payload).expect("valid signature");
}
#[test]
fn operator_registry_rejects_unknown_operator_via_substrate_path() {
let kp = EntityKeypair::generate();
let registry = OperatorRegistry::new(); let proposal = IceActionProposal::ThawCluster;
let issued_at_ms = now_ms_since_unix_epoch();
let blast_hash: BlastRadiusHash = [1u8; BLAST_RADIUS_HASH_LEN];
let sig = OperatorSignature::sign(&kp, &proposal, issued_at_ms, &blast_hash);
let payload = ice_proposal_signing_payload(&proposal, issued_at_ms, &blast_hash);
let err = registry.verify(&sig, &payload).unwrap_err();
assert!(matches!(err, VerifyError::NotAuthorized { .. }));
assert_eq!(err.kind(), "not_authorized");
}
#[test]
fn admin_verifier_clamps_zero_threshold_to_one() {
let registry = std::sync::Arc::new(OperatorRegistry::new());
let verifier = AdminVerifier::new(registry, 0);
assert_eq!(verifier.threshold(), 1);
}
#[test]
fn admin_verifier_returns_insufficient_signatures_for_empty_bundle() {
let kp = EntityKeypair::generate();
let mut registry = OperatorRegistry::new();
registry.register(&kp);
let verifier = AdminVerifier::new(std::sync::Arc::new(registry), 2);
let proposal = IceActionProposal::ThawCluster;
let issued_at_ms = now_ms_since_unix_epoch();
let blast_hash: BlastRadiusHash = [1u8; BLAST_RADIUS_HASH_LEN];
let sig = OperatorSignature::sign(&kp, &proposal, issued_at_ms, &blast_hash);
let err = verifier
.verify_commit(&proposal, &[sig], issued_at_ms, &blast_hash, issued_at_ms)
.unwrap_err();
assert!(matches!(
err,
VerifyError::InsufficientSignatures {
got: 1,
required: 2
}
));
}
#[test]
fn admin_verifier_rejects_duplicate_signatures_from_same_operator() {
let kp = EntityKeypair::generate();
let mut registry = OperatorRegistry::new();
registry.register(&kp);
let verifier = AdminVerifier::new(std::sync::Arc::new(registry), 2);
let proposal = IceActionProposal::ThawCluster;
let issued_at_ms = now_ms_since_unix_epoch();
let blast_hash: BlastRadiusHash = [1u8; BLAST_RADIUS_HASH_LEN];
let sig = OperatorSignature::sign(&kp, &proposal, issued_at_ms, &blast_hash);
let bundle = [sig.clone(), sig];
let err = verifier
.verify_commit(&proposal, &bundle, issued_at_ms, &blast_hash, issued_at_ms)
.unwrap_err();
assert!(
matches!(
err,
VerifyError::InsufficientSignatures {
got: 1,
required: 2
}
),
"expected InsufficientSignatures {{ got: 1, required: 2 }}, got {err:?}"
);
}
#[test]
fn admin_verifier_accepts_two_distinct_operators_at_threshold_two() {
let kp_a = EntityKeypair::generate();
let kp_b = EntityKeypair::generate();
let mut registry = OperatorRegistry::new();
registry.register(&kp_a);
registry.register(&kp_b);
let verifier = AdminVerifier::new(std::sync::Arc::new(registry), 2);
let proposal = IceActionProposal::ThawCluster;
let issued_at_ms = now_ms_since_unix_epoch();
let blast_hash: BlastRadiusHash = [1u8; BLAST_RADIUS_HASH_LEN];
let bundle = [
OperatorSignature::sign(&kp_a, &proposal, issued_at_ms, &blast_hash),
OperatorSignature::sign(&kp_b, &proposal, issued_at_ms, &blast_hash),
];
verifier
.verify_commit(&proposal, &bundle, issued_at_ms, &blast_hash, issued_at_ms)
.expect("two distinct operators with valid signatures should satisfy threshold = 2");
}
#[test]
fn admin_verifier_rejects_simulation_required_sentinel_blast_hash() {
let kp = EntityKeypair::generate();
let mut registry = OperatorRegistry::new();
registry.register(&kp);
let verifier = AdminVerifier::new(std::sync::Arc::new(registry), 1);
let proposal = IceActionProposal::ThawCluster;
let issued_at_ms = now_ms_since_unix_epoch();
let sentinel = SIMULATION_REQUIRED_SENTINEL;
let sig = OperatorSignature::sign(&kp, &proposal, issued_at_ms, &sentinel);
let err = verifier
.verify_commit(&proposal, &[sig], issued_at_ms, &sentinel, issued_at_ms)
.unwrap_err();
assert_eq!(err.kind(), "simulation_required");
}
#[test]
fn blast_radius_hash_is_deterministic_across_equal_inputs() {
let blast_a = BlastRadius {
affected_nodes: vec![1, 2, 3],
affected_replicas: vec![10],
affected_daemons: Vec::new(),
estimated_drain_delay: Some(Duration::from_secs(15)),
placement_stability_delta: 0.5,
warnings: vec![BlastWarning::ClusterFreezeBlocksOperatorActions],
};
let blast_b = blast_a.clone();
assert_eq!(blast_radius_hash(&blast_a), blast_radius_hash(&blast_b));
let blast_c = BlastRadius {
affected_nodes: vec![4, 5, 6],
..blast_a
};
assert_ne!(blast_radius_hash(&blast_b), blast_radius_hash(&blast_c));
assert_ne!(blast_radius_hash(&blast_b), SIMULATION_REQUIRED_SENTINEL);
}
#[test]
fn ice_proposal_to_admin_event_maps_freeze_cluster() {
let proposal = IceActionProposal::FreezeCluster {
ttl: Duration::from_secs(45),
};
assert!(matches!(
proposal.to_admin_event(),
AdminEvent::FreezeCluster { ttl } if ttl == Duration::from_secs(45)
));
}
#[test]
fn ice_proposal_to_admin_event_maps_thaw_cluster() {
assert!(matches!(
IceActionProposal::ThawCluster.to_admin_event(),
AdminEvent::ThawCluster
));
}
#[test]
fn simulate_flush_avoid_lists_local_targets_one_node() {
let snap = snapshot_with_peers(&[10, 20, 30]);
let blast = simulate(
&snap,
&IceActionProposal::FlushAvoidLists {
scope: AvoidScope::Local { node: 42 },
},
);
assert_eq!(blast.affected_nodes, vec![42]);
assert!(blast
.warnings
.iter()
.any(|w| matches!(w, BlastWarning::AvoidFlushLocalToTargetNodeOnly)));
}
#[test]
fn simulate_flush_avoid_lists_on_peer_covers_every_peer_with_warning() {
let snap = snapshot_with_peers(&[10, 20, 30]);
let blast = simulate(
&snap,
&IceActionProposal::FlushAvoidLists {
scope: AvoidScope::OnPeer { peer: 20 },
},
);
assert_eq!(blast.affected_nodes, vec![10, 20, 30]);
assert!(blast
.warnings
.iter()
.any(|w| matches!(w, BlastWarning::AvoidFlushRecoversPeer { peer: 20 })));
assert!(blast.placement_stability_delta > 0.0);
}
#[test]
fn simulate_flush_avoid_lists_global_carries_re_emit_warning() {
let snap = snapshot_with_peers(&[1, 2, 3]);
let blast = simulate(
&snap,
&IceActionProposal::FlushAvoidLists {
scope: AvoidScope::Global,
},
);
assert_eq!(blast.affected_nodes, vec![1, 2, 3]);
assert!(blast
.warnings
.iter()
.any(|w| matches!(w, BlastWarning::GlobalAvoidFlushMayReEmit)));
}
#[test]
fn ice_proposal_to_admin_event_maps_flush_avoid_lists() {
for scope in [
AvoidScope::Local { node: 42 },
AvoidScope::OnPeer { peer: 7 },
AvoidScope::Global,
] {
let proposal = IceActionProposal::FlushAvoidLists { scope };
match proposal.to_admin_event() {
AdminEvent::FlushAvoidLists { scope: out } => assert_eq!(out, scope),
other => panic!("expected FlushAvoidLists, got {other:?}"),
}
}
}
#[test]
fn simulator_blast_radius_matches_reconcile_emission_for_force_evict() {
use crate::adapter::net::behavior::meshos::action::MeshOsAction;
use crate::adapter::net::behavior::meshos::reconcile::reconcile;
use crate::adapter::net::behavior::meshos::state::{DesiredState, MeshOsState};
const CHAIN: ChainId = 100;
const LEADER: NodeId = 7;
const VICTIM: NodeId = 9;
let mut actual = MeshOsState::default();
actual.last_tick = Some(std::time::Instant::now());
actual
.replicas
.insert(CHAIN, std::collections::BTreeSet::from([LEADER, 8, VICTIM]));
actual.replica_leader.insert(CHAIN, LEADER);
actual.forced_evictions.push((CHAIN, VICTIM));
let actions = reconcile(
&actual,
&DesiredState::default(),
LEADER,
&Default::default(),
&Default::default(),
&Default::default(),
None,
);
let eviction = actions
.iter()
.find_map(|a| match a {
MeshOsAction::RequestEviction { chain, victim } => Some((*chain, *victim)),
_ => None,
})
.expect("leader's reconcile should emit one RequestEviction");
assert_eq!(eviction, (CHAIN, VICTIM));
let mut snap = MeshOsSnapshot::default();
snap.replicas.insert(
CHAIN,
super::super::snapshot::ReplicaSnapshot {
holders: vec![LEADER, 8, VICTIM],
desired_count: Some(3),
leader: Some(LEADER),
},
);
let blast = simulate(
&snap,
&IceActionProposal::ForceEvictReplica {
chain: CHAIN,
victim: VICTIM,
},
);
assert!(
blast.affected_replicas.contains(&CHAIN),
"simulator should mark chain {CHAIN} as affected; got {:?}",
blast.affected_replicas
);
assert!(
blast.affected_nodes.contains(&VICTIM),
"simulator should mark victim {VICTIM} as affected; got {:?}",
blast.affected_nodes
);
}
#[test]
fn simulator_blast_radius_matches_reconcile_emission_for_force_cutover() {
use crate::adapter::net::behavior::meshos::action::MeshOsAction;
use crate::adapter::net::behavior::meshos::reconcile::reconcile;
use crate::adapter::net::behavior::meshos::state::{DesiredState, MeshOsState};
const CHAIN: ChainId = 200;
const LEADER: NodeId = 11;
const TARGET: NodeId = 42;
let mut actual = MeshOsState::default();
actual.last_tick = Some(std::time::Instant::now());
actual
.replicas
.insert(CHAIN, std::collections::BTreeSet::from([LEADER, 12]));
actual.replica_leader.insert(CHAIN, LEADER);
actual.forced_placements.push((CHAIN, TARGET));
let actions = reconcile(
&actual,
&DesiredState::default(),
LEADER,
&Default::default(),
&Default::default(),
&Default::default(),
None,
);
let placement = actions
.iter()
.find_map(|a| match a {
MeshOsAction::RequestPlacement { chain, target, .. } => Some((*chain, *target)),
_ => None,
})
.expect("leader's reconcile should emit one RequestPlacement");
assert_eq!(placement, (CHAIN, Some(TARGET)));
let mut snap = MeshOsSnapshot::default();
snap.replicas.insert(
CHAIN,
super::super::snapshot::ReplicaSnapshot {
holders: vec![LEADER, 12],
desired_count: Some(3),
leader: Some(LEADER),
},
);
let blast = simulate(
&snap,
&IceActionProposal::ForceCutover {
chain: CHAIN,
target: TARGET,
},
);
assert!(blast.affected_replicas.contains(&CHAIN));
assert!(blast.affected_nodes.contains(&TARGET));
}
#[test]
fn simulate_force_evict_replica_reports_chain_and_victim() {
let mut snap = MeshOsSnapshot::default();
snap.replicas.insert(
100,
super::super::snapshot::ReplicaSnapshot {
holders: vec![7, 8, 9],
desired_count: Some(3),
leader: Some(8),
},
);
let blast = simulate(
&snap,
&IceActionProposal::ForceEvictReplica {
chain: 100,
victim: 7,
},
);
assert_eq!(blast.affected_replicas, vec![100]);
assert_eq!(blast.affected_nodes, vec![7]);
assert!(blast.warnings.iter().any(|w| matches!(
w,
BlastWarning::ForcedEvictionBypassesCooldown {
chain: 100,
victim: 7
}
)));
assert!(blast.placement_stability_delta > 0.0);
}
#[test]
fn simulate_force_evict_replica_warns_on_unknown_chain() {
let snap = MeshOsSnapshot::default();
let blast = simulate(
&snap,
&IceActionProposal::ForceEvictReplica {
chain: 100,
victim: 7,
},
);
assert!(blast.warnings.iter().any(|w| matches!(
w,
BlastWarning::ForcedEvictionTargetsUnknownChain { chain: 100 }
)));
}
#[test]
fn simulate_force_evict_replica_warns_on_non_holder_victim() {
let mut snap = MeshOsSnapshot::default();
snap.replicas.insert(
100,
super::super::snapshot::ReplicaSnapshot {
holders: vec![1, 2, 3],
desired_count: Some(3),
leader: Some(1),
},
);
let blast = simulate(
&snap,
&IceActionProposal::ForceEvictReplica {
chain: 100,
victim: 999,
},
);
assert!(blast.warnings.iter().any(|w| matches!(
w,
BlastWarning::ForcedEvictionTargetsNonHolder {
chain: 100,
victim: 999
}
)));
}
#[test]
fn simulate_force_restart_daemon_targets_only_the_daemon() {
use super::super::snapshot::{
DaemonLifecycleSnapshot, DaemonSnapshot, RestartStateSnapshot,
};
let mut snap = MeshOsSnapshot::default();
snap.daemons.insert(
7,
DaemonSnapshot {
name: "telemetry".into(),
lifecycle: DaemonLifecycleSnapshot::Stopped,
restart_state: RestartStateSnapshot::BackingOff { until_ms: 5_000 },
..Default::default()
},
);
let daemon = DaemonRef {
id: 7,
name: "telemetry".into(),
};
let blast = simulate(
&snap,
&IceActionProposal::ForceRestartDaemon {
daemon: daemon.clone(),
},
);
assert_eq!(blast.affected_daemons, vec![daemon]);
assert!(blast.affected_nodes.is_empty());
assert_eq!(blast.placement_stability_delta, 0.0);
assert!(blast.warnings.iter().any(|w| matches!(
w,
BlastWarning::ForcedRestartBypassesBackoff { daemon_id: 7 }
)));
assert!(blast.warnings.iter().all(|w| !matches!(
w,
BlastWarning::ForcedRestartTargetsUnknownDaemon { .. }
| BlastWarning::ForcedRestartDaemonNotInBackoff { .. }
)));
}
#[test]
fn simulate_force_restart_daemon_warns_on_unknown_daemon() {
let snap = MeshOsSnapshot::default();
let daemon = DaemonRef {
id: 99,
name: "absent".into(),
};
let blast = simulate(&snap, &IceActionProposal::ForceRestartDaemon { daemon });
assert!(blast.warnings.iter().any(|w| matches!(
w,
BlastWarning::ForcedRestartTargetsUnknownDaemon { daemon_id: 99 }
)));
}
#[test]
fn simulate_force_restart_daemon_warns_when_already_idle() {
use super::super::snapshot::{DaemonLifecycleSnapshot, DaemonSnapshot};
let mut snap = MeshOsSnapshot::default();
snap.daemons.insert(
7,
DaemonSnapshot {
name: "telemetry".into(),
lifecycle: DaemonLifecycleSnapshot::Running,
..Default::default()
},
);
let blast = simulate(
&snap,
&IceActionProposal::ForceRestartDaemon {
daemon: DaemonRef {
id: 7,
name: "telemetry".into(),
},
},
);
assert!(blast.warnings.iter().any(|w| matches!(
w,
BlastWarning::ForcedRestartDaemonNotInBackoff { daemon_id: 7 }
)));
}
#[test]
fn ice_proposal_to_admin_event_maps_force_restart_daemon() {
let daemon = DaemonRef {
id: 7,
name: "telemetry".into(),
};
let proposal = IceActionProposal::ForceRestartDaemon {
daemon: daemon.clone(),
};
match proposal.to_admin_event() {
AdminEvent::ForceRestartDaemon { daemon: out } => assert_eq!(out, daemon),
other => panic!("expected ForceRestartDaemon, got {other:?}"),
}
}
#[test]
fn simulate_force_cutover_reports_chain_and_target() {
let mut snap = MeshOsSnapshot::default();
snap.replicas.insert(
100,
super::super::snapshot::ReplicaSnapshot {
holders: vec![1, 2, 3],
desired_count: Some(3),
leader: Some(1),
},
);
let blast = simulate(
&snap,
&IceActionProposal::ForceCutover {
chain: 100,
target: 99,
},
);
assert_eq!(blast.affected_replicas, vec![100]);
assert_eq!(blast.affected_nodes, vec![99]);
assert!(blast.warnings.iter().any(|w| matches!(
w,
BlastWarning::ForcedCutoverBypassesPlacementScorer {
chain: 100,
target: 99
}
)));
assert!(blast.placement_stability_delta > 0.0);
}
#[test]
fn simulate_force_cutover_warns_on_unknown_chain() {
let snap = MeshOsSnapshot::default();
let blast = simulate(
&snap,
&IceActionProposal::ForceCutover {
chain: 100,
target: 7,
},
);
assert!(blast.warnings.iter().any(|w| matches!(
w,
BlastWarning::ForcedCutoverTargetsUnknownChain { chain: 100 }
)));
}
#[test]
fn simulate_force_cutover_warns_when_target_already_holder() {
let mut snap = MeshOsSnapshot::default();
snap.replicas.insert(
100,
super::super::snapshot::ReplicaSnapshot {
holders: vec![7, 8, 9],
desired_count: Some(3),
leader: Some(7),
},
);
let blast = simulate(
&snap,
&IceActionProposal::ForceCutover {
chain: 100,
target: 8,
},
);
assert!(blast.warnings.iter().any(|w| matches!(
w,
BlastWarning::ForcedCutoverTargetAlreadyHolder {
chain: 100,
target: 8
}
)));
}
#[test]
fn simulate_kill_migration_with_empty_snapshot_reports_no_daemons() {
let snap = MeshOsSnapshot::default();
let blast = simulate(&snap, &IceActionProposal::KillMigration { migration: 7 });
assert!(blast.affected_nodes.is_empty());
assert!(blast.affected_replicas.is_empty());
assert!(blast.affected_daemons.is_empty());
assert!(blast.warnings.iter().any(|w| matches!(
w,
BlastWarning::KillMigrationDispatcherIntegrationPending { migration: 7 }
)));
}
#[test]
fn simulate_kill_migration_enumerates_local_in_flight_migration() {
use super::super::snapshot::{MigrationPhaseSnapshot, MigrationSnapshot};
let mut snap = MeshOsSnapshot::default();
snap.in_flight_migrations = vec![
MigrationSnapshot {
daemon_origin: 0xCAFE,
phase: MigrationPhaseSnapshot::Transfer,
elapsed_ms: 250,
..Default::default()
},
MigrationSnapshot {
daemon_origin: 0xBEEF,
phase: MigrationPhaseSnapshot::Replay,
elapsed_ms: 50,
..Default::default()
},
]
.into();
let blast = simulate(
&snap,
&IceActionProposal::KillMigration { migration: 0xCAFE },
);
assert_eq!(blast.affected_daemons.len(), 1);
assert_eq!(blast.affected_daemons[0].id, 0xCAFE);
assert!(blast.warnings.iter().any(|w| matches!(
w,
BlastWarning::KillMigrationDispatcherIntegrationPending { migration: 0xCAFE }
)));
}
#[test]
fn ice_proposal_to_admin_event_maps_kill_migration() {
let proposal = IceActionProposal::KillMigration { migration: 42 };
match proposal.to_admin_event() {
AdminEvent::KillMigration { migration } => assert_eq!(migration, 42),
other => panic!("expected KillMigration, got {other:?}"),
}
}
#[test]
fn ice_proposal_to_admin_event_maps_force_cutover() {
let proposal = IceActionProposal::ForceCutover {
chain: 100,
target: 7,
};
match proposal.to_admin_event() {
AdminEvent::ForceCutover { chain, target } => {
assert_eq!(chain, 100);
assert_eq!(target, 7);
}
other => panic!("expected ForceCutover, got {other:?}"),
}
}
#[test]
fn ice_proposal_to_admin_event_maps_force_evict_replica() {
let proposal = IceActionProposal::ForceEvictReplica {
chain: 100,
victim: 7,
};
match proposal.to_admin_event() {
AdminEvent::ForceEvictReplica { chain, victim } => {
assert_eq!(chain, 100);
assert_eq!(victim, 7);
}
other => panic!("expected ForceEvictReplica, got {other:?}"),
}
}
#[test]
fn admin_audit_record_postcard_round_trips_each_outcome() {
for outcome in [
VerificationOutcome::Accepted,
VerificationOutcome::Rejected {
kind: "signature_invalid".into(),
message: "bad sig".into(),
},
VerificationOutcome::Unverified,
] {
let record = AdminAuditRecord {
seq: 1,
committed_at_ms: 12_345,
event: AdminEvent::FreezeCluster {
ttl: Duration::from_secs(60),
},
operator_ids: vec![1, 2, 3],
outcome: outcome.clone(),
chain_pending: false,
};
let bytes = postcard::to_allocvec(&record).expect("encode");
let decoded: AdminAuditRecord = postcard::from_bytes(&bytes).expect("decode");
assert_eq!(decoded, record);
}
}
#[test]
fn admin_audit_record_json_round_trips_for_audit_query_path() {
let record = AdminAuditRecord {
seq: 42,
committed_at_ms: 999,
event: AdminEvent::ThawCluster,
operator_ids: vec![42],
outcome: VerificationOutcome::Accepted,
chain_pending: false,
};
let json = serde_json::to_string(&record).expect("encode");
let decoded: AdminAuditRecord = serde_json::from_str(&json).expect("decode");
assert_eq!(decoded, record);
}
#[test]
fn admin_event_signing_payload_carries_domain_tag_and_issued_at_prefix() {
let event = AdminEvent::EnterMaintenance {
node: 42,
drain_for: Some(Duration::from_secs(120)),
};
let issued_at_ms: u64 = 0x0123_4567_89AB_CDEF;
let payload = admin_event_signing_payload(&event, issued_at_ms);
assert!(
payload.starts_with(ADMIN_SIGNING_DOMAIN),
"payload must begin with ADMIN_SIGNING_DOMAIN, got {:?}",
&payload[..ADMIN_SIGNING_DOMAIN.len().min(payload.len())]
);
let inner_start = ADMIN_SIGNING_DOMAIN.len() + 8;
assert_eq!(
&payload[ADMIN_SIGNING_DOMAIN.len()..inner_start],
&issued_at_ms.to_le_bytes(),
);
let decoded: AdminEvent =
postcard::from_bytes(&payload[inner_start..]).expect("decode inner event");
assert_eq!(decoded, event);
}
#[test]
fn ice_and_admin_signing_payloads_use_distinct_domains() {
assert_ne!(ICE_SIGNING_DOMAIN, ADMIN_SIGNING_DOMAIN);
let event = AdminEvent::ThawCluster;
let proposal = IceActionProposal::ThawCluster;
let ts = 1u64;
let blast_hash: BlastRadiusHash = [1u8; BLAST_RADIUS_HASH_LEN];
let admin_payload = admin_event_signing_payload(&event, ts);
let ice_payload = ice_proposal_signing_payload(&proposal, ts, &blast_hash);
assert_ne!(admin_payload, ice_payload);
}
#[test]
fn admin_verifier_accepts_a_valid_single_signature_admin_commit() {
let kp = EntityKeypair::generate();
let mut registry = OperatorRegistry::new();
registry.register(&kp);
let verifier = AdminVerifier::new(std::sync::Arc::new(registry), 1);
let event = AdminEvent::Cordon { node: 42 };
let issued_at_ms = now_ms_since_unix_epoch();
let signature = OperatorSignature::sign_admin(&kp, &event, issued_at_ms);
verifier
.verify_admin_commit(&event, &signature, issued_at_ms, issued_at_ms)
.expect("valid single-sig commit");
}
#[test]
fn admin_verifier_rejects_tampered_single_signature_admin_commit() {
let kp = EntityKeypair::generate();
let mut registry = OperatorRegistry::new();
registry.register(&kp);
let verifier = AdminVerifier::new(std::sync::Arc::new(registry), 1);
let event = AdminEvent::Cordon { node: 42 };
let issued_at_ms = now_ms_since_unix_epoch();
let mut signature = OperatorSignature::sign_admin(&kp, &event, issued_at_ms);
signature.signature[0] ^= 0x01;
let err = verifier
.verify_admin_commit(&event, &signature, issued_at_ms, issued_at_ms)
.unwrap_err();
assert_eq!(err.kind(), "signature_invalid");
}
#[test]
fn admin_verifier_rejects_admin_commit_from_unknown_operator() {
let kp = EntityKeypair::generate();
let verifier = AdminVerifier::new(std::sync::Arc::new(OperatorRegistry::new()), 1);
let event = AdminEvent::Cordon { node: 42 };
let issued_at_ms = now_ms_since_unix_epoch();
let signature = OperatorSignature::sign_admin(&kp, &event, issued_at_ms);
let err = verifier
.verify_admin_commit(&event, &signature, issued_at_ms, issued_at_ms)
.unwrap_err();
assert_eq!(err.kind(), "not_authorized");
}
#[test]
fn admin_verifier_rejects_expired_ice_envelope() {
let kp = EntityKeypair::generate();
let mut registry = OperatorRegistry::new();
registry.register(&kp);
let verifier = AdminVerifier::with_freshness(
std::sync::Arc::new(registry),
1,
Duration::from_secs(60),
Duration::from_secs(5),
);
let proposal = IceActionProposal::ThawCluster;
let issued_at_ms = 1_000_000u64;
let now_ms = issued_at_ms + 120_000; let blast_hash: BlastRadiusHash = [1u8; BLAST_RADIUS_HASH_LEN];
let sig = OperatorSignature::sign(&kp, &proposal, issued_at_ms, &blast_hash);
let err = verifier
.verify_commit(&proposal, &[sig], issued_at_ms, &blast_hash, now_ms)
.unwrap_err();
assert_eq!(err.kind(), "envelope_expired");
}
#[test]
fn admin_verifier_rejects_envelope_stamped_too_far_in_future() {
let kp = EntityKeypair::generate();
let mut registry = OperatorRegistry::new();
registry.register(&kp);
let verifier = AdminVerifier::with_freshness(
std::sync::Arc::new(registry),
1,
Duration::from_secs(60),
Duration::from_secs(5),
);
let proposal = IceActionProposal::ThawCluster;
let now_ms = 1_000_000u64;
let issued_at_ms = now_ms + 60_000; let blast_hash: BlastRadiusHash = [1u8; BLAST_RADIUS_HASH_LEN];
let sig = OperatorSignature::sign(&kp, &proposal, issued_at_ms, &blast_hash);
let err = verifier
.verify_commit(&proposal, &[sig], issued_at_ms, &blast_hash, now_ms)
.unwrap_err();
assert_eq!(err.kind(), "envelope_from_future");
}
#[test]
fn admin_verifier_accepts_envelope_inside_freshness_window() {
let kp = EntityKeypair::generate();
let mut registry = OperatorRegistry::new();
registry.register(&kp);
let verifier = AdminVerifier::with_freshness(
std::sync::Arc::new(registry),
1,
Duration::from_secs(60),
Duration::from_secs(5),
);
let proposal = IceActionProposal::ThawCluster;
let issued_at_ms = 1_000_000u64;
let now_ms = issued_at_ms + 30_000; let blast_hash: BlastRadiusHash = [1u8; BLAST_RADIUS_HASH_LEN];
let sig = OperatorSignature::sign(&kp, &proposal, issued_at_ms, &blast_hash);
verifier
.verify_commit(&proposal, &[sig], issued_at_ms, &blast_hash, now_ms)
.expect("envelope inside freshness window should verify");
}
#[test]
fn ice_cooldown_blocks_second_commit_against_same_target_inside_window() {
let kp = EntityKeypair::generate();
let mut registry = OperatorRegistry::new();
registry.register(&kp);
let verifier = AdminVerifier::with_full_policy(
std::sync::Arc::new(registry),
1,
Duration::from_secs(300),
Duration::from_secs(30),
Duration::from_secs(60),
);
let proposal = IceActionProposal::ForceCutover {
chain: 100,
target: 42,
};
let issued_at_ms = 1_000_000u64;
let now_ms = issued_at_ms;
let blast = simulate(&MeshOsSnapshot::default(), &proposal);
let blast_hash = blast_radius_hash(&blast);
let sig = OperatorSignature::sign(&kp, &proposal, issued_at_ms, &blast_hash);
verifier
.verify_commit(
&proposal,
std::slice::from_ref(&sig),
issued_at_ms,
&blast_hash,
now_ms,
)
.expect("first commit should succeed");
let issued_at_ms2 = issued_at_ms + 30_000; let now_ms2 = issued_at_ms2;
let sig2 = OperatorSignature::sign(&kp, &proposal, issued_at_ms2, &blast_hash);
let err = verifier
.verify_commit(&proposal, &[sig2], issued_at_ms2, &blast_hash, now_ms2)
.unwrap_err();
assert_eq!(err.kind(), "ice_cooldown_active");
if let VerifyError::IceCooldownActive { node, .. } = err {
assert_eq!(node, Some(42));
} else {
panic!("expected IceCooldownActive {{ node: Some(42), .. }}");
}
}
#[test]
fn ice_cooldown_releases_after_window_expires() {
let kp = EntityKeypair::generate();
let mut registry = OperatorRegistry::new();
registry.register(&kp);
let verifier = AdminVerifier::with_full_policy(
std::sync::Arc::new(registry),
1,
Duration::from_secs(3600),
Duration::from_secs(60),
Duration::from_secs(60),
);
let proposal = IceActionProposal::ForceCutover {
chain: 100,
target: 42,
};
let issued_at_ms = 1_000_000u64;
let blast = simulate(&MeshOsSnapshot::default(), &proposal);
let blast_hash = blast_radius_hash(&blast);
let sig = OperatorSignature::sign(&kp, &proposal, issued_at_ms, &blast_hash);
verifier
.verify_commit(&proposal, &[sig], issued_at_ms, &blast_hash, issued_at_ms)
.expect("first commit should succeed");
let later_ms = issued_at_ms + 61_000; let sig2 = OperatorSignature::sign(&kp, &proposal, later_ms, &blast_hash);
verifier
.verify_commit(&proposal, &[sig2], later_ms, &blast_hash, later_ms)
.expect("commit after cooldown window should succeed");
}
#[test]
fn ice_cooldown_isolates_different_target_nodes() {
let kp = EntityKeypair::generate();
let mut registry = OperatorRegistry::new();
registry.register(&kp);
let verifier = AdminVerifier::with_full_policy(
std::sync::Arc::new(registry),
1,
Duration::from_secs(3600),
Duration::from_secs(60),
Duration::from_secs(60),
);
let issued_at_ms = 1_000_000u64;
let proposal_a = IceActionProposal::ForceCutover {
chain: 100,
target: 42,
};
let blast_a = simulate(&MeshOsSnapshot::default(), &proposal_a);
let hash_a = blast_radius_hash(&blast_a);
let sig_a = OperatorSignature::sign(&kp, &proposal_a, issued_at_ms, &hash_a);
verifier
.verify_commit(&proposal_a, &[sig_a], issued_at_ms, &hash_a, issued_at_ms)
.expect("commit against node 42");
let proposal_b = IceActionProposal::ForceCutover {
chain: 200,
target: 99,
};
let blast_b = simulate(&MeshOsSnapshot::default(), &proposal_b);
let hash_b = blast_radius_hash(&blast_b);
let sig_b = OperatorSignature::sign(&kp, &proposal_b, issued_at_ms, &hash_b);
verifier
.verify_commit(&proposal_b, &[sig_b], issued_at_ms, &hash_b, issued_at_ms)
.expect("different node should not be in cooldown");
}
#[test]
fn ice_cooldown_freeze_cluster_blocks_subsequent_cluster_wide_commits() {
let kp = EntityKeypair::generate();
let mut registry = OperatorRegistry::new();
registry.register(&kp);
let verifier = AdminVerifier::with_full_policy(
std::sync::Arc::new(registry),
1,
Duration::from_secs(3600),
Duration::from_secs(60),
Duration::from_secs(60),
);
let issued_at_ms = 1_000_000u64;
let freeze = IceActionProposal::FreezeCluster {
ttl: Duration::from_secs(120),
};
let blast = simulate(&MeshOsSnapshot::default(), &freeze);
let hash = blast_radius_hash(&blast);
let sig = OperatorSignature::sign(&kp, &freeze, issued_at_ms, &hash);
verifier
.verify_commit(&freeze, &[sig], issued_at_ms, &hash, issued_at_ms)
.expect("first freeze should succeed");
let thaw = IceActionProposal::ThawCluster;
let later_ms = issued_at_ms + 5_000; let thaw_blast = simulate(&MeshOsSnapshot::default(), &thaw);
let thaw_hash = blast_radius_hash(&thaw_blast);
let thaw_sig = OperatorSignature::sign(&kp, &thaw, later_ms, &thaw_hash);
let err = verifier
.verify_commit(&thaw, &[thaw_sig], later_ms, &thaw_hash, later_ms)
.unwrap_err();
assert_eq!(err.kind(), "ice_cooldown_active");
if let VerifyError::IceCooldownActive { node, .. } = err {
assert_eq!(node, None, "FreezeCluster cooldown is cluster-wide");
}
}
#[test]
fn cross_domain_replay_fails_verification() {
let kp = EntityKeypair::generate();
let mut registry = OperatorRegistry::new();
registry.register(&kp);
let issued_at_ms = now_ms_since_unix_epoch();
let proposal = IceActionProposal::ThawCluster;
let blast_hash: BlastRadiusHash = [1u8; BLAST_RADIUS_HASH_LEN];
let ice_sig = OperatorSignature::sign(&kp, &proposal, issued_at_ms, &blast_hash);
let event = AdminEvent::ThawCluster;
let admin_payload = admin_event_signing_payload(&event, issued_at_ms);
let err = registry.verify(&ice_sig, &admin_payload).unwrap_err();
assert_eq!(err.kind(), "signature_invalid");
}
#[test]
fn admin_audit_record_can_carry_ordinary_admin_event() {
let record = AdminAuditRecord {
seq: 7,
committed_at_ms: 1_000,
event: AdminEvent::EnterMaintenance {
node: 42,
drain_for: Some(Duration::from_secs(120)),
},
operator_ids: Vec::new(),
outcome: VerificationOutcome::Unverified,
chain_pending: false,
};
let bytes = postcard::to_allocvec(&record).expect("encode");
let decoded: AdminAuditRecord = postcard::from_bytes(&bytes).expect("decode");
assert_eq!(decoded, record);
}
#[test]
fn flush_avoid_lists_proposal_postcard_round_trips_for_every_scope() {
for scope in [
AvoidScope::Local { node: 42 },
AvoidScope::OnPeer { peer: 7 },
AvoidScope::Global,
] {
let proposal = IceActionProposal::FlushAvoidLists { scope };
let bytes = postcard::to_allocvec(&proposal).expect("encode");
let decoded: IceActionProposal = postcard::from_bytes(&bytes).expect("decode");
assert_eq!(decoded, proposal);
}
}
}