use super::fabric::{CellEpoch, CellId, CellTemperature, DataCapsule, RepairPolicy, SubjectCell};
use super::policy::{ReliabilityControlError, SafetyEnvelope};
use crate::remote::NodeId;
use crate::types::{ObligationId, Time};
use crate::util::DetHasher;
use std::hash::{Hash, Hasher};
use thiserror::Error;
/// Opaque 64-bit digest of consumer state, carried by a `CutCertificate`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash, Default)]
pub struct ConsumerStateDigest(u64);

impl ConsumerStateDigest {
    /// The all-zero digest; identical to the `Default` value.
    pub const ZERO: Self = Self::new(0);

    /// Wraps a raw 64-bit value as a digest.
    #[must_use]
    pub const fn new(raw: u64) -> Self {
        ConsumerStateDigest(raw)
    }

    /// Returns the wrapped raw 64-bit value.
    #[must_use]
    pub const fn raw(self) -> u64 {
        let Self(raw) = self;
        raw
    }
}
/// Opaque 64-bit digest identifying a capsule (used by warm restores).
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash, Default)]
pub struct CapsuleDigest(u64);

impl CapsuleDigest {
    /// The all-zero digest; identical to the `Default` value.
    pub const ZERO: Self = Self::new(0);

    /// Wraps a raw 64-bit value as a digest.
    #[must_use]
    pub const fn new(raw: u64) -> Self {
        CapsuleDigest(raw)
    }

    /// Returns the wrapped raw 64-bit value.
    #[must_use]
    pub const fn raw(self) -> u64 {
        let Self(raw) = self;
        raw
    }

    /// Reports whether the wrapped value is zero.
    #[must_use]
    const fn is_zero(self) -> bool {
        matches!(self, Self(0))
    }
}
/// Certificate describing a cut of a subject cell at a particular epoch.
///
/// Every field takes part in the derived `Hash`, which `certificate_digest`
/// feeds into the deterministic hasher.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct CutCertificate {
/// Identifier of the cell the certificate was issued for.
pub cell_id: CellId,
/// Epoch of the cell at the time the certificate was issued.
pub epoch: CellEpoch,
/// Obligations at the cut frontier. `CutCertificate::issue` stores this sorted
/// and de-duplicated; `covers_obligation` binary-searches it, so a certificate
/// built by hand must keep the vector sorted for lookups to be reliable.
pub obligation_frontier: Vec<ObligationId>,
/// Digest of consumer state supplied by the issuer.
pub consumer_state_digest: ConsumerStateDigest,
/// Timestamp supplied by the issuer.
pub timestamp: Time,
/// Node that signed the certificate; must be a steward of the cell.
pub signer: NodeId,
}
impl CutCertificate {
    /// Issues a certificate for `cell` signed by `signer`.
    ///
    /// The cell's id and epoch are copied into the certificate and the frontier
    /// is stored sorted and de-duplicated.
    ///
    /// # Errors
    ///
    /// Returns [`CutMobilityError::SignerNotInStewardSet`] when `signer` is not
    /// a member of the cell's steward set.
    pub fn issue(
        cell: &SubjectCell,
        obligation_frontier: impl IntoIterator<Item = ObligationId>,
        consumer_state_digest: ConsumerStateDigest,
        timestamp: Time,
        signer: NodeId,
    ) -> Result<Self, CutMobilityError> {
        if contains_node(&cell.steward_set, &signer) {
            Ok(Self {
                cell_id: cell.cell_id,
                epoch: cell.epoch,
                obligation_frontier: canonicalize_frontier(obligation_frontier),
                consumer_state_digest,
                timestamp,
                signer,
            })
        } else {
            Err(CutMobilityError::SignerNotInStewardSet {
                cell_id: cell.cell_id,
                signer,
            })
        }
    }

    /// Checks that this certificate applies to `cell` in its current state.
    ///
    /// # Errors
    ///
    /// Checks run in this order and the first failure is returned:
    /// cell id mismatch, epoch mismatch, signer missing from the steward set.
    pub fn validate_for(&self, cell: &SubjectCell) -> Result<(), CutMobilityError> {
        let failure = if self.cell_id != cell.cell_id {
            CutMobilityError::CellMismatch {
                certificate_cell: self.cell_id,
                actual_cell: cell.cell_id,
            }
        } else if self.epoch != cell.epoch {
            CutMobilityError::EpochMismatch {
                certificate_epoch: self.epoch,
                actual_epoch: cell.epoch,
            }
        } else if !contains_node(&cell.steward_set, &self.signer) {
            CutMobilityError::SignerNotInStewardSet {
                cell_id: cell.cell_id,
                signer: self.signer.clone(),
            }
        } else {
            return Ok(());
        };
        Err(failure)
    }

    /// Reports whether `obligation` is part of the frontier.
    ///
    /// Uses a binary search, so the result is only meaningful when
    /// `obligation_frontier` is sorted (as it is after [`CutCertificate::issue`]).
    #[must_use]
    pub fn covers_obligation(&self, obligation: ObligationId) -> bool {
        matches!(self.obligation_frontier.binary_search(&obligation), Ok(_))
    }

    /// Deterministic digest over the tagged certificate fields.
    #[must_use]
    pub fn obligation_frontier_digest(&self) -> u64 {
        let material = (
            "cut-frontier",
            self.cell_id.raw(),
            self.epoch,
            &self.obligation_frontier,
            self.consumer_state_digest.raw(),
            self.timestamp.as_nanos(),
            self.signer.as_str(),
        );
        stable_hash(material)
    }

    /// Deterministic digest over the whole certificate via its derived `Hash`.
    #[must_use]
    pub fn certificate_digest(&self) -> u64 {
        let material = ("cut-certificate", self);
        stable_hash(material)
    }
}
/// A mobility operation that can be certified against a cut certificate.
///
/// See `MobilityOperation::certify` for the per-variant validation rules.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub enum MobilityOperation {
/// Move sequencing from `from` to `to`; on success `to` is moved to the front
/// and `from` to the back of the steward lists.
Evacuate {
/// Current active sequencer; must also be the certificate signer.
from: NodeId,
/// Steward that becomes the active sequencer.
to: NodeId,
},
/// Move sequencing from `from` to `to` without reordering the steward lists.
Handoff {
/// Current active sequencer; must also be the certificate signer.
from: NodeId,
/// Steward that becomes the active sequencer.
to: NodeId,
},
/// Restore the cell at a newer epoch with `target` as the active sequencer.
WarmRestore {
/// Node that becomes the active sequencer (added to the steward lists if absent).
target: NodeId,
/// Epoch of the restored cell; must be newer than the certificate epoch.
restored_epoch: CellEpoch,
/// Digest of the capsule being restored; must be non-zero.
capsule_digest: CapsuleDigest,
},
/// Replace a failed active sequencer with `promote_to`.
Failover {
/// Current active sequencer, removed from the steward lists on success.
failed: NodeId,
/// Steward promoted to active sequencer.
promote_to: NodeId,
},
}
impl MobilityOperation {
    /// Certifies this operation against `cell` using `certificate`.
    ///
    /// The certificate is first validated for the cell, then the
    /// variant-specific checks run and produce the resulting cell state.
    ///
    /// # Errors
    ///
    /// Returns the first [`CutMobilityError`] raised by certificate validation
    /// or by the variant-specific certification routine.
    pub fn certify(
        &self,
        cell: &SubjectCell,
        certificate: &CutCertificate,
    ) -> Result<CertifiedMobility, CutMobilityError> {
        certificate.validate_for(cell)?;

        let outcome = match self {
            Self::Evacuate { from, to } => certify_evacuation(cell, certificate, from, to),
            Self::Handoff { from, to } => certify_handoff(cell, certificate, from, to),
            Self::WarmRestore {
                target,
                restored_epoch,
                capsule_digest,
            } => certify_warm_restore(cell, certificate, target, *restored_epoch, *capsule_digest),
            Self::Failover { failed, promote_to } => {
                certify_failover(cell, certificate, failed, promote_to)
            }
        };
        let resulting_cell = outcome?;

        Ok(CertifiedMobility {
            certificate: certificate.clone(),
            operation: self.clone(),
            obligation_frontier_digest: certificate.obligation_frontier_digest(),
            resulting_cell,
        })
    }
}
/// Result of successfully certifying a [`MobilityOperation`].
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct CertifiedMobility {
    /// Copy of the certificate the operation was certified against.
    pub certificate: CutCertificate,
    /// Copy of the certified operation.
    pub operation: MobilityOperation,
    /// Value of `certificate.obligation_frontier_digest()` at certification time.
    pub obligation_frontier_digest: u64,
    /// Cell state produced by applying the operation.
    pub resulting_cell: SubjectCell,
}

impl CertifiedMobility {
    /// Deterministic digest over the certificate digest, the operation, the
    /// frontier digest and selected fields of the resulting cell.
    #[must_use]
    pub fn mobility_digest(&self) -> u64 {
        let cell = &self.resulting_cell;
        let capsule = &cell.control_capsule;
        let material = (
            "certified-mobility",
            self.certificate.certificate_digest(),
            &self.operation,
            self.obligation_frontier_digest,
            cell.cell_id.raw(),
            cell.epoch,
            capsule.sequencer_lease_generation,
            capsule.policy_revision,
        );
        stable_hash(material)
    }
}
impl SubjectCell {
pub fn issue_cut_certificate(
&self,
obligation_frontier: impl IntoIterator<Item = ObligationId>,
consumer_state_digest: ConsumerStateDigest,
timestamp: Time,
signer: NodeId,
) -> Result<CutCertificate, CutMobilityError> {
CutCertificate::issue(
self,
obligation_frontier,
consumer_state_digest,
timestamp,
signer,
)
}
pub fn certify_mobility(
&self,
certificate: &CutCertificate,
operation: &MobilityOperation,
) -> Result<CertifiedMobility, CutMobilityError> {
operation.certify(self, certificate)
}
}
/// Errors raised while issuing cut certificates or certifying mobility operations.
#[derive(Debug, Clone, PartialEq, Eq, Error)]
pub enum CutMobilityError {
/// The certificate signer is not a steward of the cell.
#[error("cut certificate signer `{signer}` is not in the steward set for `{cell_id}`")]
SignerNotInStewardSet {
cell_id: CellId,
signer: NodeId,
},
/// The certificate names a different cell than the one being checked.
#[error("cut certificate targets `{certificate_cell}` but current cell is `{actual_cell}`")]
CellMismatch {
certificate_cell: CellId,
actual_cell: CellId,
},
/// The certificate epoch differs from the cell's current epoch.
#[error(
"cut certificate epoch {certificate_epoch:?} does not match current epoch {actual_epoch:?}"
)]
EpochMismatch {
certificate_epoch: CellEpoch,
actual_epoch: CellEpoch,
},
/// The cell's control capsule has no active sequencer set.
#[error("subject cell `{cell_id}` has no active sequencer")]
NoActiveSequencer {
cell_id: CellId,
},
/// The operation's source node is not the cell's active sequencer.
#[error("mobility source `{requested}` does not match active sequencer `{active}`")]
SourceNotActive {
requested: NodeId,
active: NodeId,
},
/// Evacuation and handoff require the certificate to be signed by the source node.
#[error("cut certificate signer `{signer}` must match mobility source `{active_source}`")]
SignerMustMatchSource {
signer: NodeId,
active_source: NodeId,
},
/// The operation's target node is not a steward of the cell.
#[error("mobility target `{target}` is not in the steward set for `{cell_id}`")]
TargetNotInStewardSet {
cell_id: CellId,
target: NodeId,
},
/// The operation names the same node as both source and target.
#[error("mobility target `{target}` must differ from source `{current_source}`")]
TargetMatchesSource {
current_source: NodeId,
target: NodeId,
},
/// A failover certificate was signed by the node being failed over.
#[error("failover signer `{signer}` cannot also be the failed steward `{failed}`")]
FailoverSignedByFailedNode {
signer: NodeId,
failed: NodeId,
},
/// Warm restore was attempted with a zero consumer-state digest in the certificate.
#[error("warm restore requires a non-zero consumer-state digest")]
MissingConsumerStateDigest,
/// Warm restore was attempted with a zero capsule digest.
#[error("warm restore requires a non-zero capsule digest")]
MissingCapsuleDigest,
/// The restore epoch is not strictly newer than the certificate epoch.
#[error(
"warm restore epoch {restored_epoch:?} must be newer than cut epoch {certificate_epoch:?}"
)]
StaleRestoreEpoch {
restored_epoch: CellEpoch,
certificate_epoch: CellEpoch,
},
}
/// Certifies an evacuation of the active sequencer from `from` to `to`.
///
/// Checks, in order: `from` is the active sequencer, the certificate was signed
/// by `from`, `to` differs from `from`, and `to` is a steward. On success the
/// control state is advanced, `to` becomes the active sequencer, and in both
/// the steward set and the steward pool `to` moves to the front and `from` to
/// the back.
fn certify_evacuation(
    cell: &SubjectCell,
    certificate: &CutCertificate,
    from: &NodeId,
    to: &NodeId,
) -> Result<SubjectCell, CutMobilityError> {
    let current = require_active_sequencer(cell)?;
    if current != from {
        return Err(CutMobilityError::SourceNotActive {
            requested: from.clone(),
            active: current.clone(),
        });
    }
    if certificate.signer != *from {
        return Err(CutMobilityError::SignerMustMatchSource {
            signer: certificate.signer.clone(),
            active_source: from.clone(),
        });
    }
    if to == from {
        return Err(CutMobilityError::TargetMatchesSource {
            current_source: from.clone(),
            target: to.clone(),
        });
    }
    if !contains_node(&cell.steward_set, to) {
        return Err(CutMobilityError::TargetNotInStewardSet {
            cell_id: cell.cell_id,
            target: to.clone(),
        });
    }

    let mut evacuated = advance_control_state(cell);
    evacuated.control_capsule.active_sequencer = Some(to.clone());
    // Both rosters get the same reordering; they are independent of each other.
    for roster in [
        &mut evacuated.steward_set,
        &mut evacuated.control_capsule.steward_pool,
    ] {
        move_node_to_front(roster, to);
        move_node_to_back(roster, from);
    }
    Ok(evacuated)
}
/// Certifies a handoff of the active sequencer from `from` to `to`.
///
/// Checks, in order: `from` is the active sequencer, the certificate was signed
/// by `from`, `to` differs from `from`, and `to` is a steward. On success the
/// control state is advanced and `to` becomes the active sequencer; the steward
/// lists keep their order.
fn certify_handoff(
    cell: &SubjectCell,
    certificate: &CutCertificate,
    from: &NodeId,
    to: &NodeId,
) -> Result<SubjectCell, CutMobilityError> {
    let current = require_active_sequencer(cell)?;
    if current != from {
        return Err(CutMobilityError::SourceNotActive {
            requested: from.clone(),
            active: current.clone(),
        });
    }
    if certificate.signer != *from {
        return Err(CutMobilityError::SignerMustMatchSource {
            signer: certificate.signer.clone(),
            active_source: from.clone(),
        });
    }
    if to == from {
        return Err(CutMobilityError::TargetMatchesSource {
            current_source: from.clone(),
            target: to.clone(),
        });
    }
    if !contains_node(&cell.steward_set, to) {
        return Err(CutMobilityError::TargetNotInStewardSet {
            cell_id: cell.cell_id,
            target: to.clone(),
        });
    }

    let mut handed_off = advance_control_state(cell);
    handed_off.control_capsule.active_sequencer = Some(to.clone());
    Ok(handed_off)
}
fn certify_warm_restore(
cell: &SubjectCell,
certificate: &CutCertificate,
target: &NodeId,
restored_epoch: CellEpoch,
capsule_digest: CapsuleDigest,
) -> Result<SubjectCell, CutMobilityError> {
if certificate.consumer_state_digest == ConsumerStateDigest::ZERO {
return Err(CutMobilityError::MissingConsumerStateDigest);
}
if capsule_digest.is_zero() {
return Err(CutMobilityError::MissingCapsuleDigest);
}
if restored_epoch <= certificate.epoch {
return Err(CutMobilityError::StaleRestoreEpoch {
restored_epoch,
certificate_epoch: certificate.epoch,
});
}
let mut restored = cell.clone();
restored.epoch = restored_epoch;
restored.cell_id = CellId::for_partition(restored_epoch, &restored.subject_partition);
restored.control_capsule.active_sequencer = Some(target.clone());
restored.control_capsule.sequencer_lease_generation = restored_epoch.generation;
restored.control_capsule.policy_revision =
restored.control_capsule.policy_revision.saturating_add(1);
ensure_node_at_front(&mut restored.steward_set, target.clone());
ensure_node_at_front(&mut restored.control_capsule.steward_pool, target.clone());
Ok(restored)
}
/// Certifies a failover from the `failed` active sequencer to `promote_to`.
///
/// Checks, in order: `failed` is the active sequencer, the certificate was not
/// signed by `failed`, `promote_to` differs from `failed`, and `promote_to` is
/// a steward. On success the control state is advanced, `failed` is removed
/// from both steward lists, `promote_to` moves to their front, and
/// `promote_to` becomes the active sequencer.
fn certify_failover(
    cell: &SubjectCell,
    certificate: &CutCertificate,
    failed: &NodeId,
    promote_to: &NodeId,
) -> Result<SubjectCell, CutMobilityError> {
    let current = require_active_sequencer(cell)?;
    if current != failed {
        return Err(CutMobilityError::SourceNotActive {
            requested: failed.clone(),
            active: current.clone(),
        });
    }
    if certificate.signer == *failed {
        return Err(CutMobilityError::FailoverSignedByFailedNode {
            signer: certificate.signer.clone(),
            failed: failed.clone(),
        });
    }
    if promote_to == failed {
        return Err(CutMobilityError::TargetMatchesSource {
            current_source: failed.clone(),
            target: promote_to.clone(),
        });
    }
    if !contains_node(&cell.steward_set, promote_to) {
        return Err(CutMobilityError::TargetNotInStewardSet {
            cell_id: cell.cell_id,
            target: promote_to.clone(),
        });
    }

    let mut promoted = advance_control_state(cell);
    // Both rosters get the same treatment; they are independent of each other.
    for roster in [
        &mut promoted.steward_set,
        &mut promoted.control_capsule.steward_pool,
    ] {
        roster.retain(|node| node != failed);
        move_node_to_front(roster, promote_to);
    }
    promoted.control_capsule.active_sequencer = Some(promote_to.clone());
    Ok(promoted)
}
/// Returns the cell's active sequencer, or
/// [`CutMobilityError::NoActiveSequencer`] when none is set.
fn require_active_sequencer(cell: &SubjectCell) -> Result<&NodeId, CutMobilityError> {
    match &cell.control_capsule.active_sequencer {
        Some(active) => Ok(active),
        None => Err(CutMobilityError::NoActiveSequencer {
            cell_id: cell.cell_id,
        }),
    }
}
/// Collects the frontier into a vector that is sorted ascending with
/// duplicates removed.
fn canonicalize_frontier(
    obligation_frontier: impl IntoIterator<Item = ObligationId>,
) -> Vec<ObligationId> {
    let mut canonical = obligation_frontier
        .into_iter()
        .collect::<Vec<ObligationId>>();
    canonical.sort_unstable();
    // After sorting, equal ids are adjacent, so `dedup` removes every duplicate.
    canonical.dedup();
    canonical
}
/// Returns a clone of `cell` whose sequencer lease generation and policy
/// revision are each incremented by one (saturating).
fn advance_control_state(cell: &SubjectCell) -> SubjectCell {
    let mut advanced = cell.clone();
    let capsule = &mut advanced.control_capsule;
    capsule.sequencer_lease_generation = capsule.sequencer_lease_generation.saturating_add(1);
    capsule.policy_revision = capsule.policy_revision.saturating_add(1);
    advanced
}
/// Reports whether `candidate` appears in `nodes`.
fn contains_node(nodes: &[NodeId], candidate: &NodeId) -> bool {
    nodes.contains(candidate)
}
/// Moves the first occurrence of `candidate` to index 0, keeping the relative
/// order of all other nodes. Does nothing when `candidate` is absent.
fn move_node_to_front(nodes: &mut Vec<NodeId>, candidate: &NodeId) {
    let found = nodes.iter().position(|node| node == candidate);
    if let Some(index) = found {
        // Rotating the prefix ending at `index` right by one puts that element first.
        nodes[..=index].rotate_right(1);
    }
}
/// Moves the first occurrence of `candidate` to the end, keeping the relative
/// order of all other nodes. Does nothing when `candidate` is absent.
fn move_node_to_back(nodes: &mut Vec<NodeId>, candidate: &NodeId) {
    let found = nodes.iter().position(|node| node == candidate);
    if let Some(index) = found {
        // Rotating the suffix starting at `index` left by one puts that element last.
        nodes[index..].rotate_left(1);
    }
}
/// Guarantees that `candidate` is at index 0: an existing occurrence is moved
/// to the front, otherwise `candidate` is inserted there.
fn ensure_node_at_front(nodes: &mut Vec<NodeId>, candidate: NodeId) {
    let found = nodes.iter().position(|node| *node == candidate);
    match found {
        // Already present: rotate the prefix so the existing entry leads.
        Some(index) => nodes[..=index].rotate_right(1),
        None => nodes.insert(0, candidate),
    }
}
/// Hashes `value` with a freshly default-constructed `DetHasher` and returns
/// the finished 64-bit state.
fn stable_hash<T>(value: T) -> u64
where
    T: Hash,
{
    let mut state = DetHasher::default();
    Hash::hash(&value, &mut state);
    Hasher::finish(&state)
}
/// Captured cell state, certificate and operation for an incident, used as the
/// starting point for replays and rehearsal forks.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct IncidentSnapshot {
/// Clone of the cell at capture time.
pub cell: SubjectCell,
/// Clone of the certificate, validated against `cell` by `capture`.
pub certificate: CutCertificate,
/// Operation that `replay_original` re-certifies.
pub original_operation: MobilityOperation,
/// Caller-supplied label; also embedded in rehearsal errors.
pub label: String,
/// Caller-supplied snapshot time; used as the timestamp of forked certificates.
pub snapshot_time: Time,
}
impl IncidentSnapshot {
pub fn capture(
cell: &SubjectCell,
certificate: &CutCertificate,
original_operation: MobilityOperation,
label: impl Into<String>,
snapshot_time: Time,
) -> Result<Self, RehearsalError> {
let label = label.into();
certificate
.validate_for(cell)
.map_err(|e| RehearsalError::InvalidCertificate {
label: label.clone(),
source: e,
})?;
Ok(Self {
cell: cell.clone(),
certificate: certificate.clone(),
original_operation,
label,
snapshot_time,
})
}
#[must_use]
pub fn snapshot_digest(&self) -> u64 {
stable_hash((
"incident-snapshot",
self.cell.cell_id.raw(),
self.cell.epoch,
self.certificate.certificate_digest(),
&self.original_operation,
&self.label,
self.snapshot_time.as_nanos(),
))
}
pub fn replay_original(&self) -> Result<CertifiedMobility, CutMobilityError> {
self.original_operation
.certify(&self.cell, &self.certificate)
}
pub fn fork_rehearsal(
&self,
alternative: RehearsalPolicy,
rehearsal_epoch: CellEpoch,
) -> Result<RehearsalFork, RehearsalError> {
if rehearsal_epoch <= self.cell.epoch {
return Err(RehearsalError::StaleRehearsalEpoch {
rehearsal_epoch,
snapshot_epoch: self.cell.epoch,
});
}
let mut forked_cell = self.cell.clone();
forked_cell.epoch = rehearsal_epoch;
forked_cell.cell_id =
CellId::for_partition(rehearsal_epoch, &forked_cell.subject_partition);
if let Some(placement) = &alternative.placement_override {
apply_placement_override(&mut forked_cell, placement);
}
if let Some(repair) = &alternative.repair_override {
forked_cell.repair_policy = repair.clone();
}
if let Some(data) = &alternative.data_capsule_override {
forked_cell.data_capsule = data.clone();
}
if let Some(ref stewards) = alternative.steward_override {
forked_cell.steward_set.clone_from(stewards);
forked_cell
.control_capsule
.steward_pool
.clone_from(stewards);
if let Some(active) = &forked_cell.control_capsule.active_sequencer {
if !contains_node(stewards, active) {
forked_cell.control_capsule.active_sequencer = stewards.first().cloned();
}
}
}
let forked_signer = if contains_node(&forked_cell.steward_set, &self.certificate.signer) {
self.certificate.signer.clone()
} else {
forked_cell.steward_set.first().cloned().ok_or_else(|| {
RehearsalError::EmptyStewardOverride {
label: self.label.clone(),
}
})?
};
let forked_certificate = CutCertificate {
cell_id: forked_cell.cell_id,
epoch: forked_cell.epoch,
obligation_frontier: self.certificate.obligation_frontier.clone(),
consumer_state_digest: self.certificate.consumer_state_digest,
timestamp: self.snapshot_time,
signer: forked_signer,
};
Ok(RehearsalFork {
snapshot_digest: self.snapshot_digest(),
label: self.label.clone(),
forked_cell,
forked_certificate,
alternative_policy: alternative,
rehearsal_epoch,
})
}
}
/// Set of optional overrides applied to a forked cell by
/// `IncidentSnapshot::fork_rehearsal`. `None` leaves the snapshot value untouched.
#[derive(Debug, Clone, PartialEq, Eq, Default)]
pub struct RehearsalPolicy {
/// Witness-count overrides, applied first.
pub placement_override: Option<PlacementOverride>,
/// Full replacement for the cell's repair policy, applied after the placement override.
pub repair_override: Option<RepairPolicy>,
/// Full replacement for the cell's data capsule.
pub data_capsule_override: Option<DataCapsule>,
/// Replacement for both the steward set and the control capsule's steward pool.
pub steward_override: Option<Vec<NodeId>>,
/// Free-form description, copied into `RehearsalOutcome::policy_description`.
pub description: String,
}
/// Optional per-tier counts consumed by `apply_placement_override`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct PlacementOverride {
/// When set, written to `repair_policy.cold_witnesses`.
pub cold_stewards: Option<usize>,
/// NOTE(review): not read by `apply_placement_override` in this file — confirm
/// whether it is consumed elsewhere or is intentionally unused.
pub warm_stewards: Option<usize>,
/// When set, written to `repair_policy.hot_witnesses`.
pub hot_stewards: Option<usize>,
}
fn apply_placement_override(cell: &mut SubjectCell, overrides: &PlacementOverride) {
if let Some(hot) = overrides.hot_stewards {
cell.repair_policy.hot_witnesses = hot;
}
if let Some(cold) = overrides.cold_stewards {
cell.repair_policy.cold_witnesses = cold;
}
}
/// A forked cell and certificate produced by `IncidentSnapshot::fork_rehearsal`,
/// against which alternative operations can be replayed.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct RehearsalFork {
    /// Digest of the snapshot this fork was derived from.
    pub snapshot_digest: u64,
    /// Label copied from the snapshot.
    pub label: String,
    /// Cell state with the rehearsal epoch and policy overrides applied.
    pub forked_cell: SubjectCell,
    /// Certificate re-targeted at the forked cell.
    pub forked_certificate: CutCertificate,
    /// The policy the fork was created with.
    pub alternative_policy: RehearsalPolicy,
    /// Epoch assigned to the forked cell.
    pub rehearsal_epoch: CellEpoch,
}

impl RehearsalFork {
    /// Certifies `operation` against the forked cell and certificate and wraps
    /// the result (success or failure) in a [`RehearsalOutcome`].
    #[must_use]
    pub fn replay(&self, operation: &MobilityOperation) -> RehearsalOutcome {
        RehearsalOutcome {
            snapshot_digest: self.snapshot_digest,
            label: self.label.clone(),
            rehearsal_epoch: self.rehearsal_epoch,
            operation: operation.clone(),
            result: operation.certify(&self.forked_cell, &self.forked_certificate),
            policy_description: self.alternative_policy.description.clone(),
        }
    }

    /// Deterministic digest over the snapshot digest, the forked cell's id and
    /// epoch, and the rehearsal epoch.
    #[must_use]
    pub fn fork_digest(&self) -> u64 {
        let cell = &self.forked_cell;
        let material = (
            "rehearsal-fork",
            self.snapshot_digest,
            cell.cell_id.raw(),
            cell.epoch,
            self.rehearsal_epoch,
        );
        stable_hash(material)
    }
}
/// Outcome of certifying one operation, either during a rehearsal replay or
/// when re-running the original operation for comparison.
#[derive(Debug, Clone)]
pub struct RehearsalOutcome {
    /// Digest of the snapshot the outcome relates to.
    pub snapshot_digest: u64,
    /// Label copied from the snapshot or fork.
    pub label: String,
    /// Epoch the operation was certified at.
    pub rehearsal_epoch: CellEpoch,
    /// The operation that was certified.
    pub operation: MobilityOperation,
    /// Certification result.
    pub result: Result<CertifiedMobility, CutMobilityError>,
    /// Description of the policy in effect.
    pub policy_description: String,
}

impl RehearsalOutcome {
    /// Reports whether certification succeeded.
    #[must_use]
    pub fn succeeded(&self) -> bool {
        matches!(self.result, Ok(_))
    }

    /// Returns the resulting cell when certification succeeded.
    #[must_use]
    pub fn resulting_cell(&self) -> Option<&SubjectCell> {
        match &self.result {
            Ok(mobility) => Some(&mobility.resulting_cell),
            Err(_) => None,
        }
    }
}
/// Side-by-side comparison of the original operation's outcome and a rehearsed outcome.
#[derive(Debug, Clone)]
pub struct RehearsalComparison {
/// Label copied from the snapshot.
pub label: String,
/// Digest of the snapshot both outcomes relate to.
pub snapshot_digest: u64,
/// Outcome of replaying the snapshot's original operation.
pub original: RehearsalOutcome,
/// Outcome supplied by the caller for the rehearsed operation.
pub rehearsed: RehearsalOutcome,
/// Classification of how the two outcomes differ.
pub divergence: RehearsalDivergence,
}
/// Classification of the difference between an original and a rehearsed outcome,
/// as produced by `classify_divergence`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum RehearsalDivergence {
/// Both succeeded and `diff_cells` found no differing field.
Equivalent,
/// Both succeeded but the resulting cells differ.
CellDrift {
/// Names of the fields that differ, in the order `diff_cells` checks them.
differences: Vec<String>,
},
/// The original succeeded but the rehearsal failed.
RehearsalFailed {
error: CutMobilityError,
},
/// The original failed but the rehearsal succeeded.
OriginalFailed {
error: CutMobilityError,
},
/// Both the original and the rehearsal failed.
BothFailed {
original_error: CutMobilityError,
rehearsal_error: CutMobilityError,
},
}
impl RehearsalComparison {
    /// Replays the snapshot's original operation and compares its outcome with
    /// `rehearsal_outcome`.
    ///
    /// # Errors
    ///
    /// The current implementation always returns `Ok`; the `Result` is part of
    /// the signature callers rely on.
    pub fn compare(
        snapshot: &IncidentSnapshot,
        rehearsal_outcome: RehearsalOutcome,
    ) -> Result<Self, RehearsalError> {
        let snapshot_digest = snapshot.snapshot_digest();
        let original = RehearsalOutcome {
            snapshot_digest,
            label: snapshot.label.clone(),
            rehearsal_epoch: snapshot.cell.epoch,
            operation: snapshot.original_operation.clone(),
            result: snapshot.replay_original(),
            policy_description: String::from("original"),
        };
        let divergence = classify_divergence(&original.result, &rehearsal_outcome.result);
        Ok(Self {
            label: snapshot.label.clone(),
            snapshot_digest,
            original,
            rehearsed: rehearsal_outcome,
            divergence,
        })
    }

    /// Reports whether the two outcomes were classified as equivalent.
    #[must_use]
    pub fn is_equivalent(&self) -> bool {
        self.divergence == RehearsalDivergence::Equivalent
    }

    /// Deterministic digest over the snapshot digest, the label and whether the
    /// outcomes were equivalent.
    #[must_use]
    pub fn comparison_digest(&self) -> u64 {
        let material = (
            "rehearsal-comparison",
            self.snapshot_digest,
            &self.label,
            self.is_equivalent(),
        );
        stable_hash(material)
    }

    /// Decides whether the rehearsed policy may be promoted.
    ///
    /// Gates are evaluated in this order and the first failing gate decides:
    /// equivalent outcomes, invalid baseline score, invalid candidate score,
    /// invalid envelope, insufficient evidence, rollback risk, failed
    /// rehearsal, envelope violations, no improvement in policy gain.
    #[must_use]
    pub fn evaluate_promotion(
        &self,
        baseline: &CounterfactualScore,
        candidate: &CounterfactualScore,
        envelope: &CounterfactualPromotionEnvelope,
    ) -> CounterfactualPromotionDecision {
        if self.is_equivalent() {
            return CounterfactualPromotionDecision::RejectEquivalent;
        }

        // The baseline is validated first; the candidate only when the baseline passes.
        let invalid_score = validate_counterfactual_score(baseline, "baseline_")
            .or_else(|| validate_counterfactual_score(candidate, ""));
        if let Some(rejection) = invalid_score {
            return rejection;
        }

        if let Err(error) = envelope.validate() {
            return CounterfactualPromotionDecision::RejectInvalidEnvelope { error };
        }

        let limits = &envelope.reliability;
        if candidate.evidence_confidence < limits.evidence_threshold {
            return CounterfactualPromotionDecision::RejectInsufficientEvidence {
                observed: candidate.evidence_confidence,
                required: limits.evidence_threshold,
            };
        }
        if candidate.violation_rate > limits.rollback_violation_threshold {
            return CounterfactualPromotionDecision::RejectRollbackRisk {
                observed: candidate.violation_rate,
                threshold: limits.rollback_violation_threshold,
            };
        }

        let rehearsed_cell = match self.rehearsed.resulting_cell() {
            Some(cell) => cell,
            None => {
                return CounterfactualPromotionDecision::RejectFailure {
                    divergence: self.divergence.clone(),
                };
            }
        };

        let reasons = envelope.evaluate(rehearsed_cell);
        if !reasons.is_empty() {
            return CounterfactualPromotionDecision::RejectEnvelopeViolation { reasons };
        }

        let (baseline_gain, candidate_gain) = (baseline.policy_gain, candidate.policy_gain);
        if candidate_gain <= baseline_gain {
            return CounterfactualPromotionDecision::RejectNoImprovement {
                baseline_gain,
                candidate_gain,
            };
        }

        CounterfactualPromotionDecision::Promote {
            comparison_digest: self.comparison_digest(),
            policy_description: self.rehearsed.policy_description.clone(),
        }
    }
}
/// Score attached to a baseline or candidate policy during promotion evaluation.
#[derive(Debug, Clone, PartialEq)]
pub struct CounterfactualScore {
/// Policy gain; must be finite. A candidate must strictly exceed the baseline.
pub policy_gain: f64,
/// Evidence confidence; must be finite and within `[0.0, 1.0]`.
pub evidence_confidence: f64,
/// Violation rate; must be finite and within `[0.0, 1.0]`.
pub violation_rate: f64,
}
/// Limits a rehearsed cell must satisfy before its policy can be promoted.
#[derive(Debug, Clone, PartialEq)]
pub struct CounterfactualPromotionEnvelope {
/// Reliability envelope supplying steward-count, repair-depth, evidence and
/// rollback thresholds.
pub reliability: SafetyEnvelope,
/// Highest temperature allowed for the cell's data capsule.
pub max_temperature: CellTemperature,
/// Upper bound on `data_capsule.retained_message_blocks`.
pub max_retained_message_blocks: usize,
/// Upper bound on `repair_policy.cold_witnesses`.
pub max_cold_witnesses: usize,
/// Upper bound on `repair_policy.hot_witnesses`.
pub max_hot_witnesses: usize,
}
impl Default for CounterfactualPromotionEnvelope {
    /// Builds an envelope with the default reliability envelope, `Hot` as the
    /// maximum temperature, and `usize::MAX` for every count limit.
    fn default() -> Self {
        const UNBOUNDED: usize = usize::MAX;
        Self {
            reliability: SafetyEnvelope::default(),
            max_temperature: CellTemperature::Hot,
            max_retained_message_blocks: UNBOUNDED,
            max_cold_witnesses: UNBOUNDED,
            max_hot_witnesses: UNBOUNDED,
        }
    }
}
impl CounterfactualPromotionEnvelope {
    /// Validates the embedded reliability envelope.
    fn validate(&self) -> Result<(), ReliabilityControlError> {
        let reliability = &self.reliability;
        reliability.validate()
    }

    /// Checks `cell` against every limit and returns one message per violated
    /// limit, in a fixed order; an empty vector means the cell is within the
    /// envelope.
    fn evaluate(&self, cell: &SubjectCell) -> Vec<String> {
        let limits = &self.reliability;
        let capsule = &cell.data_capsule;
        let repair = &cell.repair_policy;
        let mut violations = Vec::new();

        let steward_count = cell.steward_set.len();
        if !(limits.min_stewards..=limits.max_stewards).contains(&steward_count) {
            violations.push(format!(
                "steward_count={steward_count} outside [{}, {}]",
                limits.min_stewards, limits.max_stewards
            ));
        }

        let recoverability = u16::from(repair.recoverability_target);
        if !(limits.min_repair_depth..=limits.max_repair_depth).contains(&recoverability) {
            violations.push(format!(
                "recoverability_target={recoverability} outside [{}, {}]",
                limits.min_repair_depth, limits.max_repair_depth
            ));
        }

        let observed_rank = temperature_rank(capsule.temperature);
        let allowed_rank = temperature_rank(self.max_temperature);
        if observed_rank > allowed_rank {
            violations.push(format!(
                "temperature={:?} exceeds {:?}",
                capsule.temperature, self.max_temperature
            ));
        }

        if capsule.retained_message_blocks > self.max_retained_message_blocks {
            violations.push(format!(
                "retained_message_blocks={} exceeds {}",
                capsule.retained_message_blocks, self.max_retained_message_blocks
            ));
        }

        if repair.cold_witnesses > self.max_cold_witnesses {
            violations.push(format!(
                "cold_witnesses={} exceeds {}",
                repair.cold_witnesses, self.max_cold_witnesses
            ));
        }

        if repair.hot_witnesses > self.max_hot_witnesses {
            violations.push(format!(
                "hot_witnesses={} exceeds {}",
                repair.hot_witnesses, self.max_hot_witnesses
            ));
        }

        violations
    }
}
/// Decision returned by `RehearsalComparison::evaluate_promotion`.
#[derive(Debug, Clone, PartialEq)]
pub enum CounterfactualPromotionDecision {
/// Every gate passed; the rehearsed policy may be promoted.
Promote {
/// Digest of the comparison the decision is based on.
comparison_digest: u64,
/// Description of the rehearsed policy.
policy_description: String,
},
/// The rehearsal was classified as equivalent to the original.
RejectEquivalent,
/// A score field was non-finite or outside its allowed range.
RejectInvalidScore {
/// Name of the offending field; baseline fields carry a `baseline_` prefix.
field: &'static str,
/// The offending value.
value: f64,
},
/// The envelope's reliability settings failed validation.
RejectInvalidEnvelope {
error: ReliabilityControlError,
},
/// The candidate's evidence confidence is below the envelope threshold.
RejectInsufficientEvidence {
observed: f64,
required: f64,
},
/// The candidate's violation rate is above the rollback threshold.
RejectRollbackRisk {
observed: f64,
threshold: f64,
},
/// The rehearsed cell violates one or more envelope limits.
RejectEnvelopeViolation {
reasons: Vec<String>,
},
/// The candidate's policy gain does not strictly exceed the baseline's.
RejectNoImprovement {
baseline_gain: f64,
candidate_gain: f64,
},
/// The rehearsal produced no resulting cell.
RejectFailure {
divergence: RehearsalDivergence,
},
}
/// Validates `score` and returns a `RejectInvalidScore` decision for the first
/// invalid field, or `None` when every field is acceptable.
///
/// `policy_gain` must be finite; `evidence_confidence` and `violation_rate`
/// must be finite and within `[0.0, 1.0]`. `field_prefix` is combined with the
/// field name by `invalid_score_field`.
fn validate_counterfactual_score(
    score: &CounterfactualScore,
    field_prefix: &'static str,
) -> Option<CounterfactualPromotionDecision> {
    // NaN and the infinities are never inside the closed unit interval, so the
    // range test also covers the finiteness requirement.
    let unit_interval = 0.0..=1.0;

    let (field_name, value) = if !score.policy_gain.is_finite() {
        ("policy_gain", score.policy_gain)
    } else if !unit_interval.contains(&score.evidence_confidence) {
        ("evidence_confidence", score.evidence_confidence)
    } else if !unit_interval.contains(&score.violation_rate) {
        ("violation_rate", score.violation_rate)
    } else {
        return None;
    };

    Some(CounterfactualPromotionDecision::RejectInvalidScore {
        field: invalid_score_field(field_prefix, field_name),
        value,
    })
}
/// Maps a (prefix, field name) pair to the static field label reported in
/// `RejectInvalidScore`; unknown combinations map to `"invalid_score"`.
fn invalid_score_field(field_prefix: &'static str, field_name: &'static str) -> &'static str {
    const UNKNOWN: &str = "invalid_score";
    match field_prefix {
        "" => match field_name {
            "policy_gain" | "evidence_confidence" | "violation_rate" => field_name,
            _ => UNKNOWN,
        },
        "baseline_" => match field_name {
            "policy_gain" => "baseline_policy_gain",
            "evidence_confidence" => "baseline_evidence_confidence",
            "violation_rate" => "baseline_violation_rate",
            _ => UNKNOWN,
        },
        _ => UNKNOWN,
    }
}
/// Maps a temperature to a comparable rank: `Cold` < `Warm` < `Hot`.
fn temperature_rank(temperature: CellTemperature) -> u8 {
    let rank = match temperature {
        CellTemperature::Hot => 2,
        CellTemperature::Warm => 1,
        CellTemperature::Cold => 0,
    };
    rank
}
fn classify_divergence(
original: &Result<CertifiedMobility, CutMobilityError>,
rehearsed: &Result<CertifiedMobility, CutMobilityError>,
) -> RehearsalDivergence {
match (original, rehearsed) {
(Ok(orig), Ok(reh)) => {
let diffs = diff_cells(&orig.resulting_cell, &reh.resulting_cell);
if diffs.is_empty() {
RehearsalDivergence::Equivalent
} else {
RehearsalDivergence::CellDrift { differences: diffs }
}
}
(Ok(_), Err(e)) => RehearsalDivergence::RehearsalFailed { error: e.clone() },
(Err(e), Ok(_)) => RehearsalDivergence::OriginalFailed { error: e.clone() },
(Err(o), Err(r)) => RehearsalDivergence::BothFailed {
original_error: o.clone(),
rehearsal_error: r.clone(),
},
}
}
/// Lists the names of the fields in which `a` and `b` differ, in a fixed
/// order; an empty vector means no checked field differs.
fn diff_cells(a: &SubjectCell, b: &SubjectCell) -> Vec<String> {
    let (capsule_a, capsule_b) = (&a.control_capsule, &b.control_capsule);

    // Every field is compared up front; the table order is the output order.
    let checks = [
        ("cell_id", a.cell_id != b.cell_id),
        ("epoch", a.epoch != b.epoch),
        ("subject_partition", a.subject_partition != b.subject_partition),
        ("steward_set", a.steward_set != b.steward_set),
        (
            "active_sequencer",
            capsule_a.active_sequencer != capsule_b.active_sequencer,
        ),
        (
            "sequencer_lease_generation",
            capsule_a.sequencer_lease_generation != capsule_b.sequencer_lease_generation,
        ),
        (
            "policy_revision",
            capsule_a.policy_revision != capsule_b.policy_revision,
        ),
        ("steward_pool", capsule_a.steward_pool != capsule_b.steward_pool),
        ("data_capsule", a.data_capsule != b.data_capsule),
        ("repair_policy", a.repair_policy != b.repair_policy),
    ];

    checks
        .iter()
        .filter(|(_, differs)| *differs)
        .map(|(field, _)| (*field).to_owned())
        .collect()
}
/// Errors raised while capturing incident snapshots or forking rehearsals.
#[derive(Debug, Clone, PartialEq, Eq, Error)]
pub enum RehearsalError {
/// The certificate did not validate against the cell during `IncidentSnapshot::capture`.
#[error("incident snapshot `{label}`: invalid certificate: {source}")]
InvalidCertificate {
label: String,
source: CutMobilityError,
},
/// The rehearsal epoch is not strictly newer than the snapshot's cell epoch.
#[error(
"rehearsal epoch {rehearsal_epoch:?} must be newer than snapshot epoch {snapshot_epoch:?}"
)]
StaleRehearsalEpoch {
rehearsal_epoch: CellEpoch,
snapshot_epoch: CellEpoch,
},
/// No signer could be chosen for the forked certificate because the forked
/// steward set is empty.
#[error("rehearsal `{label}`: steward_override produced an empty steward set")]
EmptyStewardOverride {
label: String,
},
}
/// Access class attached to an indexed cut.
///
/// The derived ordering follows declaration order
/// (`Operator` < `ServiceScoped` < `AuditOnly`); `CutLatticeIndex::query`
/// excludes entries whose class is greater than the query's `max_access_class`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash, Default)]
pub enum CutAccessClass {
/// Default class.
#[default]
Operator,
ServiceScoped,
AuditOnly,
}
/// Policy regime recorded alongside an indexed cut.
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub struct PolicyRegime {
/// Revision number of the policy.
pub policy_revision: u64,
/// Label matched by `CutIndexQuery::policy_label`.
pub label: String,
}
/// Materialization state of an indexed cut.
///
/// `index_cut` creates entries as `Materialized`; `compact` may move them to
/// `Reconstructible` or `Compacted`; `materialize` only promotes
/// `Reconstructible` entries back to `Materialized`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Default)]
pub enum MaterializationState {
Materialized,
/// Default state.
#[default]
Reconstructible,
Compacted,
}
/// One indexed cut certificate together with its indexing metadata.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct CutIndexEntry {
    /// Entry identifier; `index_cut` uses the certificate digest.
    pub entry_id: u64,
    /// The indexed certificate.
    pub certificate: CutCertificate,
    /// Cell id copied from the certificate by `index_cut`.
    pub cell_id: CellId,
    /// Epoch copied from the certificate by `index_cut`.
    pub epoch: CellEpoch,
    /// Policy regime recorded for the cut.
    pub policy_regime: PolicyRegime,
    /// Access class recorded for the cut.
    pub access_class: CutAccessClass,
    /// Caller-supplied count of live obligations.
    pub live_obligation_count: usize,
    /// Caller-supplied count of resolved obligations.
    pub resolved_obligation_count: usize,
    /// Branch digests registered through `register_descendant`, without duplicates.
    pub descendant_branches: Vec<u64>,
    /// Current materialization state.
    pub materialization: MaterializationState,
    /// Time the entry was indexed; `compact` measures entry age from it.
    pub indexed_at: Time,
}

impl CutIndexEntry {
    /// Deterministic digest over the entry id, certificate digest, cell id,
    /// epoch and policy regime.
    #[must_use]
    pub fn entry_digest(&self) -> u64 {
        let certificate_digest = self.certificate.certificate_digest();
        let material = (
            "cut-index-entry",
            self.entry_id,
            certificate_digest,
            self.cell_id.raw(),
            self.epoch,
            &self.policy_regime,
        );
        stable_hash(material)
    }
}
/// Retention limits applied per cell by `CutLatticeIndex::compact`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct CutRetentionPolicy {
    /// Rank (newest first) at and beyond which an entry counts as exceeding the
    /// materialized limit.
    pub max_materialized_per_cell: usize,
    /// Age in nanoseconds beyond which an entry counts as too old.
    pub max_age_nanos: u64,
    /// Number of newest entries per cell that are never compacted.
    pub min_retained_per_cell: usize,
}

impl Default for CutRetentionPolicy {
    /// Defaults: 64 materialized entries per cell, a maximum age of
    /// 3_600_000_000_000 ns, and at least 2 retained entries per cell.
    fn default() -> Self {
        // 3_600 seconds expressed in nanoseconds.
        const DEFAULT_MAX_AGE_NANOS: u64 = 3_600 * 1_000_000_000;
        Self {
            max_materialized_per_cell: 64,
            max_age_nanos: DEFAULT_MAX_AGE_NANOS,
            min_retained_per_cell: 2,
        }
    }
}
/// Filter for `CutLatticeIndex::query`. `None` (or `false`) disables the
/// corresponding filter.
#[derive(Debug, Clone, Default)]
pub struct CutIndexQuery {
/// Only entries for this cell.
pub cell_id: Option<CellId>,
/// Excludes entries whose certificate timestamp is earlier than this time.
pub after: Option<Time>,
/// Excludes entries whose certificate timestamp is later than this time.
pub before: Option<Time>,
/// Only entries whose policy regime label equals this string.
pub policy_label: Option<String>,
/// Excludes entries whose access class is greater than this class.
pub max_access_class: Option<CutAccessClass>,
/// Only entries whose certificate covers this obligation.
pub covers_obligation: Option<ObligationId>,
/// When `true`, only entries in the `Materialized` state.
pub materialized_only: bool,
/// Maximum number of results; `0` means no limit.
pub limit: usize,
}
#[derive(Debug, Clone)]
pub struct CutLatticeIndex {
entries: Vec<CutIndexEntry>,
retention: CutRetentionPolicy,
}
impl CutLatticeIndex {
#[must_use]
pub fn new(retention: CutRetentionPolicy) -> Self {
Self {
entries: Vec::new(),
retention,
}
}
#[must_use]
pub fn with_defaults() -> Self {
Self::new(CutRetentionPolicy::default())
}
pub fn index_cut(
&mut self,
certificate: CutCertificate,
policy_regime: PolicyRegime,
access_class: CutAccessClass,
live_obligation_count: usize,
resolved_obligation_count: usize,
indexed_at: Time,
) -> u64 {
let entry_id = certificate.certificate_digest();
if self.entries.iter().any(|e| e.entry_id == entry_id) {
return entry_id;
}
let cell_id = certificate.cell_id;
let epoch = certificate.epoch;
let entry = CutIndexEntry {
entry_id,
certificate,
cell_id,
epoch,
policy_regime,
access_class,
live_obligation_count,
resolved_obligation_count,
descendant_branches: Vec::new(),
materialization: MaterializationState::Materialized,
indexed_at,
};
self.entries.push(entry);
entry_id
}
pub fn register_descendant(&mut self, entry_id: u64, branch_digest: u64) -> bool {
if let Some(entry) = self.entries.iter_mut().find(|e| e.entry_id == entry_id) {
if !entry.descendant_branches.contains(&branch_digest) {
entry.descendant_branches.push(branch_digest);
}
true
} else {
false
}
}
#[must_use]
pub fn query(&self, q: &CutIndexQuery) -> Vec<&CutIndexEntry> {
let mut results: Vec<&CutIndexEntry> = self
.entries
.iter()
.filter(|e| {
if let Some(cell_id) = q.cell_id {
if e.cell_id != cell_id {
return false;
}
}
if let Some(after) = q.after {
if e.certificate.timestamp < after {
return false;
}
}
if let Some(before) = q.before {
if e.certificate.timestamp > before {
return false;
}
}
if let Some(ref label) = q.policy_label {
if &e.policy_regime.label != label {
return false;
}
}
if let Some(max_class) = q.max_access_class {
if e.access_class > max_class {
return false;
}
}
if let Some(obligation) = q.covers_obligation {
if !e.certificate.covers_obligation(obligation) {
return false;
}
}
if q.materialized_only && e.materialization != MaterializationState::Materialized {
return false;
}
true
})
.collect();
results.sort_by(|a, b| {
b.certificate
.timestamp
.as_nanos()
.cmp(&a.certificate.timestamp.as_nanos())
});
if q.limit > 0 {
results.truncate(q.limit);
}
results
}
#[must_use]
pub fn latest_for_cell(&self, cell_id: CellId) -> Option<&CutIndexEntry> {
self.query(&CutIndexQuery {
cell_id: Some(cell_id),
materialized_only: true,
limit: 1,
..Default::default()
})
.into_iter()
.next()
}
#[must_use]
pub fn latest_for_policy(&self, policy_label: &str) -> Option<&CutIndexEntry> {
self.query(&CutIndexQuery {
policy_label: Some(policy_label.to_owned()),
limit: 1,
..Default::default()
})
.into_iter()
.next()
}
#[must_use]
pub fn len(&self) -> usize {
self.entries.len()
}
#[must_use]
pub fn is_empty(&self) -> bool {
self.entries.is_empty()
}
/// Applies the retention policy to every cell and returns how many entries
/// were newly marked `Compacted` by this call.
///
/// For each cell, the entries that are not already compacted are ranked by
/// certificate timestamp, newest first (rank 0 is the newest). Then:
///
/// * an entry at rank `>= min_retained_per_cell` is marked `Compacted` when it
///   is older than `max_age_nanos` (measured from `indexed_at` to
///   `current_time`) or its rank is `>= max_materialized_per_cell`;
/// * otherwise, an entry whose rank is `>= max_materialized_per_cell` but
///   still below `min_retained_per_cell` is demoted from `Materialized` to
///   `Reconstructible`.
///
/// Finally, compacted entries older than twice `max_age_nanos` are removed
/// from the index; those removals are not reflected in the returned count.
pub fn compact(&mut self, current_time: Time) -> usize {
    let now_nanos = current_time.as_nanos();
    let max_age = self.retention.max_age_nanos;
    let keep_floor = self.retention.min_retained_per_cell;
    let materialized_cap = self.retention.max_materialized_per_cell;

    let mut distinct_cells: Vec<CellId> = self.entries.iter().map(|entry| entry.cell_id).collect();
    distinct_cells.sort_unstable();
    distinct_cells.dedup();

    let mut newly_compacted = 0;
    for cell in distinct_cells {
        // Positions of this cell's not-yet-compacted entries, in index order.
        let mut live: Vec<usize> = (0..self.entries.len())
            .filter(|&position| {
                let entry = &self.entries[position];
                entry.cell_id == cell && entry.materialization != MaterializationState::Compacted
            })
            .collect();
        // Stable sort, newest certificate first; ties keep index order.
        live.sort_by(|&left, &right| {
            let left_stamp = self.entries[left].certificate.timestamp.as_nanos();
            let right_stamp = self.entries[right].certificate.timestamp.as_nanos();
            right_stamp.cmp(&left_stamp)
        });

        for (rank, position) in live.into_iter().enumerate() {
            let entry = &mut self.entries[position];
            let age = now_nanos.saturating_sub(entry.indexed_at.as_nanos());
            let over_age = age > max_age;
            let over_count = rank >= materialized_cap;
            if rank >= keep_floor && (over_age || over_count) {
                entry.materialization = MaterializationState::Compacted;
                newly_compacted += 1;
            } else if over_count
                && rank < keep_floor
                && entry.materialization == MaterializationState::Materialized
            {
                // Inside the retained window but past the materialized cap:
                // keep the entry, but only in reconstructible form.
                entry.materialization = MaterializationState::Reconstructible;
            }
        }
    }

    // Drop compacted entries once they are older than twice the maximum age.
    let purge_age = max_age.saturating_mul(2);
    self.entries.retain(|entry| {
        let age = now_nanos.saturating_sub(entry.indexed_at.as_nanos());
        let purge = entry.materialization == MaterializationState::Compacted && age > purge_age;
        !purge
    });
    newly_compacted
}
/// Promotes the entry `entry_id` from `Reconstructible` back to
/// `Materialized`.
///
/// Returns `true` only when that transition happened. Returns `false` when
/// the entry is unknown or is in any other state (including already
/// `Materialized` or `Compacted`), in which case nothing is changed.
pub fn materialize(&mut self, entry_id: u64) -> bool {
    let found = self
        .entries
        .iter_mut()
        .find(|candidate| candidate.entry_id == entry_id);
    match found {
        Some(entry) if entry.materialization == MaterializationState::Reconstructible => {
            entry.materialization = MaterializationState::Materialized;
            true
        }
        _ => false,
    }
}
}
/// Kind of branch tracked by the branch registry.
///
/// The variant order is significant: the derived `PartialOrd`/`Ord` follow
/// declaration order.
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub enum BranchType {
    /// The live branch; the only type that is not fenced.
    Live,
    /// A lagged branch.
    Lagged,
    /// A replayed branch.
    Replayed,
    /// A canary branch; mutable by default, but fenced.
    Canary,
    /// A forensic branch.
    Forensic,
}

impl BranchType {
    /// Returns `true` for branch types that are mutable by default
    /// (`Live` and `Canary`).
    #[must_use]
    pub const fn default_mutable(self) -> bool {
        match self {
            Self::Live | Self::Canary => true,
            Self::Lagged | Self::Replayed | Self::Forensic => false,
        }
    }

    /// Returns `true` for every branch type except `Live`.
    #[must_use]
    pub const fn is_fenced(self) -> bool {
        match self {
            Self::Live => false,
            Self::Lagged | Self::Replayed | Self::Canary | Self::Forensic => true,
        }
    }
}
/// Access policy attached to a branch address.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum BranchAccessPolicy {
    /// No writes are allowed.
    ReadOnly,
    /// Writes are allowed but do not propagate to the live branch.
    Sandboxed,
    /// Writes are allowed and propagate to the live branch.
    ReadWrite,
}

impl BranchAccessPolicy {
    /// Returns `true` when the policy permits writes
    /// (`Sandboxed` and `ReadWrite`).
    #[must_use]
    pub const fn allows_writes(self) -> bool {
        match self {
            Self::ReadOnly => false,
            Self::Sandboxed | Self::ReadWrite => true,
        }
    }

    /// Returns `true` only for `ReadWrite`, the one policy whose writes
    /// propagate to the live branch.
    #[must_use]
    pub const fn propagates_to_live(self) -> bool {
        match self {
            Self::ReadWrite => true,
            Self::ReadOnly | Self::Sandboxed => false,
        }
    }
}
/// Describes one branch of a cell: its type, the cut entry it refers to, the
/// policy it runs under and how it may be accessed.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct BranchAddress {
/// Branch identifier. `BranchAddress::new` derives it by hashing the branch
/// type, cut entry id, raw cell id and policy label.
pub branch_id: u64,
/// Kind of branch.
pub branch_type: BranchType,
/// Entry id of the indexed cut this branch refers to.
pub cut_entry_id: u64,
/// Cell the branch belongs to.
pub cell_id: CellId,
/// Label of the policy regime associated with the branch; may be empty.
pub policy_label: String,
/// Access policy governing the branch.
pub access_policy: BranchAccessPolicy,
/// Free-form description; not part of the `branch_id` hash.
pub description: String,
/// Creation time; not part of the `branch_id` hash.
pub created_at: Time,
}
impl BranchAddress {
    /// Builds a branch address and derives its `branch_id`.
    ///
    /// The id is `stable_hash` over the tag `"branch-address"`, the branch
    /// type, the cut entry id, the raw cell id and the policy label. The
    /// access policy, description and creation time do not influence it, so
    /// two addresses that differ only in those fields share the same id.
    fn new(
        branch_type: BranchType,
        cut_entry_id: u64,
        cell_id: CellId,
        policy_label: impl Into<String>,
        access_policy: BranchAccessPolicy,
        description: impl Into<String>,
        created_at: Time,
    ) -> Self {
        let label: String = policy_label.into();
        let summary: String = description.into();
        let identity = (
            "branch-address",
            branch_type,
            cut_entry_id,
            cell_id.raw(),
            &label,
        );
        let branch_id = stable_hash(identity);
        Self {
            branch_id,
            branch_type,
            cut_entry_id,
            cell_id,
            policy_label: label,
            access_policy,
            description: summary,
            created_at,
        }
    }

    /// Returns the digest that identifies this address, which is its
    /// `branch_id`.
    #[must_use]
    pub fn address_digest(&self) -> u64 {
        self.branch_id
    }
}
/// Errors reported when creating or registering branches.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum BranchError {
/// `ReadWrite` access is not allowed on a branch of the given type.
ReadWriteOnNonLive {
/// Branch type for which `ReadWrite` access was rejected.
branch_type: BranchType,
},
/// No cut entry with this id exists in the cut index.
CutNotFound {
/// Entry id that was looked up.
entry_id: u64,
},
/// A branch with the same id is already registered.
DuplicateBranch {
/// The id that is already in use.
branch_id: u64,
},
}
/// Human-readable messages for [`BranchError`].
impl std::fmt::Display for BranchError {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // Every payload field is `Copy`, so the variants can be destructured
        // by value.
        match *self {
            Self::DuplicateBranch { branch_id } => {
                write!(f, "branch {branch_id} already exists")
            }
            Self::CutNotFound { entry_id } => {
                write!(f, "cut entry {entry_id} not found in index")
            }
            Self::ReadWriteOnNonLive { branch_type } => {
                write!(f, "ReadWrite access not allowed on {branch_type:?} branch")
            }
        }
    }
}
// Uses the default `std::error::Error` methods: no `source` is overridden.
impl std::error::Error for BranchError {}
/// In-memory registry of branch addresses.
#[derive(Debug, Clone)]
pub struct BranchRegistry {
/// Registered branches in insertion order; `insert` rejects an address whose
/// `branch_id` is already present.
branches: Vec<BranchAddress>,
}
impl BranchRegistry {
    /// Creates a registry that holds no branches.
    #[must_use]
    pub fn new() -> Self {
        let branches = Vec::new();
        Self { branches }
    }

    /// Registers the live branch of `cell_id`.
    ///
    /// The branch uses cut entry id `0`, an empty policy label and
    /// `ReadWrite` access; no cut index lookup is performed.
    ///
    /// # Errors
    ///
    /// Returns [`BranchError::DuplicateBranch`] when a branch with the same
    /// derived id is already registered.
    pub fn create_live(
        &mut self,
        cell_id: CellId,
        created_at: Time,
    ) -> Result<&BranchAddress, BranchError> {
        self.insert(BranchAddress::new(
            BranchType::Live,
            0,
            cell_id,
            "",
            BranchAccessPolicy::ReadWrite,
            "live production branch",
            created_at,
        ))
    }

    /// Registers a read-only lagged branch of `cell_id` at cut `cut_entry_id`,
    /// with an empty policy label.
    ///
    /// # Errors
    ///
    /// Returns [`BranchError::CutNotFound`] when `index` has no entry with
    /// `cut_entry_id`, or [`BranchError::DuplicateBranch`] when a branch with
    /// the same derived id is already registered.
    pub fn create_lagged(
        &mut self,
        cut_entry_id: u64,
        cell_id: CellId,
        description: impl Into<String>,
        created_at: Time,
        index: &CutLatticeIndex,
    ) -> Result<&BranchAddress, BranchError> {
        Self::require_cut_exists(cut_entry_id, index)?;
        self.insert(BranchAddress::new(
            BranchType::Lagged,
            cut_entry_id,
            cell_id,
            "",
            BranchAccessPolicy::ReadOnly,
            description,
            created_at,
        ))
    }

    /// Registers a read-only replayed branch of `cell_id` at cut
    /// `cut_entry_id` under the policy `policy_label`.
    ///
    /// # Errors
    ///
    /// Returns [`BranchError::CutNotFound`] when `index` has no entry with
    /// `cut_entry_id`, or [`BranchError::DuplicateBranch`] when a branch with
    /// the same derived id is already registered.
    pub fn create_replayed(
        &mut self,
        cut_entry_id: u64,
        cell_id: CellId,
        policy_label: impl Into<String>,
        description: impl Into<String>,
        created_at: Time,
        index: &CutLatticeIndex,
    ) -> Result<&BranchAddress, BranchError> {
        Self::require_cut_exists(cut_entry_id, index)?;
        self.insert(BranchAddress::new(
            BranchType::Replayed,
            cut_entry_id,
            cell_id,
            policy_label,
            BranchAccessPolicy::ReadOnly,
            description,
            created_at,
        ))
    }

    /// Registers a sandboxed canary branch of `cell_id` at cut `cut_entry_id`
    /// under the policy `policy_label`.
    ///
    /// # Errors
    ///
    /// Returns [`BranchError::CutNotFound`] when `index` has no entry with
    /// `cut_entry_id`, or [`BranchError::DuplicateBranch`] when a branch with
    /// the same derived id is already registered.
    pub fn create_canary(
        &mut self,
        cut_entry_id: u64,
        cell_id: CellId,
        policy_label: impl Into<String>,
        description: impl Into<String>,
        created_at: Time,
        index: &CutLatticeIndex,
    ) -> Result<&BranchAddress, BranchError> {
        Self::require_cut_exists(cut_entry_id, index)?;
        self.insert(BranchAddress::new(
            BranchType::Canary,
            cut_entry_id,
            cell_id,
            policy_label,
            BranchAccessPolicy::Sandboxed,
            description,
            created_at,
        ))
    }

    /// Registers a read-only forensic branch of `cell_id` at cut
    /// `cut_entry_id`, with an empty policy label.
    ///
    /// # Errors
    ///
    /// Returns [`BranchError::CutNotFound`] when `index` has no entry with
    /// `cut_entry_id`, or [`BranchError::DuplicateBranch`] when a branch with
    /// the same derived id is already registered.
    pub fn create_forensic(
        &mut self,
        cut_entry_id: u64,
        cell_id: CellId,
        description: impl Into<String>,
        created_at: Time,
        index: &CutLatticeIndex,
    ) -> Result<&BranchAddress, BranchError> {
        Self::require_cut_exists(cut_entry_id, index)?;
        self.insert(BranchAddress::new(
            BranchType::Forensic,
            cut_entry_id,
            cell_id,
            "",
            BranchAccessPolicy::ReadOnly,
            description,
            created_at,
        ))
    }

    /// Looks up a branch by its id.
    #[must_use]
    pub fn get(&self, branch_id: u64) -> Option<&BranchAddress> {
        self.branches
            .iter()
            .find(|registered| registered.branch_id == branch_id)
    }

    /// Returns the first registered `Live` branch of `cell_id`, if any.
    #[must_use]
    pub fn live_for_cell(&self, cell_id: CellId) -> Option<&BranchAddress> {
        self.branches.iter().find(|registered| {
            matches!(registered.branch_type, BranchType::Live) && registered.cell_id == cell_id
        })
    }

    /// Returns every branch registered for `cell_id`, in insertion order.
    #[must_use]
    pub fn branches_for_cell(&self, cell_id: CellId) -> Vec<&BranchAddress> {
        let of_cell = self
            .branches
            .iter()
            .filter(|registered| registered.cell_id == cell_id);
        of_cell.collect()
    }

    /// Returns every registered branch of the given type, in insertion order.
    #[must_use]
    pub fn branches_of_type(&self, branch_type: BranchType) -> Vec<&BranchAddress> {
        let of_type = self
            .branches
            .iter()
            .filter(|registered| registered.branch_type == branch_type);
        of_type.collect()
    }

    /// Returns the number of registered branches.
    #[must_use]
    pub fn len(&self) -> usize {
        self.branches.len()
    }

    /// Returns `true` when no branch is registered.
    #[must_use]
    pub fn is_empty(&self) -> bool {
        self.branches.is_empty()
    }

    /// Removes the branch with id `branch_id`.
    ///
    /// Returns `true` when at least one branch was removed and `false` when
    /// the id was not registered.
    pub fn remove(&mut self, branch_id: u64) -> bool {
        let len_before = self.branches.len();
        self.branches.retain(|kept| kept.branch_id != branch_id);
        // `retain` can only shrink the vector, so any change means a removal.
        self.branches.len() != len_before
    }

    /// Checks that `index` holds an entry with id `entry_id`; the entry's
    /// materialization state and cell are not inspected.
    fn require_cut_exists(entry_id: u64, index: &CutLatticeIndex) -> Result<(), BranchError> {
        let known = index
            .entries
            .iter()
            .any(|indexed| indexed.entry_id == entry_id);
        if !known {
            return Err(BranchError::CutNotFound { entry_id });
        }
        Ok(())
    }

    /// Appends `branch` unless its id is already registered, and returns a
    /// reference to the stored address.
    fn insert(&mut self, branch: BranchAddress) -> Result<&BranchAddress, BranchError> {
        let branch_id = branch.branch_id;
        let already_registered = self
            .branches
            .iter()
            .any(|registered| registered.branch_id == branch_id);
        if already_registered {
            return Err(BranchError::DuplicateBranch { branch_id });
        }
        self.branches.push(branch);
        Ok(self.branches.last().expect("just pushed"))
    }
}
/// The default registry is the empty registry returned by `BranchRegistry::new`.
impl Default for BranchRegistry {
fn default() -> Self {
Self::new()
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::messaging::fabric::{
CellTemperature, DataCapsule, NodeRole, PlacementPolicy, RepairPolicy, StewardCandidate,
StorageClass, SubjectPattern,
};
/// Builds a steward candidate for node `name` in `domain` that carries the
/// `Steward` and `RepairWitness` roles and the `Durable` storage class.
fn candidate(name: &str, domain: &str) -> StewardCandidate {
    let node = NodeId::new(name);
    let with_roles = StewardCandidate::new(node, domain)
        .with_role(NodeRole::Steward)
        .with_role(NodeRole::RepairWitness);
    with_roles.with_storage_class(StorageClass::Durable)
}
/// Shared fixture: a cell for the subject pattern `orders.created` at epoch
/// `CellEpoch::new(7, 11)`, built from three candidates (`node-a`/`node-b`/
/// `node-c` on `rack-a`/`rack-b`/`rack-c`), a placement policy asking for
/// three stewards at every temperature, the default repair policy and a warm
/// data capsule retaining 4 message blocks.
fn test_cell() -> SubjectCell {
SubjectCell::new(
&SubjectPattern::parse("orders.created").expect("pattern"),
CellEpoch::new(7, 11),
&[
candidate("node-a", "rack-a"),
candidate("node-b", "rack-b"),
candidate("node-c", "rack-c"),
],
&PlacementPolicy {
cold_stewards: 3,
warm_stewards: 3,
hot_stewards: 3,
..PlacementPolicy::default()
},
RepairPolicy::default(),
DataCapsule {
temperature: CellTemperature::Warm,
retained_message_blocks: 4,
},
)
.expect("cell")
}
/// Shorthand for `ObligationId::new_for_test(index, 0)`.
fn obligation(index: u32) -> ObligationId {
ObligationId::new_for_test(index, 0)
}
/// Evacuating `node-a` to `node-b`: the certificate frontier is canonicalized
/// (`[7, 3, 7]` becomes `[3, 7]`), the proof carries the certificate's
/// frontier digest, `node-b` becomes active sequencer and first steward,
/// `node-a` ends up last in the steward set, and the sequencer lease
/// generation is bumped by one.
#[test]
fn evacuation_carries_obligation_frontier_proof() {
let cell = test_cell();
let certificate = cell
.issue_cut_certificate(
[obligation(7), obligation(3), obligation(7)],
ConsumerStateDigest::new(0xfeed_cafe),
Time::from_secs(9),
NodeId::new("node-a"),
)
.expect("certificate");
let proof = cell
.certify_mobility(
&certificate,
&MobilityOperation::Evacuate {
from: NodeId::new("node-a"),
to: NodeId::new("node-b"),
},
)
.expect("evacuation proof");
assert_eq!(
certificate.obligation_frontier,
vec![obligation(3), obligation(7)]
);
assert!(certificate.covers_obligation(obligation(3)));
assert_eq!(
proof.obligation_frontier_digest,
certificate.obligation_frontier_digest()
);
assert_eq!(
proof.resulting_cell.control_capsule.active_sequencer,
Some(NodeId::new("node-b"))
);
assert_eq!(
proof.resulting_cell.steward_set.first(),
Some(&NodeId::new("node-b"))
);
assert_eq!(
proof.resulting_cell.steward_set.last(),
Some(&NodeId::new("node-a"))
);
assert_eq!(
proof
.resulting_cell
.control_capsule
.sequencer_lease_generation,
cell.control_capsule.sequencer_lease_generation + 1
);
}
/// A handoff from `node-a` to `node-c` switches the active sequencer and bumps
/// `policy_revision` by one, while the steward set and steward pool stay
/// unchanged.
#[test]
fn handoff_uses_explicit_cut_certificate() {
let cell = test_cell();
let certificate = cell
.issue_cut_certificate(
[obligation(10)],
ConsumerStateDigest::new(0x1234),
Time::from_secs(11),
NodeId::new("node-a"),
)
.expect("certificate");
let proof = cell
.certify_mobility(
&certificate,
&MobilityOperation::Handoff {
from: NodeId::new("node-a"),
to: NodeId::new("node-c"),
},
)
.expect("handoff proof");
assert_eq!(
proof.resulting_cell.control_capsule.active_sequencer,
Some(NodeId::new("node-c"))
);
assert_eq!(proof.resulting_cell.steward_set, cell.steward_set);
assert_eq!(
proof.resulting_cell.control_capsule.steward_pool,
cell.control_capsule.steward_pool
);
assert_eq!(
proof.resulting_cell.control_capsule.policy_revision,
cell.control_capsule.policy_revision + 1
);
}
/// A warm restore onto `edge-restore` yields a cell at the restored epoch
/// whose id equals `CellId::for_partition(restored_epoch, ..)` and differs
/// from the original id; the restore target becomes active sequencer and
/// first steward.
#[test]
fn warm_restore_rebinds_epoch_and_cell_id_from_capsule() {
let cell = test_cell();
let restored_epoch = CellEpoch::new(8, 1);
let certificate = cell
.issue_cut_certificate(
[obligation(2)],
ConsumerStateDigest::new(0xface_b00c),
Time::from_secs(13),
NodeId::new("node-a"),
)
.expect("certificate");
let proof = cell
.certify_mobility(
&certificate,
&MobilityOperation::WarmRestore {
target: NodeId::new("edge-restore"),
restored_epoch,
capsule_digest: CapsuleDigest::new(0x9abc),
},
)
.expect("warm restore proof");
assert_eq!(proof.resulting_cell.epoch, restored_epoch);
assert_eq!(
proof.resulting_cell.cell_id,
CellId::for_partition(restored_epoch, &cell.subject_partition)
);
assert_ne!(proof.resulting_cell.cell_id, cell.cell_id);
assert_eq!(
proof.resulting_cell.control_capsule.active_sequencer,
Some(NodeId::new("edge-restore"))
);
assert_eq!(
proof.resulting_cell.steward_set.first(),
Some(&NodeId::new("edge-restore"))
);
}
/// Failing over from `node-a` to `node-c` (certificate signed by `node-b`):
/// `node-c` becomes active sequencer and first steward, `node-a` disappears
/// from both the steward set and the steward pool, and the sequencer lease
/// generation is bumped by one.
#[test]
fn failover_removes_failed_steward_and_promotes_replacement() {
let cell = test_cell();
let certificate = cell
.issue_cut_certificate(
[obligation(1), obligation(4)],
ConsumerStateDigest::new(0x2222),
Time::from_secs(21),
NodeId::new("node-b"),
)
.expect("certificate");
let proof = cell
.certify_mobility(
&certificate,
&MobilityOperation::Failover {
failed: NodeId::new("node-a"),
promote_to: NodeId::new("node-c"),
},
)
.expect("failover proof");
assert_eq!(
proof.resulting_cell.control_capsule.active_sequencer,
Some(NodeId::new("node-c"))
);
assert!(
!proof
.resulting_cell
.steward_set
.contains(&NodeId::new("node-a"))
);
assert!(
!proof
.resulting_cell
.control_capsule
.steward_pool
.contains(&NodeId::new("node-a"))
);
assert_eq!(
proof.resulting_cell.steward_set.first(),
Some(&NodeId::new("node-c"))
);
assert_eq!(
proof
.resulting_cell
.control_capsule
.sequencer_lease_generation,
cell.control_capsule.sequencer_lease_generation + 1
);
}
/// A warm restore certified with a zero consumer-state digest and a zero
/// capsule digest is rejected; the error asserted here is
/// `MissingConsumerStateDigest`.
#[test]
fn warm_restore_rejects_missing_capsule_or_consumer_state() {
let cell = test_cell();
let empty_certificate = cell
.issue_cut_certificate(
[],
ConsumerStateDigest::ZERO,
Time::from_secs(14),
NodeId::new("node-a"),
)
.expect("certificate");
let err = cell
.certify_mobility(
&empty_certificate,
&MobilityOperation::WarmRestore {
target: NodeId::new("edge-restore"),
restored_epoch: CellEpoch::new(9, 1),
capsule_digest: CapsuleDigest::ZERO,
},
)
.expect_err("restore without state must fail");
assert_eq!(err, CutMobilityError::MissingConsumerStateDigest);
}
/// Shared fixture for the rehearsal tests: the test cell, a certificate
/// signed by `node-a` at t = 100 s covering obligations 5 and 10, and an
/// incident snapshot labelled `test-outage-1` whose original operation is an
/// evacuation from `node-a` to `node-b`.
fn make_snapshot() -> (SubjectCell, CutCertificate, IncidentSnapshot) {
let cell = test_cell();
let cert = cell
.issue_cut_certificate(
[obligation(5), obligation(10)],
ConsumerStateDigest::new(0xdead),
Time::from_secs(100),
NodeId::new("node-a"),
)
.expect("certificate");
let snap = IncidentSnapshot::capture(
&cell,
&cert,
MobilityOperation::Evacuate {
from: NodeId::new("node-a"),
to: NodeId::new("node-b"),
},
"test-outage-1",
Time::from_secs(100),
)
.expect("snapshot");
(cell, cert, snap)
}
/// The snapshot keeps the captured cell id, a certificate with the same
/// digest as the one supplied, and the label it was given.
#[test]
fn incident_snapshot_captures_cell_and_certificate() {
let (cell, cert, snap) = make_snapshot();
assert_eq!(snap.cell.cell_id, cell.cell_id);
assert_eq!(
snap.certificate.certificate_digest(),
cert.certificate_digest()
);
assert_eq!(snap.label, "test-outage-1");
}
/// Two snapshots built from identical inputs produce the same digest.
#[test]
fn incident_snapshot_digest_is_deterministic() {
let (_, _, snap1) = make_snapshot();
let (_, _, snap2) = make_snapshot();
assert_eq!(snap1.snapshot_digest(), snap2.snapshot_digest());
}
/// Capturing a snapshot with a hand-built certificate whose epoch
/// (`CellEpoch::new(99, 99)`) does not match the cell fails with
/// `RehearsalError::InvalidCertificate`.
#[test]
fn incident_snapshot_rejects_invalid_certificate() {
let cell = test_cell();
let bad_cert = CutCertificate {
cell_id: cell.cell_id,
epoch: CellEpoch::new(99, 99),
obligation_frontier: vec![],
consumer_state_digest: ConsumerStateDigest::ZERO,
timestamp: Time::from_secs(1),
signer: NodeId::new("node-a"),
};
let err = IncidentSnapshot::capture(
&cell,
&bad_cert,
MobilityOperation::Evacuate {
from: NodeId::new("node-a"),
to: NodeId::new("node-b"),
},
"bad-cert",
Time::from_secs(1),
)
.expect_err("should reject mismatched cert");
assert!(matches!(err, RehearsalError::InvalidCertificate { .. }));
}
/// Replaying the snapshot's original operation yields the same active
/// sequencer and the same mobility digest as certifying that operation
/// directly against the cell and certificate.
#[test]
fn replay_original_produces_same_result_as_direct_certify() {
let (cell, cert, snap) = make_snapshot();
let direct = snap
.original_operation
.certify(&cell, &cert)
.expect("direct");
let replayed = snap.replay_original().expect("replayed");
assert_eq!(
direct.resulting_cell.control_capsule.active_sequencer,
replayed.resulting_cell.control_capsule.active_sequencer,
);
assert_eq!(direct.mobility_digest(), replayed.mobility_digest());
}
/// Forking with the default policy at a new epoch rebases both the forked
/// cell and the forked certificate onto that epoch, gives the fork a cell id
/// different from the snapshot's, and records the snapshot digest.
#[test]
fn fork_rehearsal_rebases_epoch_and_cell_id() {
let (_, _, snap) = make_snapshot();
let rehearsal_epoch = CellEpoch::new(10, 1);
let fork = snap
.fork_rehearsal(RehearsalPolicy::default(), rehearsal_epoch)
.expect("fork");
assert_eq!(fork.forked_cell.epoch, rehearsal_epoch);
assert_ne!(fork.forked_cell.cell_id, snap.cell.cell_id);
assert_eq!(fork.forked_certificate.epoch, rehearsal_epoch);
assert_eq!(fork.forked_certificate.cell_id, fork.forked_cell.cell_id);
assert_eq!(fork.snapshot_digest, snap.snapshot_digest());
}
/// Forking at the snapshot cell's own epoch is rejected with
/// `RehearsalError::StaleRehearsalEpoch`.
#[test]
fn fork_rehearsal_rejects_stale_epoch() {
let (_, _, snap) = make_snapshot();
let stale = snap.cell.epoch;
let err = snap
.fork_rehearsal(RehearsalPolicy::default(), stale)
.expect_err("stale epoch must fail");
assert!(matches!(err, RehearsalError::StaleRehearsalEpoch { .. }));
}
/// A steward override replaces both the forked cell's steward set and its
/// steward pool, and the forked certificate is signed by the first override
/// node (`node-x`).
#[test]
fn fork_with_steward_override_replaces_steward_set() {
let (_, _, snap) = make_snapshot();
let new_stewards = vec![
NodeId::new("node-x"),
NodeId::new("node-y"),
NodeId::new("node-z"),
];
let fork = snap
.fork_rehearsal(
RehearsalPolicy {
steward_override: Some(new_stewards.clone()),
description: "different steward set".to_owned(),
..Default::default()
},
CellEpoch::new(10, 1),
)
.expect("fork");
assert_eq!(fork.forked_cell.steward_set, new_stewards);
assert_eq!(fork.forked_cell.control_capsule.steward_pool, new_stewards);
assert_eq!(fork.forked_certificate.signer, NodeId::new("node-x"));
}
/// A fork whose steward override (`node-x`, `node-y`) no longer contains the
/// original signer can still replay an evacuation between the new stewards.
#[test]
fn fork_with_steward_override_that_excludes_signer_still_replays() {
let (_, _, snap) = make_snapshot();
let new_stewards = vec![NodeId::new("node-x"), NodeId::new("node-y")];
let fork = snap
.fork_rehearsal(
RehearsalPolicy {
steward_override: Some(new_stewards),
description: "entirely new steward set".to_owned(),
..Default::default()
},
CellEpoch::new(10, 1),
)
.expect("fork with replaced signers");
let outcome = fork.replay(&MobilityOperation::Evacuate {
from: NodeId::new("node-x"),
to: NodeId::new("node-y"),
});
assert!(
outcome.succeeded(),
"rehearsal with completely replaced steward set must succeed"
);
}
/// A data-capsule override (hot, 16 retained blocks) is reflected in the
/// forked cell's data capsule.
#[test]
fn fork_with_data_capsule_override() {
let (_, _, snap) = make_snapshot();
let fork = snap
.fork_rehearsal(
RehearsalPolicy {
data_capsule_override: Some(DataCapsule {
temperature: CellTemperature::Hot,
retained_message_blocks: 16,
}),
description: "hot temperature rehearsal".to_owned(),
..Default::default()
},
CellEpoch::new(10, 1),
)
.expect("fork");
assert_eq!(
fork.forked_cell.data_capsule.temperature,
CellTemperature::Hot
);
assert_eq!(fork.forked_cell.data_capsule.retained_message_blocks, 16);
}
/// Replaying the same evacuation (`node-a` to `node-b`) on a default-policy
/// fork succeeds and makes `node-b` the active sequencer.
#[test]
fn rehearsal_replay_succeeds_with_same_operation() {
let (_, _, snap) = make_snapshot();
let rehearsal_epoch = CellEpoch::new(10, 1);
let fork = snap
.fork_rehearsal(RehearsalPolicy::default(), rehearsal_epoch)
.expect("fork");
let outcome = fork.replay(&MobilityOperation::Evacuate {
from: NodeId::new("node-a"),
to: NodeId::new("node-b"),
});
assert!(outcome.succeeded());
let resulting = outcome.resulting_cell().expect("cell");
assert_eq!(
resulting.control_capsule.active_sequencer,
Some(NodeId::new("node-b"))
);
}
/// A fork can replay an operation other than the snapshot's original one:
/// a handoff from `node-a` to `node-c` succeeds and makes `node-c` the active
/// sequencer.
#[test]
fn rehearsal_replay_with_different_operation() {
let (_, _, snap) = make_snapshot();
let rehearsal_epoch = CellEpoch::new(10, 1);
let fork = snap
.fork_rehearsal(RehearsalPolicy::default(), rehearsal_epoch)
.expect("fork");
let outcome = fork.replay(&MobilityOperation::Handoff {
from: NodeId::new("node-a"),
to: NodeId::new("node-c"),
});
assert!(outcome.succeeded());
assert_eq!(
outcome
.resulting_cell()
.expect("cell")
.control_capsule
.active_sequencer,
Some(NodeId::new("node-c"))
);
}
/// Replaying an evacuation to `node-b` on a fork whose steward override
/// omits `node-b` reports an unsuccessful outcome rather than panicking.
#[test]
fn rehearsal_replay_captures_failure() {
let (_, _, snap) = make_snapshot();
let rehearsal_epoch = CellEpoch::new(10, 1);
let fork = snap
.fork_rehearsal(
RehearsalPolicy {
steward_override: Some(vec![NodeId::new("node-a"), NodeId::new("node-c")]),
description: "steward set without node-b".to_owned(),
..Default::default()
},
rehearsal_epoch,
)
.expect("fork");
let outcome = fork.replay(&MobilityOperation::Evacuate {
from: NodeId::new("node-a"),
to: NodeId::new("node-b"),
});
assert!(!outcome.succeeded());
}
/// Comparing the original against a default-policy fork at a new epoch is
/// not equivalent: the divergence is `CellDrift` and lists at least
/// `cell_id` and `epoch`.
#[test]
fn comparison_detects_cell_drift() {
let (_, _, snap) = make_snapshot();
let fork = snap
.fork_rehearsal(RehearsalPolicy::default(), CellEpoch::new(10, 1))
.expect("fork");
let rehearsal_outcome = fork.replay(&snap.original_operation);
let comparison =
RehearsalComparison::compare(&snap, rehearsal_outcome).expect("comparison");
assert!(!comparison.is_equivalent());
match &comparison.divergence {
RehearsalDivergence::CellDrift { differences } => {
assert!(differences.contains(&"cell_id".to_owned()));
assert!(differences.contains(&"epoch".to_owned()));
}
other => panic!("expected CellDrift, got {other:?}"),
}
}
/// When the rehearsal replay fails (evacuation to `node-b`, which the
/// steward override omits), the comparison reports `RehearsalFailed`.
#[test]
fn comparison_detects_rehearsal_failure() {
let (_, _, snap) = make_snapshot();
let fork = snap
.fork_rehearsal(
RehearsalPolicy {
steward_override: Some(vec![NodeId::new("node-a"), NodeId::new("node-c")]),
description: "no node-b".to_owned(),
..Default::default()
},
CellEpoch::new(10, 1),
)
.expect("fork");
let rehearsal_outcome = fork.replay(&MobilityOperation::Evacuate {
from: NodeId::new("node-a"),
to: NodeId::new("node-b"),
});
let comparison =
RehearsalComparison::compare(&snap, rehearsal_outcome).expect("comparison");
assert!(matches!(
comparison.divergence,
RehearsalDivergence::RehearsalFailed { .. }
));
}
/// Two comparisons built from identical forks and replays of the same
/// snapshot produce the same comparison digest.
#[test]
fn comparison_digest_is_deterministic() {
let (_, _, snap) = make_snapshot();
let fork = snap
.fork_rehearsal(RehearsalPolicy::default(), CellEpoch::new(10, 1))
.expect("fork");
let outcome = fork.replay(&snap.original_operation);
let c1 = RehearsalComparison::compare(&snap, outcome).expect("comparison");
let fork2 = snap
.fork_rehearsal(RehearsalPolicy::default(), CellEpoch::new(10, 1))
.expect("fork");
let outcome2 = fork2.replay(&snap.original_operation);
let c2 = RehearsalComparison::compare(&snap, outcome2).expect("comparison");
assert_eq!(c1.comparison_digest(), c2.comparison_digest());
}
/// Forking the same snapshot twice with the same policy and epoch produces
/// the same fork digest.
#[test]
fn fork_digest_is_deterministic() {
let (_, _, snap) = make_snapshot();
let fork1 = snap
.fork_rehearsal(RehearsalPolicy::default(), CellEpoch::new(10, 1))
.expect("fork1");
let fork2 = snap
.fork_rehearsal(RehearsalPolicy::default(), CellEpoch::new(10, 1))
.expect("fork2");
assert_eq!(fork1.fork_digest(), fork2.fork_digest());
}
/// A repair-policy override is applied to the forked cell, and replaying the
/// original operation on that fork still succeeds.
#[test]
fn rehearsal_with_repair_policy_override() {
let (_, _, snap) = make_snapshot();
let fork = snap
.fork_rehearsal(
RehearsalPolicy {
repair_override: Some(RepairPolicy {
recoverability_target: 5,
cold_witnesses: 10,
hot_witnesses: 10,
}),
description: "aggressive repair".to_owned(),
..Default::default()
},
CellEpoch::new(10, 1),
)
.expect("fork");
assert_eq!(fork.forked_cell.repair_policy.recoverability_target, 5);
assert_eq!(fork.forked_cell.repair_policy.cold_witnesses, 10);
let outcome = fork.replay(&snap.original_operation);
assert!(outcome.succeeded());
}
/// Full workflow: capture a failover incident (`node-a` fails, `node-c`
/// promoted), fork it with a steward set that swaps `node-c` for `node-d`,
/// replay a failover promoting `node-d`, and compare. The replay succeeds
/// with `node-d` as active sequencer, and the comparison reports `CellDrift`
/// including `active_sequencer` and `steward_set`.
#[test]
fn end_to_end_rehearsal_workflow() {
let cell = test_cell();
let cert = cell
.issue_cut_certificate(
[obligation(1), obligation(2), obligation(3)],
ConsumerStateDigest::new(0xbeef),
Time::from_secs(200),
NodeId::new("node-b"),
)
.expect("certificate");
let snap = IncidentSnapshot::capture(
&cell,
&cert,
MobilityOperation::Failover {
failed: NodeId::new("node-a"),
promote_to: NodeId::new("node-c"),
},
"failover-incident-2026-03-20",
Time::from_secs(200),
)
.expect("snapshot");
let fork = snap
.fork_rehearsal(
RehearsalPolicy {
steward_override: Some(vec![
NodeId::new("node-a"),
NodeId::new("node-b"),
NodeId::new("node-d"),
]),
description: "failover with node-d instead of node-c".to_owned(),
..Default::default()
},
CellEpoch::new(10, 1),
)
.expect("fork");
let outcome = fork.replay(&MobilityOperation::Failover {
failed: NodeId::new("node-a"),
promote_to: NodeId::new("node-d"),
});
assert!(outcome.succeeded());
assert_eq!(
outcome
.resulting_cell()
.expect("cell")
.control_capsule
.active_sequencer,
Some(NodeId::new("node-d"))
);
let comparison = RehearsalComparison::compare(&snap, outcome).expect("comparison");
assert!(!comparison.is_equivalent());
match &comparison.divergence {
RehearsalDivergence::CellDrift { differences } => {
assert!(differences.contains(&"active_sequencer".to_owned()));
assert!(differences.contains(&"steward_set".to_owned()));
}
other => panic!("expected CellDrift, got {other:?}"),
}
}
/// A candidate with a higher policy gain (0.8 vs 0.1), evidence confidence
/// above the 0.8 threshold and a violation rate below the 0.2 rollback
/// threshold is promoted when the rehearsed cell stays inside the envelope.
#[test]
fn promotion_accepts_rehearsal_that_beats_failed_original_within_envelope() {
let (_, _, snap) = make_snapshot();
let fork = snap
.fork_rehearsal(
RehearsalPolicy {
steward_override: Some(vec![
NodeId::new("node-a"),
NodeId::new("node-b"),
NodeId::new("node-d"),
]),
description: "recover with node-d".to_owned(),
..Default::default()
},
CellEpoch::new(10, 1),
)
.expect("fork");
let outcome = fork.replay(&MobilityOperation::Evacuate {
from: NodeId::new("node-a"),
to: NodeId::new("node-d"),
});
let comparison = RehearsalComparison::compare(&snap, outcome).expect("comparison");
let decision = comparison.evaluate_promotion(
&CounterfactualScore {
policy_gain: 0.1,
evidence_confidence: 0.9,
violation_rate: 0.1,
},
&CounterfactualScore {
policy_gain: 0.8,
evidence_confidence: 0.95,
violation_rate: 0.1,
},
&CounterfactualPromotionEnvelope {
reliability: SafetyEnvelope {
min_stewards: 2,
max_stewards: 4,
min_repair_depth: 1,
max_repair_depth: 4,
evidence_threshold: 0.8,
rollback_violation_threshold: 0.2,
..SafetyEnvelope::default()
},
max_temperature: CellTemperature::Warm,
max_retained_message_blocks: 8,
max_cold_witnesses: 2,
max_hot_witnesses: 4,
},
);
assert!(matches!(
decision,
CounterfactualPromotionDecision::Promote { .. }
));
}
/// A candidate whose evidence confidence (0.4) is below the envelope's
/// evidence threshold (0.8) is rejected with `RejectInsufficientEvidence`
/// carrying both values.
#[test]
fn promotion_rejects_candidate_below_evidence_threshold() {
let (_, _, snap) = make_snapshot();
let fork = snap
.fork_rehearsal(RehearsalPolicy::default(), CellEpoch::new(10, 1))
.expect("fork");
let outcome = fork.replay(&snap.original_operation);
let comparison = RehearsalComparison::compare(&snap, outcome).expect("comparison");
let decision = comparison.evaluate_promotion(
&CounterfactualScore {
policy_gain: 0.2,
evidence_confidence: 0.9,
violation_rate: 0.1,
},
&CounterfactualScore {
policy_gain: 0.7,
evidence_confidence: 0.4,
violation_rate: 0.1,
},
&CounterfactualPromotionEnvelope {
reliability: SafetyEnvelope {
evidence_threshold: 0.8,
..SafetyEnvelope::default()
},
..Default::default()
},
);
assert_eq!(
decision,
CounterfactualPromotionDecision::RejectInsufficientEvidence {
observed: 0.4,
required: 0.8,
}
);
}
/// A fork whose repair policy and data capsule exceed every envelope limit is
/// rejected with `RejectEnvelopeViolation`; the reasons must mention
/// `recoverability_target`, `temperature`, `retained_message_blocks`,
/// `cold_witnesses` and `hot_witnesses`.
#[test]
fn promotion_rejects_candidate_outside_safety_envelope() {
let (_, _, snap) = make_snapshot();
let fork = snap
.fork_rehearsal(
RehearsalPolicy {
repair_override: Some(RepairPolicy {
recoverability_target: 6,
cold_witnesses: 5,
hot_witnesses: 7,
}),
data_capsule_override: Some(DataCapsule {
temperature: CellTemperature::Hot,
retained_message_blocks: 32,
}),
description: "aggressive unsafe branch".to_owned(),
..Default::default()
},
CellEpoch::new(10, 1),
)
.expect("fork");
let outcome = fork.replay(&snap.original_operation);
let comparison = RehearsalComparison::compare(&snap, outcome).expect("comparison");
let decision = comparison.evaluate_promotion(
&CounterfactualScore {
policy_gain: 0.2,
evidence_confidence: 0.9,
violation_rate: 0.1,
},
&CounterfactualScore {
policy_gain: 0.9,
evidence_confidence: 0.95,
violation_rate: 0.1,
},
&CounterfactualPromotionEnvelope {
reliability: SafetyEnvelope {
min_repair_depth: 1,
max_repair_depth: 4,
evidence_threshold: 0.8,
rollback_violation_threshold: 0.2,
..SafetyEnvelope::default()
},
max_temperature: CellTemperature::Warm,
max_retained_message_blocks: 8,
max_cold_witnesses: 2,
max_hot_witnesses: 4,
},
);
match decision {
CounterfactualPromotionDecision::RejectEnvelopeViolation { reasons } => {
assert!(reasons.iter().any(|r| r.contains("recoverability_target")));
assert!(reasons.iter().any(|r| r.contains("temperature")));
assert!(
reasons
.iter()
.any(|r| r.contains("retained_message_blocks"))
);
assert!(reasons.iter().any(|r| r.contains("cold_witnesses")));
assert!(reasons.iter().any(|r| r.contains("hot_witnesses")));
}
other => panic!("expected RejectEnvelopeViolation, got {other:?}"),
}
}
/// A candidate score with a NaN `policy_gain` is rejected with
/// `RejectInvalidScore` naming the field `policy_gain`.
#[test]
fn promotion_rejects_candidate_with_nan_policy_gain() {
let (_, _, snap) = make_snapshot();
let fork = snap
.fork_rehearsal(RehearsalPolicy::default(), CellEpoch::new(10, 1))
.expect("fork");
let outcome = fork.replay(&MobilityOperation::Failover {
failed: NodeId::new("node-a"),
promote_to: NodeId::new("node-b"),
});
let comparison = RehearsalComparison::compare(&snap, outcome).expect("comparison");
let decision = comparison.evaluate_promotion(
&CounterfactualScore {
policy_gain: 0.2,
evidence_confidence: 0.9,
violation_rate: 0.1,
},
&CounterfactualScore {
policy_gain: f64::NAN,
evidence_confidence: 0.95,
violation_rate: 0.05,
},
&CounterfactualPromotionEnvelope::default(),
);
assert!(matches!(
decision,
CounterfactualPromotionDecision::RejectInvalidScore {
field: "policy_gain",
value,
} if value.is_nan()
));
}
/// An envelope whose `evidence_threshold` is NaN is rejected with
/// `RejectInvalidEnvelope` wrapping `InvalidProbability` for that field, even
/// though both scores are otherwise valid.
#[test]
fn promotion_rejects_invalid_envelope_before_threshold_checks() {
let (_, _, snap) = make_snapshot();
let fork = snap
.fork_rehearsal(RehearsalPolicy::default(), CellEpoch::new(10, 1))
.expect("fork");
let outcome = fork.replay(&MobilityOperation::Failover {
failed: NodeId::new("node-a"),
promote_to: NodeId::new("node-b"),
});
let comparison = RehearsalComparison::compare(&snap, outcome).expect("comparison");
let decision = comparison.evaluate_promotion(
&CounterfactualScore {
policy_gain: 0.2,
evidence_confidence: 0.9,
violation_rate: 0.1,
},
&CounterfactualScore {
policy_gain: 0.8,
evidence_confidence: 0.95,
violation_rate: 0.05,
},
&CounterfactualPromotionEnvelope {
reliability: SafetyEnvelope {
evidence_threshold: f64::NAN,
..SafetyEnvelope::default()
},
..Default::default()
},
);
assert!(matches!(
decision,
CounterfactualPromotionDecision::RejectInvalidEnvelope {
error: ReliabilityControlError::InvalidProbability {
field: "evidence_threshold",
value,
},
} if value.is_nan()
));
}
/// A baseline score with a NaN `policy_gain` is rejected with
/// `RejectInvalidScore` naming the field `baseline_policy_gain`.
#[test]
fn promotion_rejects_baseline_with_nan_policy_gain() {
let (_, _, snap) = make_snapshot();
let fork = snap
.fork_rehearsal(RehearsalPolicy::default(), CellEpoch::new(10, 1))
.expect("fork");
let outcome = fork.replay(&MobilityOperation::Failover {
failed: NodeId::new("node-a"),
promote_to: NodeId::new("node-b"),
});
let comparison = RehearsalComparison::compare(&snap, outcome).expect("comparison");
let decision = comparison.evaluate_promotion(
&CounterfactualScore {
policy_gain: f64::NAN,
evidence_confidence: 0.9,
violation_rate: 0.1,
},
&CounterfactualScore {
policy_gain: 0.8,
evidence_confidence: 0.95,
violation_rate: 0.05,
},
&CounterfactualPromotionEnvelope::default(),
);
assert!(matches!(
decision,
CounterfactualPromotionDecision::RejectInvalidScore {
field: "baseline_policy_gain",
value,
} if value.is_nan()
));
}
fn make_index_cert(
cell: &SubjectCell,
signer: &str,
obligations: &[u32],
time_secs: u64,
) -> CutCertificate {
cell.issue_cut_certificate(
obligations.iter().map(|&i| obligation(i)),
ConsumerStateDigest::new(0xaaaa),
Time::from_secs(time_secs),
NodeId::new(signer),
)
.expect("certificate")
}
/// Builds a `PolicyRegime` with revision `rev` and an owned copy of `label`.
fn make_regime(rev: u64, label: &str) -> PolicyRegime {
    let label = String::from(label);
    PolicyRegime {
        policy_revision: rev,
        label,
    }
}
/// Indexing one cut makes the index non-empty, and querying by the cell id
/// returns exactly that entry, identified by the id `index_cut` returned.
#[test]
fn index_cut_and_query_by_cell() {
let cell = test_cell();
let mut idx = CutLatticeIndex::with_defaults();
let cert = make_index_cert(&cell, "node-a", &[1, 2], 100);
let eid = idx.index_cut(
cert,
make_regime(1, "v1"),
CutAccessClass::Operator,
2,
0,
Time::from_secs(100),
);
assert_eq!(idx.len(), 1);
assert!(!idx.is_empty());
let results = idx.query(&CutIndexQuery {
cell_id: Some(cell.cell_id),
..Default::default()
});
assert_eq!(results.len(), 1);
assert_eq!(results[0].entry_id, eid);
}
/// With cuts at 100 s, 200 s and 300 s, a query bounded by `after = 150 s`
/// and `before = 250 s` returns only the 200 s cut.
#[test]
fn query_by_time_range() {
let cell = test_cell();
let mut idx = CutLatticeIndex::with_defaults();
for t in [100, 200, 300] {
let cert = make_index_cert(&cell, "node-a", &[1], t);
idx.index_cut(
cert,
make_regime(1, "v1"),
CutAccessClass::Operator,
1,
0,
Time::from_secs(t),
);
}
let results = idx.query(&CutIndexQuery {
after: Some(Time::from_secs(150)),
before: Some(Time::from_secs(250)),
..Default::default()
});
assert_eq!(results.len(), 1);
assert_eq!(results[0].certificate.timestamp, Time::from_secs(200));
}
/// With cuts indexed under `v1-strict` and `v2-relaxed`, filtering by the
/// label `v1-strict` returns only the matching entry.
#[test]
fn query_by_policy_label() {
let cell = test_cell();
let mut idx = CutLatticeIndex::with_defaults();
let cert1 = make_index_cert(&cell, "node-a", &[1], 100);
idx.index_cut(
cert1,
make_regime(1, "v1-strict"),
CutAccessClass::Operator,
1,
0,
Time::from_secs(100),
);
let cert2 = make_index_cert(&cell, "node-a", &[2], 200);
idx.index_cut(
cert2,
make_regime(2, "v2-relaxed"),
CutAccessClass::Operator,
1,
0,
Time::from_secs(200),
);
let results = idx.query(&CutIndexQuery {
policy_label: Some("v1-strict".to_owned()),
..Default::default()
});
assert_eq!(results.len(), 1);
assert_eq!(results[0].policy_regime.label, "v1-strict");
}
/// With one cut covering obligations 5 and 10 and another covering 20 and 30,
/// filtering by `covers_obligation = 10` returns only the first cut.
#[test]
fn query_with_obligation_filter() {
let cell = test_cell();
let mut idx = CutLatticeIndex::with_defaults();
let cert1 = make_index_cert(&cell, "node-a", &[5, 10], 100);
idx.index_cut(
cert1,
make_regime(1, "v1"),
CutAccessClass::Operator,
2,
0,
Time::from_secs(100),
);
let cert2 = make_index_cert(&cell, "node-a", &[20, 30], 200);
idx.index_cut(
cert2,
make_regime(1, "v1"),
CutAccessClass::Operator,
2,
0,
Time::from_secs(200),
);
let results = idx.query(&CutIndexQuery {
covers_obligation: Some(obligation(10)),
..Default::default()
});
assert_eq!(results.len(), 1);
assert!(results[0].certificate.covers_obligation(obligation(10)));
}
/// With `max_access_class` set to `Operator`, the `AuditOnly` entry is left out
/// of the results.
#[test]
fn query_respects_access_class_filter() {
    let cell = test_cell();
    let mut index = CutLatticeIndex::with_defaults();

    // Operator-class cut.
    let operator_regime = make_regime(1, "v1");
    let operator_cert = make_index_cert(&cell, "node-a", &[1], 100);
    index.index_cut(
        operator_cert,
        operator_regime,
        CutAccessClass::Operator,
        1,
        0,
        Time::from_secs(100),
    );

    // Audit-only cut.
    let audit_regime = make_regime(1, "v1");
    let audit_cert = make_index_cert(&cell, "node-a", &[2], 200);
    index.index_cut(
        audit_cert,
        audit_regime,
        CutAccessClass::AuditOnly,
        1,
        0,
        Time::from_secs(200),
    );

    let capped = CutIndexQuery {
        max_access_class: Some(CutAccessClass::Operator),
        ..Default::default()
    };
    let hits = index.query(&capped);
    assert_eq!(hits.len(), 1);
    assert_eq!(hits[0].access_class, CutAccessClass::Operator);
}
/// A query `limit` of 2 yields exactly two results: the cuts stamped 500s and
/// 400s, in that order.
#[test]
fn query_limit_truncates_results() {
    let cell = test_cell();
    let mut index = CutLatticeIndex::with_defaults();

    for secs in [100, 200, 300, 400, 500] {
        let regime = make_regime(1, "v1");
        let certificate = make_index_cert(&cell, "node-a", &[1], secs);
        index.index_cut(
            certificate,
            regime,
            CutAccessClass::Operator,
            1,
            0,
            Time::from_secs(secs),
        );
    }

    let top_two = CutIndexQuery {
        limit: 2,
        ..Default::default()
    };
    let hits = index.query(&top_two);
    assert_eq!(hits.len(), 2);
    assert_eq!(hits[0].certificate.timestamp, Time::from_secs(500));
    assert_eq!(hits[1].certificate.timestamp, Time::from_secs(400));
}
/// `latest_for_cell` returns the entry with the newest certificate timestamp.
#[test]
fn latest_for_cell_returns_most_recent() {
    let cell = test_cell();
    let mut index = CutLatticeIndex::with_defaults();

    for secs in [100, 200, 300] {
        let regime = make_regime(1, "v1");
        let certificate = make_index_cert(&cell, "node-a", &[1], secs);
        index.index_cut(
            certificate,
            regime,
            CutAccessClass::Operator,
            1,
            0,
            Time::from_secs(secs),
        );
    }

    let newest = index.latest_for_cell(cell.cell_id).expect("latest");
    assert_eq!(newest.certificate.timestamp, Time::from_secs(300));
}
/// `latest_for_policy` finds an entry by its policy label and yields `None`
/// for a label that was never indexed.
#[test]
fn latest_for_policy_returns_most_recent() {
    let cell = test_cell();
    let mut index = CutLatticeIndex::with_defaults();

    let regime = make_regime(1, "canary-policy");
    let certificate = make_index_cert(&cell, "node-a", &[1], 100);
    index.index_cut(
        certificate,
        regime,
        CutAccessClass::Operator,
        1,
        0,
        Time::from_secs(100),
    );

    let found = index.latest_for_policy("canary-policy").expect("found");
    assert_eq!(found.policy_regime.label, "canary-policy");

    // Unknown labels produce no match.
    assert!(index.latest_for_policy("nonexistent").is_none());
}
/// Descendant branches can be attached to an indexed cut; repeats do not
/// duplicate the stored list, and unknown entry ids are rejected.
#[test]
fn register_descendant_branch() {
    let cell = test_cell();
    let mut index = CutLatticeIndex::with_defaults();

    let regime = make_regime(1, "v1");
    let certificate = make_index_cert(&cell, "node-a", &[1], 100);
    let entry_id = index.index_cut(
        certificate,
        regime,
        CutAccessClass::Operator,
        1,
        0,
        Time::from_secs(100),
    );

    // Two distinct branches, then a repeat of the first one; all three calls
    // report success.
    assert!(index.register_descendant(entry_id, 0x1234));
    assert!(index.register_descendant(entry_id, 0x5678));
    assert!(index.register_descendant(entry_id, 0x1234));

    // The repeat did not add a second copy of 0x1234.
    let everything = CutIndexQuery::default();
    let entries = index.query(&everything);
    assert_eq!(entries[0].descendant_branches, vec![0x1234, 0x5678]);

    // Registering against an entry id that was never indexed fails.
    assert!(!index.register_descendant(0xdead, 0x9999));
}
/// With at most two materialized entries per cell, compacting four cuts
/// changes two of them and leaves two materialized.
#[test]
fn compact_dematerializes_old_entries() {
    let cell = test_cell();
    let retention = CutRetentionPolicy {
        max_materialized_per_cell: 2,
        // 500 seconds, expressed in nanoseconds.
        max_age_nanos: 500_000_000_000,
        min_retained_per_cell: 1,
    };
    let mut index = CutLatticeIndex::new(retention);

    for secs in [100, 200, 300, 400] {
        let regime = make_regime(1, "v1");
        let certificate = make_index_cert(&cell, "node-a", &[1], secs);
        index.index_cut(
            certificate,
            regime,
            CutAccessClass::Operator,
            1,
            0,
            Time::from_secs(secs),
        );
    }
    assert_eq!(index.len(), 4);

    let affected = index.compact(Time::from_secs(450));
    assert_eq!(affected, 2);

    let only_materialized = CutIndexQuery {
        materialized_only: true,
        ..Default::default()
    };
    let still_materialized = index.query(&only_materialized);
    assert_eq!(still_materialized.len(), 2);
}
/// Even with a one-nanosecond age limit, compaction leaves two of the three
/// entries un-compacted when `min_retained_per_cell` is 2.
#[test]
fn compact_respects_min_retained() {
    let cell = test_cell();
    let retention = CutRetentionPolicy {
        max_materialized_per_cell: 1,
        max_age_nanos: 1,
        min_retained_per_cell: 2,
    };
    let mut index = CutLatticeIndex::new(retention);

    for secs in [100, 200, 300] {
        let regime = make_regime(1, "v1");
        let certificate = make_index_cert(&cell, "node-a", &[1], secs);
        index.index_cut(
            certificate,
            regime,
            CutAccessClass::Operator,
            1,
            0,
            Time::from_secs(secs),
        );
    }

    let affected = index.compact(Time::from_secs(10_000));
    assert_eq!(affected, 1);

    // Count the entries that were not moved to the `Compacted` state.
    let everything = CutIndexQuery::default();
    let entries = index.query(&everything);
    let surviving = entries
        .iter()
        .filter(|entry| entry.materialization != MaterializationState::Compacted)
        .count();
    assert_eq!(surviving, 2);
}
/// A reconstructible entry can be re-materialized and then shows up in a
/// `materialized_only` query.
///
/// NOTE(review): the assertions only run when compaction actually leaves a
/// `Reconstructible` entry behind; otherwise the test passes without calling
/// `materialize`. Confirm that this retention policy produces one.
#[test]
fn materialize_reconstructible_entry() {
    let cell = test_cell();
    let retention = CutRetentionPolicy {
        max_materialized_per_cell: 1,
        max_age_nanos: u64::MAX,
        min_retained_per_cell: 3,
    };
    let mut index = CutLatticeIndex::new(retention);

    for secs in [100, 200, 300] {
        let regime = make_regime(1, "v1");
        let certificate = make_index_cert(&cell, "node-a", &[1], secs);
        index.index_cut(
            certificate,
            regime,
            CutAccessClass::Operator,
            1,
            0,
            Time::from_secs(secs),
        );
    }
    index.compact(Time::from_secs(400));

    // Copy out the id of the first reconstructible entry (if any) so the
    // query results are released before the index is mutated again.
    let reconstructible_id = index
        .query(&CutIndexQuery::default())
        .iter()
        .find(|entry| entry.materialization == MaterializationState::Reconstructible)
        .map(|entry| entry.entry_id);

    if let Some(entry_id) = reconstructible_id {
        assert!(index.materialize(entry_id));

        let only_materialized = CutIndexQuery {
            materialized_only: true,
            ..Default::default()
        };
        let refreshed = index.query(&only_materialized);
        assert!(refreshed.iter().any(|entry| entry.entry_id == entry_id));
    }
}
/// Indexing two identical cuts into the same index yields the same entry id
/// and leaves a single entry.
#[test]
fn entry_digest_is_deterministic() {
    let cell = test_cell();
    let mut index = CutLatticeIndex::with_defaults();

    // Two certificates built from identical inputs.
    let first_cert = make_index_cert(&cell, "node-a", &[1, 2], 100);
    let second_cert = make_index_cert(&cell, "node-a", &[1, 2], 100);

    let first_id = index.index_cut(
        first_cert,
        make_regime(1, "v1"),
        CutAccessClass::Operator,
        2,
        0,
        Time::from_secs(100),
    );
    let second_id = index.index_cut(
        second_cert,
        make_regime(1, "v1"),
        CutAccessClass::Operator,
        2,
        0,
        Time::from_secs(100),
    );

    assert_eq!(first_id, second_id);
    // The second insert did not create a new entry.
    assert_eq!(index.len(), 1);
}
/// Identical cuts indexed into two separate indexes produce entries with
/// equal digests.
#[test]
fn entry_digest_deterministic_across_indexes() {
    let cell = test_cell();
    let mut left = CutLatticeIndex::with_defaults();
    let mut right = CutLatticeIndex::with_defaults();

    let left_cert = make_index_cert(&cell, "node-a", &[1, 2], 100);
    let right_cert = make_index_cert(&cell, "node-a", &[1, 2], 100);

    left.index_cut(
        left_cert,
        make_regime(1, "v1"),
        CutAccessClass::Operator,
        2,
        0,
        Time::from_secs(100),
    );
    right.index_cut(
        right_cert,
        make_regime(1, "v1"),
        CutAccessClass::Operator,
        2,
        0,
        Time::from_secs(100),
    );

    let left_entries = left.query(&CutIndexQuery::default());
    let right_entries = right.query(&CutIndexQuery::default());
    assert_eq!(
        left_entries[0].entry_digest(),
        right_entries[0].entry_digest()
    );
}
/// A freshly created index is empty and both `latest_*` lookups return `None`.
#[test]
fn empty_index_queries_return_none() {
    let index = CutLatticeIndex::with_defaults();
    assert!(index.is_empty());
    assert_eq!(index.len(), 0);

    // Any cell id will do here; nothing has been indexed.
    let pattern = SubjectPattern::parse("x").expect("pat");
    let some_cell = CellId::for_partition(CellEpoch::new(1, 1), &pattern);
    assert!(index.latest_for_cell(some_cell).is_none());

    assert!(index.latest_for_policy("any").is_none());
}
/// Fixture: returns the test cell, a default index containing one cut for
/// that cell, and the id of that indexed entry.
fn make_index_with_entry() -> (SubjectCell, CutLatticeIndex, u64) {
    let cell = test_cell();
    let mut index = CutLatticeIndex::with_defaults();

    let regime = make_regime(1, "v1");
    let certificate = make_index_cert(&cell, "node-a", &[1, 2], 100);
    let entry_id = index.index_cut(
        certificate,
        regime,
        CutAccessClass::Operator,
        2,
        0,
        Time::from_secs(100),
    );

    (cell, index, entry_id)
}
/// Only `Live` and `Canary` branch types are mutable by default.
#[test]
fn branch_type_default_mutability() {
    // Mutable by default.
    assert!(BranchType::Live.default_mutable());
    assert!(BranchType::Canary.default_mutable());

    // Immutable by default.
    assert!(!BranchType::Lagged.default_mutable());
    assert!(!BranchType::Replayed.default_mutable());
    assert!(!BranchType::Forensic.default_mutable());
}
/// Every branch type except `Live` reports itself as fenced.
#[test]
fn branch_type_fencing() {
    // The live branch is the only unfenced one.
    assert!(!BranchType::Live.is_fenced());

    // All derived branch types are fenced.
    assert!(BranchType::Lagged.is_fenced());
    assert!(BranchType::Replayed.is_fenced());
    assert!(BranchType::Canary.is_fenced());
    assert!(BranchType::Forensic.is_fenced());
}
/// Checks, per access policy, whether writes are allowed and whether they
/// propagate to the live branch.
#[test]
fn access_policy_write_semantics() {
    // ReadOnly: no writes, nothing propagates.
    assert!(!BranchAccessPolicy::ReadOnly.allows_writes());
    assert!(!BranchAccessPolicy::ReadOnly.propagates_to_live());

    // Sandboxed: writes allowed, but they do not propagate.
    assert!(BranchAccessPolicy::Sandboxed.allows_writes());
    assert!(!BranchAccessPolicy::Sandboxed.propagates_to_live());

    // ReadWrite: writes allowed and propagated.
    assert!(BranchAccessPolicy::ReadWrite.allows_writes());
    assert!(BranchAccessPolicy::ReadWrite.propagates_to_live());
}
/// Creating a live branch yields a `Live`, `ReadWrite` branch bound to the
/// cell and registers exactly one branch.
#[test]
fn create_live_branch() {
    let cell = test_cell();
    let mut registry = BranchRegistry::new();

    let live = registry
        .create_live(cell.cell_id, Time::from_secs(1))
        .expect("live");
    assert_eq!(live.branch_type, BranchType::Live);
    assert_eq!(live.access_policy, BranchAccessPolicy::ReadWrite);
    assert_eq!(live.cell_id, cell.cell_id);

    assert_eq!(registry.len(), 1);
}
/// A lagged branch is read-only and records the cut entry it was created from.
#[test]
fn create_lagged_branch() {
    let (cell, index, cut_id) = make_index_with_entry();
    let mut registry = BranchRegistry::new();

    let created_at = Time::from_secs(2);
    let lagged = registry
        .create_lagged(cut_id, cell.cell_id, "trailing live", created_at, &index)
        .expect("lagged");

    assert_eq!(lagged.branch_type, BranchType::Lagged);
    assert_eq!(lagged.access_policy, BranchAccessPolicy::ReadOnly);
    assert_eq!(lagged.cut_entry_id, cut_id);
}
/// A replayed branch is read-only and carries the policy label it was
/// created with.
#[test]
fn create_replayed_branch() {
    let (cell, index, cut_id) = make_index_with_entry();
    let mut registry = BranchRegistry::new();

    let created_at = Time::from_secs(3);
    let replayed = registry
        .create_replayed(
            cut_id,
            cell.cell_id,
            "v2-strict",
            "replay under strict policy",
            created_at,
            &index,
        )
        .expect("replayed");

    assert_eq!(replayed.branch_type, BranchType::Replayed);
    assert_eq!(replayed.policy_label, "v2-strict");
    assert_eq!(replayed.access_policy, BranchAccessPolicy::ReadOnly);
}
/// A canary branch is sandboxed: it accepts writes, but they do not propagate
/// to the live branch.
#[test]
fn create_canary_branch_is_sandboxed() {
    let (cell, index, cut_id) = make_index_with_entry();
    let mut registry = BranchRegistry::new();

    let created_at = Time::from_secs(4);
    let canary = registry
        .create_canary(
            cut_id,
            cell.cell_id,
            "candidate-policy",
            "canary evaluation",
            created_at,
            &index,
        )
        .expect("canary");

    assert_eq!(canary.branch_type, BranchType::Canary);
    assert_eq!(canary.access_policy, BranchAccessPolicy::Sandboxed);
    assert!(canary.access_policy.allows_writes());
    assert!(!canary.access_policy.propagates_to_live());
}
/// A forensic branch is created read-only.
#[test]
fn create_forensic_branch() {
    let (cell, index, cut_id) = make_index_with_entry();
    let mut registry = BranchRegistry::new();

    let created_at = Time::from_secs(5);
    let forensic = registry
        .create_forensic(
            cut_id,
            cell.cell_id,
            "explain ticket-1234",
            created_at,
            &index,
        )
        .expect("forensic");

    assert_eq!(forensic.branch_type, BranchType::Forensic);
    assert_eq!(forensic.access_policy, BranchAccessPolicy::ReadOnly);
}
/// Creating a branch from a cut entry id that is not in the index fails with
/// `BranchError::CutNotFound`.
#[test]
fn create_branch_rejects_missing_cut() {
    let (cell, index, _cut_id) = make_index_with_entry();
    let mut registry = BranchRegistry::new();

    // 0xdead was never indexed.
    let created_at = Time::from_secs(6);
    let failure = registry
        .create_lagged(0xdead, cell.cell_id, "orphan", created_at, &index)
        .expect_err("should reject missing cut");

    assert!(matches!(failure, BranchError::CutNotFound { .. }));
}
/// A second live branch for the same cell is rejected with
/// `BranchError::DuplicateBranch`.
#[test]
fn create_duplicate_branch_rejected() {
    let cell = test_cell();
    let mut registry = BranchRegistry::new();

    registry
        .create_live(cell.cell_id, Time::from_secs(1))
        .expect("first");

    let failure = registry
        .create_live(cell.cell_id, Time::from_secs(2))
        .expect_err("duplicate");
    assert!(matches!(failure, BranchError::DuplicateBranch { .. }));
}
/// `get` finds a branch by its id and returns `None` for an unknown id.
#[test]
fn lookup_by_id() {
    let (cell, index, cut_id) = make_index_with_entry();
    let mut registry = BranchRegistry::new();

    let created_at = Time::from_secs(7);
    let branch_id = registry
        .create_forensic(cut_id, cell.cell_id, "lookup test", created_at, &index)
        .expect("forensic")
        .branch_id;

    assert!(registry.get(branch_id).is_some());
    assert!(registry.get(0xffff).is_none());
}
/// `live_for_cell` is `None` until a live branch is created for the cell.
#[test]
fn live_for_cell() {
    let cell = test_cell();
    let mut registry = BranchRegistry::new();

    // Nothing registered yet.
    assert!(registry.live_for_cell(cell.cell_id).is_none());

    registry
        .create_live(cell.cell_id, Time::from_secs(1))
        .expect("live");
    assert!(registry.live_for_cell(cell.cell_id).is_some());
}
/// `branches_for_cell` reports every branch created for the cell, whatever
/// its type.
#[test]
fn branches_for_cell_lists_all() {
    let (cell, index, cut_id) = make_index_with_entry();
    let mut registry = BranchRegistry::new();

    // One live, one forensic and one canary branch on the same cell.
    registry
        .create_live(cell.cell_id, Time::from_secs(1))
        .expect("live");
    registry
        .create_forensic(cut_id, cell.cell_id, "forensic", Time::from_secs(2), &index)
        .expect("forensic");
    registry
        .create_canary(
            cut_id,
            cell.cell_id,
            "canary-pol",
            "canary",
            Time::from_secs(3),
            &index,
        )
        .expect("canary");

    let listed = registry.branches_for_cell(cell.cell_id);
    assert_eq!(listed.len(), 3);
}
/// `branches_of_type` counts branches per type, including zero for a type
/// that was never created.
#[test]
fn branches_of_type() {
    let (cell, index, cut_id) = make_index_with_entry();
    let mut registry = BranchRegistry::new();

    registry
        .create_live(cell.cell_id, Time::from_secs(1))
        .expect("live");
    registry
        .create_forensic(cut_id, cell.cell_id, "f1", Time::from_secs(2), &index)
        .expect("f1");

    let live_count = registry.branches_of_type(BranchType::Live).len();
    assert_eq!(live_count, 1);
    let forensic_count = registry.branches_of_type(BranchType::Forensic).len();
    assert_eq!(forensic_count, 1);
    let canary_count = registry.branches_of_type(BranchType::Canary).len();
    assert_eq!(canary_count, 0);
}
/// Removing a branch succeeds once and empties the registry; removing it a
/// second time reports failure.
#[test]
fn remove_branch() {
    let cell = test_cell();
    let mut registry = BranchRegistry::new();

    let branch_id = registry
        .create_live(cell.cell_id, Time::from_secs(1))
        .expect("live")
        .branch_id;

    assert!(registry.remove(branch_id));
    assert!(registry.is_empty());

    // The branch is already gone.
    assert!(!registry.remove(branch_id));
}
/// Two canary branches created with the same cut, cell, policy and reason in
/// separate registries share an address digest, even though their creation
/// times differ.
#[test]
fn branch_address_digest_is_deterministic() {
    let (cell, index, cut_id) = make_index_with_entry();
    let mut first_registry = BranchRegistry::new();
    let mut second_registry = BranchRegistry::new();

    let first_digest = first_registry
        .create_canary(cut_id, cell.cell_id, "pol", "test", Time::from_secs(1), &index)
        .expect("b1")
        .address_digest();
    let second_digest = second_registry
        .create_canary(cut_id, cell.cell_id, "pol", "test", Time::from_secs(2), &index)
        .expect("b2")
        .address_digest();

    assert_eq!(first_digest, second_digest);
}
/// A new registry is empty and has length zero.
#[test]
fn empty_registry() {
    let registry = BranchRegistry::new();
    assert_eq!(registry.len(), 0);
    assert!(registry.is_empty());
}
}