use super::control::{ControlBudget, SystemSubjectFamily};
use super::morphism::{
FabricCapability, Morphism, MorphismClass, MorphismEvaluationError, MorphismValidationError,
};
use super::subject::{
NamespaceComponent, NamespaceKernel, NamespaceKernelError, Subject, SubjectPattern,
SubjectPatternError,
};
use crate::distributed::{RegionBridge, RegionSnapshot, SnapshotError};
use crate::remote::NodeId;
use crate::supervision::{RestartConfig, SupervisionStrategy};
use crate::types::Time;
use serde::{Deserialize, Serialize};
use std::collections::{BTreeMap, BTreeSet, VecDeque};
use std::mem;
use std::time::Duration;
use thiserror::Error;
/// Admission limits a leaf-fabric bridge applies to the morphisms it carries.
///
/// `FederationBridge::new` checks every local and remote morphism of a
/// `FederationRole::LeafFabric` bridge against these limits.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct MorphismConstraints {
    /// Morphism classes the leaf bridge accepts; validation rejects an empty set.
    pub allowed_classes: BTreeSet<MorphismClass>,
    /// Largest `quota_policy.max_expansion_factor` a morphism may declare; must be non-zero.
    pub max_expansion_factor: u16,
    /// Largest `quota_policy.max_fanout` a morphism may declare, and the largest fanout a
    /// queued leaf route may request; must be non-zero.
    pub max_fanout: u16,
}
impl Default for MorphismConstraints {
fn default() -> Self {
Self {
allowed_classes: [MorphismClass::DerivedView, MorphismClass::Egress]
.into_iter()
.collect(),
max_expansion_factor: 4,
max_fanout: 8,
}
}
}
impl MorphismConstraints {
    /// Rejects constraint sets that could never admit any morphism.
    ///
    /// Checks, in order: a non-empty class allow-list, a non-zero expansion factor, and a
    /// non-zero fanout. The first failing check is reported.
    fn validate(&self) -> Result<(), FederationError> {
        let failure = if self.allowed_classes.is_empty() {
            Some(FederationError::EmptyAllowedMorphismClasses)
        } else if self.max_expansion_factor == 0 {
            Some(FederationError::ZeroMaxExpansionFactor)
        } else if self.max_fanout == 0 {
            Some(FederationError::ZeroMaxFanout)
        } else {
            None
        };
        match failure {
            Some(error) => Err(error),
            None => Ok(()),
        }
    }

    /// Checks a single morphism against these constraints.
    ///
    /// The class is checked first, then the declared expansion factor, then the declared
    /// fanout. The first violation is returned.
    fn admits(&self, morphism: &Morphism) -> Result<(), FederationError> {
        let class = morphism.class;
        if !self.allowed_classes.contains(&class) {
            return Err(FederationError::LeafMorphismClassNotAllowed { class });
        }

        let quota = &morphism.quota_policy;
        let expansion = quota.max_expansion_factor;
        if expansion > self.max_expansion_factor {
            return Err(FederationError::LeafExpansionFactorExceeded {
                actual: expansion,
                max: self.max_expansion_factor,
            });
        }

        let fanout = quota.max_fanout;
        if fanout > self.max_fanout {
            return Err(FederationError::LeafFanoutExceeded {
                actual: fanout,
                max: self.max_fanout,
            });
        }

        Ok(())
    }
}
/// Configuration for a `FederationRole::LeafFabric` bridge.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct LeafConfig {
    /// Reconnect backoff ceiling. It is validated as non-zero; this section does not otherwise
    /// read it.
    pub max_reconnect_backoff: Duration,
    /// Maximum number of leaf routes buffered while the bridge is not `Active`. When the
    /// buffer is full, the oldest buffered route is evicted and counted as dropped. Must be
    /// non-zero.
    pub offline_buffer_limit: u64,
    /// Limits applied to every morphism the leaf bridge carries.
    pub morphism_constraints: MorphismConstraints,
}
impl Default for LeafConfig {
    /// Uses a 30-second reconnect backoff ceiling, a 1024-route offline buffer, and the
    /// default morphism constraints.
    fn default() -> Self {
        let morphism_constraints = MorphismConstraints::default();
        Self {
            morphism_constraints,
            offline_buffer_limit: 1 << 10,
            max_reconnect_backoff: Duration::from_secs(30),
        }
    }
}
impl LeafConfig {
    /// Validates the leaf configuration.
    ///
    /// Checks, in order: the reconnect backoff is non-zero, the offline buffer limit is
    /// non-zero, and the morphism constraints are valid.
    fn validate(&self) -> Result<(), FederationError> {
        match (
            self.max_reconnect_backoff.is_zero(),
            self.offline_buffer_limit,
        ) {
            (true, _) => Err(FederationError::ZeroDuration {
                field: "role.leaf_fabric.max_reconnect_backoff".to_owned(),
            }),
            (false, 0) => Err(FederationError::ZeroOfflineBufferLimit),
            (false, _) => self.morphism_constraints.validate(),
        }
    }
}
/// How a gateway-fabric bridge propagates subscription interest to its peer.
///
/// The configured policy is copied into every `GatewayInterestPlan`; the code in this section
/// does not otherwise branch on it. Defaults to `DemandDriven`.
#[derive(
    Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash, Default, Serialize, Deserialize,
)]
#[serde(rename_all = "snake_case")]
pub enum InterestPropagationPolicy {
    /// Explicit-subscription propagation (name-based description; the behavior is implemented
    /// outside this section).
    ExplicitSubscriptions,
    /// Prefix-announcement propagation (name-based description; the behavior is implemented
    /// outside this section).
    PrefixAnnouncements,
    /// Demand-driven propagation; this is the default.
    #[default]
    DemandDriven,
}
/// Configuration for a `FederationRole::GatewayFabric` bridge.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct GatewayConfig {
    /// Policy stamped onto every `GatewayInterestPlan`.
    pub interest_propagation_policy: InterestPropagationPolicy,
    /// Upper bound on a morphism's declared `max_fanout` (checked at construction) and on the
    /// amplification an interest plan may request. Must be non-zero.
    pub amplification_limit: u16,
    /// If the elapsed time reported to `reconcile_gateway_convergence` exceeds this value,
    /// the bridge is marked degraded. Must be non-zero.
    pub convergence_timeout: Duration,
}
impl Default for GatewayConfig {
    /// Uses the default propagation policy, an amplification limit of 16, and a 15-second
    /// convergence timeout.
    fn default() -> Self {
        let interest_propagation_policy = InterestPropagationPolicy::default();
        Self {
            convergence_timeout: Duration::from_secs(15),
            amplification_limit: 16,
            interest_propagation_policy,
        }
    }
}
impl GatewayConfig {
    /// Validates the gateway configuration.
    ///
    /// A zero amplification limit is reported before a zero convergence timeout.
    fn validate(&self) -> Result<(), FederationError> {
        let Self {
            amplification_limit,
            convergence_timeout,
            ..
        } = self;
        if *amplification_limit == 0 {
            Err(FederationError::ZeroAmplificationLimit)
        } else if convergence_timeout.is_zero() {
            Err(FederationError::ZeroDuration {
                field: "role.gateway_fabric.convergence_timeout".to_owned(),
            })
        } else {
            Ok(())
        }
    }
}
/// Ordering guarantee a replication link advertises.
///
/// It is stamped onto every `ReplicationTransfer` the link exports. Defaults to `PerSubject`.
/// The variant descriptions below come from the names; enforcement happens outside this
/// section.
#[derive(
    Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash, Default, Serialize, Deserialize,
)]
#[serde(rename_all = "snake_case")]
pub enum OrderingGuarantee {
    /// Per-subject ordering; this is the default.
    #[default]
    PerSubject,
    /// Snapshot-consistent ordering.
    SnapshotConsistent,
    /// Checkpoint-bounded ordering.
    CheckpointBounded,
}
/// Strategy `FederationBridge::plan_replication_catch_up` uses when the remote lags behind.
///
/// When the lag is zero, the plan is always `AlreadyConverged`, whatever the policy.
#[derive(
    Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash, Default, Serialize, Deserialize,
)]
#[serde(rename_all = "snake_case")]
pub enum CatchUpPolicy {
    /// Any lag is resolved with a full snapshot.
    SnapshotRequired,
    /// Snapshot plus delta, except that a one-step lag on a remote with a non-zero sequence
    /// uses delta only. This is the default.
    #[default]
    SnapshotThenDelta,
    /// Any lag is resolved with delta only.
    LogOnly,
}
/// Configuration for a `FederationRole::ReplicationLink` bridge.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct ReplicationConfig {
    /// Guarantee recorded on each exported transfer.
    pub ordering_guarantee: OrderingGuarantee,
    /// Snapshot cadence. It is validated as non-zero; this section does not otherwise read it.
    pub snapshot_interval: Duration,
    /// Strategy used when planning catch-up for a lagging remote.
    pub catch_up_policy: CatchUpPolicy,
}
impl Default for ReplicationConfig {
    /// Uses the default ordering guarantee and catch-up policy with a one-minute snapshot
    /// interval.
    fn default() -> Self {
        let snapshot_interval = Duration::from_secs(60);
        Self {
            catch_up_policy: CatchUpPolicy::default(),
            snapshot_interval,
            ordering_guarantee: OrderingGuarantee::default(),
        }
    }
}
impl ReplicationConfig {
    /// Validates the replication configuration; only a zero snapshot interval is rejected.
    fn validate(&self) -> Result<(), FederationError> {
        if !self.snapshot_interval.is_zero() {
            return Ok(());
        }
        Err(FederationError::ZeroDuration {
            field: "role.replication_link.snapshot_interval".to_owned(),
        })
    }
}
/// How long an edge-replay bridge keeps captured replay artifacts.
///
/// Serialized internally tagged under `"kind"`. Whatever the policy, the retained set is also
/// capped by `EdgeReplayConfig::reconnection_replay_depth`.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
#[serde(tag = "kind", rename_all = "snake_case")]
pub enum TraceRetention {
    /// Keep at most `max_artifacts` of the most recently retained artifacts (must be non-zero).
    LatestArtifacts {
        max_artifacts: u32,
    },
    /// Time-based window: artifacts whose `captured_at` lies more than `retention` before the
    /// window anchor are discarded whenever a new artifact is retained (must be non-zero).
    DurationWindow {
        retention: Duration,
    },
    /// Artifacts are removed only by acknowledgement, or by the replay-depth cap.
    UntilAcknowledged,
}
impl Default for TraceRetention {
    /// Keeps the latest 128 artifacts.
    fn default() -> Self {
        let max_artifacts = 128;
        Self::LatestArtifacts { max_artifacts }
    }
}
impl TraceRetention {
    /// Rejects retention policies that would discard every artifact immediately: a zero
    /// artifact limit or a zero-length window.
    fn validate(&self) -> Result<(), FederationError> {
        match *self {
            Self::LatestArtifacts { max_artifacts: 0 } => {
                Err(FederationError::ZeroTraceArtifactLimit)
            }
            Self::DurationWindow { retention } if retention.is_zero() => {
                Err(FederationError::ZeroDuration {
                    field: "role.edge_replay_link.trace_retention.retention".to_owned(),
                })
            }
            Self::LatestArtifacts { .. }
            | Self::DurationWindow { .. }
            | Self::UntilAcknowledged => Ok(()),
        }
    }
}
/// Which retained artifacts `FederationBridge::plan_replay_shipping` selects.
#[derive(
    Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash, Default, Serialize, Deserialize,
)]
#[serde(rename_all = "snake_case")]
pub enum EvidenceShippingPolicy {
    /// All retained artifacts when the bridge is `Active`, otherwise none. This is the default.
    #[default]
    OnReconnect,
    /// The most recently retained artifacts, up to a batch of 32.
    PeriodicBatch,
    /// Only the most recently retained artifact.
    ContinuousMirror,
}
/// Configuration for a `FederationRole::EdgeReplayLink` bridge.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct EdgeReplayConfig {
    /// Retention policy applied each time an artifact is retained.
    pub trace_retention: TraceRetention,
    /// Selection policy used when planning artifact shipping.
    pub evidence_shipping_policy: EvidenceShippingPolicy,
    /// Hard cap on retained artifacts, applied after the retention policy; the oldest entries
    /// are dropped first. Must be non-zero.
    pub reconnection_replay_depth: u32,
}
impl Default for EdgeReplayConfig {
    /// Uses the default retention and shipping policies with a replay depth of 256.
    fn default() -> Self {
        let trace_retention = TraceRetention::default();
        let evidence_shipping_policy = EvidenceShippingPolicy::default();
        Self {
            reconnection_replay_depth: 256,
            evidence_shipping_policy,
            trace_retention,
        }
    }
}
impl EdgeReplayConfig {
    /// Validates the retention policy first, then rejects a zero replay depth.
    fn validate(&self) -> Result<(), FederationError> {
        self.trace_retention.validate().and_then(|()| {
            if self.reconnection_replay_depth == 0 {
                Err(FederationError::ZeroReplayDepth)
            } else {
                Ok(())
            }
        })
    }
}
/// The role a federation bridge plays, together with its role-specific configuration.
///
/// Serialized adjacently tagged: `{"kind": "<snake_case variant>", "config": { ... }}`.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
#[serde(tag = "kind", content = "config", rename_all = "snake_case")]
pub enum FederationRole {
    /// Leaf fabric: buffers routes while disconnected and constrains morphisms.
    LeafFabric(LeafConfig),
    /// Gateway fabric: plans interest propagation and forwards advisories.
    GatewayFabric(GatewayConfig),
    /// Replication link: exports and applies region snapshots.
    ReplicationLink(ReplicationConfig),
    /// Edge replay link: retains and ships replay artifacts.
    EdgeReplayLink(EdgeReplayConfig),
}
impl FederationRole {
    /// Stable snake_case name of this role, as used in error reports.
    #[must_use]
    pub const fn name(&self) -> &'static str {
        match *self {
            Self::LeafFabric(..) => "leaf_fabric",
            Self::GatewayFabric(..) => "gateway_fabric",
            Self::ReplicationLink(..) => "replication_link",
            Self::EdgeReplayLink(..) => "edge_replay_link",
        }
    }

    /// Validates the role-specific configuration.
    ///
    /// # Errors
    ///
    /// Returns the first configuration error reported by the wrapped config.
    pub fn validate(&self) -> Result<(), FederationError> {
        match self {
            Self::LeafFabric(leaf) => leaf.validate(),
            Self::GatewayFabric(gateway) => gateway.validate(),
            Self::ReplicationLink(replication) => replication.validate(),
            Self::EdgeReplayLink(edge_replay) => edge_replay.validate(),
        }
    }
}
/// Lifecycle state of a `FederationBridge`.
#[derive(
    Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash, Default, Serialize, Deserialize,
)]
#[serde(rename_all = "snake_case")]
pub enum FederationBridgeState {
    /// Initial state assigned by `FederationBridge::new`.
    #[default]
    Provisioning,
    /// Set by `activate`. Leaf routes are forwarded, rather than buffered, only in this state.
    Active,
    /// Set by `mark_degraded`, including on a gateway convergence timeout. Gateway operations
    /// still run in this state.
    Degraded,
    /// Set by `close`. `activate` and `mark_degraded` refuse to leave this state.
    Closed,
}
/// Direction of a leaf route relative to the local fabric.
#[derive(
    Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash, Default, Serialize, Deserialize,
)]
#[serde(rename_all = "snake_case")]
pub enum FederationDirection {
    /// From the local fabric towards the remote peer; this is the default.
    #[default]
    LocalToRemote,
    /// From the remote peer towards the local fabric.
    RemoteToLocal,
}
/// A leaf route submitted through `FederationBridge::queue_leaf_route`.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct BufferedLeafRoute {
    /// Direction the route flows.
    pub direction: FederationDirection,
    /// Subject pattern the route covers.
    pub subject: SubjectPattern,
    /// Requested fanout, bounded by the leaf's `max_fanout` constraint.
    pub fanout: u16,
}
/// Outcome of queueing a leaf route. Serialized internally tagged under `"state"`.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
#[serde(tag = "state", rename_all = "snake_case")]
pub enum LeafRouteDisposition {
    /// The bridge was `Active`, so the route was passed through without buffering.
    Forwarded {
        route: BufferedLeafRoute,
    },
    /// The bridge was not `Active`, so the route was buffered. `buffered_entries` is the
    /// buffer length after insertion; `dropped_entries` is the running count of evicted routes.
    Buffered {
        route: BufferedLeafRoute,
        buffered_entries: usize,
        dropped_entries: u64,
    },
}
/// Result of draining a leaf bridge's offline buffer.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct LeafBufferDrain {
    /// Buffered routes in the order they were queued (oldest first).
    pub routes: Vec<BufferedLeafRoute>,
    /// Number of routes evicted since the previous drain.
    pub dropped_entries: u64,
}
/// An admitted interest-propagation request produced by `plan_gateway_interest`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct GatewayInterestPlan {
    /// System subject family the interest belongs to.
    pub family: SystemSubjectFamily,
    /// Subject pattern being propagated.
    pub pattern: SubjectPattern,
    /// Amplification the caller asked for.
    pub requested_amplification: u16,
    /// Amplification granted; currently always equal to the request once admitted.
    pub admitted_amplification: u16,
    /// Control budget supplied with the request. Its `poll_quota` also caps admission.
    pub budget: ControlBudget,
    /// Propagation policy copied from the gateway configuration.
    pub policy: InterestPropagationPolicy,
}
/// Interest recorded in the gateway runtime.
///
/// It derives `Ord` so it can be deduplicated in a `BTreeSet`.
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord)]
pub struct GatewayInterestRecord {
    /// System subject family of the interest.
    pub family: SystemSubjectFamily,
    /// Subject pattern of the interest.
    pub pattern: SubjectPattern,
}
/// An advisory forwarded by a gateway bridge and appended to its runtime log.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct GatewayAdvisoryRecord {
    /// System subject family of the advisory.
    pub family: SystemSubjectFamily,
    /// Subject pattern of the advisory.
    pub pattern: SubjectPattern,
    /// Control budget attached to the advisory.
    pub budget: ControlBudget,
}
/// Snapshot taken by `reconcile_gateway_convergence`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct GatewayConvergenceRecord {
    /// Elapsed time reported by the caller.
    pub elapsed: Duration,
    /// Whether `elapsed` exceeded the configured convergence timeout (the bridge was degraded).
    pub timed_out: bool,
    /// Number of distinct interests recorded so far.
    pub propagated_interest_count: usize,
    /// Number of advisories forwarded so far.
    pub forwarded_advisory_count: usize,
}
/// A serialized region snapshot exported over a replication link.
///
/// `apply_replication_transfer` decodes `snapshot_bytes` and re-checks both `sequence` and
/// `snapshot_hash` before applying.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct ReplicationTransfer {
    /// Sequence number of the exported snapshot.
    pub sequence: u64,
    /// Ordering guarantee configured on the exporting link.
    pub ordering_guarantee: OrderingGuarantee,
    /// `content_hash()` of the snapshot at export time.
    pub snapshot_hash: u64,
    /// `to_bytes()` encoding of the snapshot.
    pub snapshot_bytes: Vec<u8>,
}
/// Action chosen by `plan_replication_catch_up`.
#[derive(
    Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash, Default, Serialize, Deserialize,
)]
#[serde(rename_all = "snake_case")]
pub enum ReplicationCatchUpAction {
    /// No lag; nothing to do. This is the default.
    #[default]
    AlreadyConverged,
    /// Send a full snapshot.
    Snapshot,
    /// Send a snapshot followed by a delta.
    SnapshotThenDelta,
    /// Send only a delta.
    DeltaOnly,
}
/// Catch-up decision for a remote replica, as recorded in the replication runtime.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct ReplicationCatchUpPlan {
    /// Policy that produced the decision.
    pub policy: CatchUpPolicy,
    /// Chosen action.
    pub action: ReplicationCatchUpAction,
    /// Local sequence at planning time.
    pub local_sequence: u64,
    /// Remote sequence at planning time (never greater than `local_sequence`).
    pub remote_sequence: u64,
    /// `local_sequence - remote_sequence`.
    pub lag: u64,
}
/// A replay artifact retained by an edge-replay bridge.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ReplayArtifactRecord {
    /// Identifier matched by `acknowledge_replay_artifact`.
    pub artifact_id: String,
    /// System subject family the artifact belongs to.
    pub family: SystemSubjectFamily,
    /// Capture time used by duration-window retention.
    pub captured_at: Duration,
    /// Caller-supplied sequence number (not read in this section).
    pub sequence: u64,
    /// Presumably the artifact payload size in bytes; not read in this section. TODO confirm.
    pub bytes: usize,
}
/// Artifacts selected for shipping by `plan_replay_shipping`.
///
/// The artifacts are cloned; they stay retained on the bridge.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ReplayShippingPlan {
    /// Policy that selected the artifacts.
    pub policy: EvidenceShippingPolicy,
    /// Selected artifacts, oldest first.
    pub artifacts: Vec<ReplayArtifactRecord>,
}
/// Mutable per-role state held by a `FederationBridge`.
///
/// There is one variant per `FederationRole` variant.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum FederationBridgeRuntime {
    /// State for a leaf-fabric bridge.
    Leaf(LeafBridgeRuntime),
    /// State for a gateway-fabric bridge.
    Gateway(GatewayBridgeRuntime),
    /// State for a replication link.
    Replication(ReplicationBridgeRuntime),
    /// State for an edge-replay link.
    EdgeReplay(EdgeReplayBridgeRuntime),
}
impl FederationBridgeRuntime {
fn for_role(role: &FederationRole) -> Self {
match role {
FederationRole::LeafFabric(_) => Self::Leaf(LeafBridgeRuntime::default()),
FederationRole::GatewayFabric(_) => Self::Gateway(GatewayBridgeRuntime::default()),
FederationRole::ReplicationLink(_) => {
Self::Replication(ReplicationBridgeRuntime::default())
}
FederationRole::EdgeReplayLink(_) => {
Self::EdgeReplay(EdgeReplayBridgeRuntime::default())
}
}
}
}
/// Offline route buffer for a leaf-fabric bridge.
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub struct LeafBridgeRuntime {
    /// FIFO of routes queued while the bridge was not `Active`; evictions pop from the front.
    pub buffered_routes: VecDeque<BufferedLeafRoute>,
    /// Routes evicted since the last drain; reset to zero by `drain_leaf_buffer`.
    pub dropped_routes: u64,
}
/// Interest and advisory bookkeeping for a gateway-fabric bridge.
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub struct GatewayBridgeRuntime {
    /// Distinct interests admitted by `plan_gateway_interest`.
    pub propagated_interests: BTreeSet<GatewayInterestRecord>,
    /// Append-only log of advisories forwarded by `forward_gateway_advisory`.
    pub forwarded_advisories: Vec<GatewayAdvisoryRecord>,
    /// Most recent convergence reconciliation, if any.
    pub last_convergence: Option<GatewayConvergenceRecord>,
}
/// Sequence bookkeeping for a replication link.
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub struct ReplicationBridgeRuntime {
    /// Sequence of the last snapshot exported by `export_replication_transfer`.
    pub last_exported_sequence: Option<u64>,
    /// Sequence of the last snapshot applied by `apply_replication_transfer`.
    pub last_applied_sequence: Option<u64>,
    /// Most recent catch-up plan.
    pub last_catch_up: Option<ReplicationCatchUpPlan>,
}
/// Retained artifacts and shipping counters for an edge-replay link.
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub struct EdgeReplayBridgeRuntime {
    /// Artifacts in retention order (oldest first); trimmed on every retain.
    pub retained_artifacts: Vec<ReplayArtifactRecord>,
    /// Number of shipping plans that selected at least one artifact.
    pub shipped_batches: u64,
}
/// A validated bridge between the local fabric and a federated peer.
///
/// The declarative parts (role, morphisms, capability scope, state) are serializable. The
/// role-specific runtime is not.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct FederationBridge {
    /// Role and role-specific configuration.
    pub role: FederationRole,
    /// Morphisms applied on the local side.
    pub local_morphisms: Vec<Morphism>,
    /// Morphisms applied on the remote side.
    pub remote_morphisms: Vec<Morphism>,
    /// Capabilities the bridge may exercise; every morphism's requirements must be a subset.
    pub capability_scope: BTreeSet<FabricCapability>,
    /// Current lifecycle state.
    pub state: FederationBridgeState,
    // Skipped by serde, so a deserialized bridge holds `None` here. The accessors rebuild an
    // empty runtime for the current role on demand.
    #[serde(skip, default)]
    runtime: Option<FederationBridgeRuntime>,
}
impl PartialEq for FederationBridge {
    /// Compares only the declarative parts of two bridges.
    ///
    /// The `#[serde(skip)]` runtime state is deliberately excluded from equality.
    fn eq(&self, other: &Self) -> bool {
        let lhs = (
            &self.role,
            &self.local_morphisms,
            &self.remote_morphisms,
            &self.capability_scope,
            &self.state,
        );
        let rhs = (
            &other.role,
            &other.local_morphisms,
            &other.remote_morphisms,
            &other.capability_scope,
            &other.state,
        );
        lhs == rhs
    }
}
impl FederationBridge {
    /// Largest number of artifacts a single `PeriodicBatch` shipping plan selects.
    const PERIODIC_REPLAY_BATCH_LIMIT: usize = 32;

    /// Creates a bridge for `role` after validating its configuration, morphisms, and
    /// capability scope.
    ///
    /// The bridge starts in [`FederationBridgeState::Provisioning`] with an empty runtime for
    /// its role.
    ///
    /// # Errors
    ///
    /// Returns an error if any of the following holds:
    /// - the role configuration is invalid;
    /// - `capability_scope` is empty;
    /// - both morphism lists are empty;
    /// - a morphism fails its own validation or requires a capability outside the scope;
    /// - a role-specific admission rule rejects the bridge: the leaf morphism constraints, the
    ///   gateway amplification limit, or the edge-replay requirement for
    ///   `FabricCapability::ObserveEvidence`.
    pub fn new<I>(
        role: FederationRole,
        local_morphisms: Vec<Morphism>,
        remote_morphisms: Vec<Morphism>,
        capability_scope: I,
    ) -> Result<Self, FederationError>
    where
        I: IntoIterator<Item = FabricCapability>,
    {
        role.validate()?;
        let capability_scope = capability_scope.into_iter().collect::<BTreeSet<_>>();
        if capability_scope.is_empty() {
            return Err(FederationError::EmptyCapabilityScope);
        }
        if local_morphisms.is_empty() && remote_morphisms.is_empty() {
            return Err(FederationError::EmptyMorphismSet);
        }
        for morphism in local_morphisms.iter().chain(remote_morphisms.iter()) {
            morphism.validate()?;
            ensure_capability_scope(&capability_scope, morphism)?;
        }
        match &role {
            FederationRole::LeafFabric(config) => {
                for morphism in local_morphisms.iter().chain(remote_morphisms.iter()) {
                    config.morphism_constraints.admits(morphism)?;
                }
            }
            FederationRole::GatewayFabric(config) => {
                for morphism in local_morphisms.iter().chain(remote_morphisms.iter()) {
                    if morphism.quota_policy.max_fanout > config.amplification_limit {
                        return Err(FederationError::GatewayAmplificationExceeded {
                            actual: morphism.quota_policy.max_fanout,
                            max: config.amplification_limit,
                        });
                    }
                }
            }
            FederationRole::ReplicationLink(_) => {}
            FederationRole::EdgeReplayLink(_) => {
                if !capability_scope.contains(&FabricCapability::ObserveEvidence) {
                    return Err(FederationError::EdgeReplayRequiresObserveEvidence);
                }
            }
        }
        // Build runtime state only after every admission check has passed.
        let runtime = Some(FederationBridgeRuntime::for_role(&role));
        Ok(Self {
            role,
            local_morphisms,
            remote_morphisms,
            capability_scope,
            state: FederationBridgeState::Provisioning,
            runtime,
        })
    }

    /// Moves the bridge to [`FederationBridgeState::Active`].
    ///
    /// # Errors
    ///
    /// Returns `CannotActivateClosedBridge` if the bridge is closed.
    pub fn activate(&mut self) -> Result<(), FederationError> {
        if self.state == FederationBridgeState::Closed {
            return Err(FederationError::CannotActivateClosedBridge);
        }
        self.state = FederationBridgeState::Active;
        Ok(())
    }

    /// Moves the bridge to [`FederationBridgeState::Degraded`].
    ///
    /// # Errors
    ///
    /// Returns `CannotDegradeClosedBridge` if the bridge is closed.
    pub fn mark_degraded(&mut self) -> Result<(), FederationError> {
        if self.state == FederationBridgeState::Closed {
            return Err(FederationError::CannotDegradeClosedBridge);
        }
        self.state = FederationBridgeState::Degraded;
        Ok(())
    }

    /// Closes the bridge. Closed is terminal: activation and degradation are refused afterwards.
    pub fn close(&mut self) {
        self.state = FederationBridgeState::Closed;
    }

    /// Returns a copy of the role runtime.
    ///
    /// If the runtime is absent (for example after deserialization), an empty runtime for the
    /// current role is returned instead.
    #[must_use]
    pub fn runtime(&self) -> FederationBridgeRuntime {
        self.runtime
            .clone()
            .unwrap_or_else(|| FederationBridgeRuntime::for_role(&self.role))
    }

    /// Submits a leaf route: the route is forwarded when the bridge is `Active` and buffered
    /// otherwise.
    ///
    /// While buffering, the oldest routes are evicted (and counted as dropped) so the buffer
    /// never holds more than `offline_buffer_limit` entries.
    ///
    /// # Errors
    ///
    /// Returns `RoleOperationMismatch` for non-leaf bridges, `BridgeNotOperational` if the
    /// bridge is closed, and `LeafFanoutExceeded` if `fanout` is above the leaf's `max_fanout`.
    pub fn queue_leaf_route(
        &mut self,
        direction: FederationDirection,
        subject: SubjectPattern,
        fanout: u16,
    ) -> Result<LeafRouteDisposition, FederationError> {
        let config = self.leaf_config("queue_leaf_route")?.clone();
        self.ensure_not_closed("queue_leaf_route")?;
        if fanout > config.morphism_constraints.max_fanout {
            return Err(FederationError::LeafFanoutExceeded {
                actual: fanout,
                max: config.morphism_constraints.max_fanout,
            });
        }
        let route = BufferedLeafRoute {
            direction,
            subject,
            fanout,
        };
        if self.state == FederationBridgeState::Active {
            return Ok(LeafRouteDisposition::Forwarded { route });
        }
        let buffer_limit = usize::try_from(config.offline_buffer_limit).unwrap_or(usize::MAX);
        let runtime = self.leaf_runtime_mut("queue_leaf_route")?;
        // Evict until there is room. A loop with `>=`, rather than a single `==` check, keeps
        // the buffer bounded even when `role` was mutated to a smaller limit after routes were
        // buffered. Stopping once `pop_front` yields nothing keeps a zero limit from looping
        // or miscounting drops.
        while runtime.buffered_routes.len() >= buffer_limit
            && runtime.buffered_routes.pop_front().is_some()
        {
            runtime.dropped_routes = runtime.dropped_routes.saturating_add(1);
        }
        runtime.buffered_routes.push_back(route.clone());
        Ok(LeafRouteDisposition::Buffered {
            route,
            buffered_entries: runtime.buffered_routes.len(),
            dropped_entries: runtime.dropped_routes,
        })
    }

    /// Empties the offline buffer and resets the dropped-route counter.
    ///
    /// # Errors
    ///
    /// Returns `RoleOperationMismatch` for non-leaf bridges and `BridgeNotOperational` unless
    /// the bridge is `Active`.
    pub fn drain_leaf_buffer(&mut self) -> Result<LeafBufferDrain, FederationError> {
        self.leaf_config("drain_leaf_buffer")?;
        self.ensure_active_state("drain_leaf_buffer")?;
        let runtime = self.leaf_runtime_mut("drain_leaf_buffer")?;
        let routes = mem::take(&mut runtime.buffered_routes)
            .into_iter()
            .collect();
        let dropped_entries = mem::take(&mut runtime.dropped_routes);
        Ok(LeafBufferDrain {
            routes,
            dropped_entries,
        })
    }

    /// Admits an interest-propagation request and records the interest in the gateway runtime.
    ///
    /// The admissible amplification is the smaller of the configured `amplification_limit` and
    /// `budget.poll_quota`, saturated to `u16`.
    ///
    /// # Errors
    ///
    /// Returns `RoleOperationMismatch` for non-gateway bridges, `BridgeNotOperational` unless
    /// the bridge is `Active` or `Degraded`, and `GatewayAmplificationExceeded` when the request
    /// is above the effective limit.
    pub fn plan_gateway_interest(
        &mut self,
        family: SystemSubjectFamily,
        pattern: SubjectPattern,
        requested_amplification: u16,
        budget: ControlBudget,
    ) -> Result<GatewayInterestPlan, FederationError> {
        let config = self.gateway_config("plan_gateway_interest")?.clone();
        self.ensure_operational_state("plan_gateway_interest")?;
        let budget_limit = u16::try_from(budget.poll_quota).unwrap_or(u16::MAX);
        let effective_limit = config.amplification_limit.min(budget_limit);
        if requested_amplification > effective_limit {
            return Err(FederationError::GatewayAmplificationExceeded {
                actual: requested_amplification,
                max: effective_limit,
            });
        }
        let plan = GatewayInterestPlan {
            family,
            pattern: pattern.clone(),
            requested_amplification,
            admitted_amplification: requested_amplification,
            budget,
            policy: config.interest_propagation_policy,
        };
        self.gateway_runtime_mut("plan_gateway_interest")?
            .propagated_interests
            .insert(GatewayInterestRecord { family, pattern });
        Ok(plan)
    }

    /// Records a forwarded advisory in the gateway runtime and returns it.
    ///
    /// # Errors
    ///
    /// Returns `RoleOperationMismatch` for non-gateway bridges and `BridgeNotOperational` unless
    /// the bridge is `Active` or `Degraded`.
    pub fn forward_gateway_advisory(
        &mut self,
        family: SystemSubjectFamily,
        pattern: SubjectPattern,
        budget: ControlBudget,
    ) -> Result<GatewayAdvisoryRecord, FederationError> {
        self.gateway_config("forward_gateway_advisory")?;
        self.ensure_operational_state("forward_gateway_advisory")?;
        let record = GatewayAdvisoryRecord {
            family,
            pattern,
            budget,
        };
        self.gateway_runtime_mut("forward_gateway_advisory")?
            .forwarded_advisories
            .push(record.clone());
        Ok(record)
    }

    /// Checks gateway convergence against the configured timeout.
    ///
    /// If `elapsed` exceeds the timeout, the bridge is marked degraded. The resulting record is
    /// stored as the runtime's last convergence.
    ///
    /// # Errors
    ///
    /// Returns `RoleOperationMismatch` for non-gateway bridges and `BridgeNotOperational` unless
    /// the bridge is `Active` or `Degraded`.
    pub fn reconcile_gateway_convergence(
        &mut self,
        elapsed: Duration,
    ) -> Result<GatewayConvergenceRecord, FederationError> {
        let config = self
            .gateway_config("reconcile_gateway_convergence")?
            .clone();
        self.ensure_operational_state("reconcile_gateway_convergence")?;
        let timed_out = elapsed > config.convergence_timeout;
        if timed_out {
            self.mark_degraded()?;
        }
        let runtime = self.gateway_runtime_mut("reconcile_gateway_convergence")?;
        let record = GatewayConvergenceRecord {
            elapsed,
            timed_out,
            propagated_interest_count: runtime.propagated_interests.len(),
            forwarded_advisory_count: runtime.forwarded_advisories.len(),
        };
        runtime.last_convergence = Some(record.clone());
        Ok(record)
    }

    /// Snapshots `bridge` at `now` and packages the snapshot as a replication transfer.
    ///
    /// # Errors
    ///
    /// Returns `RoleOperationMismatch` for non-replication bridges and `BridgeNotOperational`
    /// if the bridge is closed.
    pub fn export_replication_transfer(
        &mut self,
        bridge: &mut RegionBridge,
        now: Time,
    ) -> Result<ReplicationTransfer, FederationError> {
        let config = self
            .replication_config("export_replication_transfer")?
            .clone();
        self.ensure_not_closed("export_replication_transfer")?;
        let snapshot = bridge.create_snapshot(now);
        let transfer = ReplicationTransfer {
            sequence: snapshot.sequence,
            ordering_guarantee: config.ordering_guarantee,
            snapshot_hash: snapshot.content_hash(),
            snapshot_bytes: snapshot.to_bytes(),
        };
        self.replication_runtime_mut("export_replication_transfer")?
            .last_exported_sequence = Some(transfer.sequence);
        Ok(transfer)
    }

    /// Chooses how a remote at `remote_sequence` should catch up to `local_sequence`.
    ///
    /// # Errors
    ///
    /// Returns `RoleOperationMismatch` for non-replication bridges, `BridgeNotOperational` if
    /// the bridge is closed, and `ReplicationCatchUpRemoteAhead` if the remote is ahead.
    pub fn plan_replication_catch_up(
        &mut self,
        local_sequence: u64,
        remote_sequence: u64,
    ) -> Result<ReplicationCatchUpPlan, FederationError> {
        let config = self
            .replication_config("plan_replication_catch_up")?
            .clone();
        self.ensure_not_closed("plan_replication_catch_up")?;
        if remote_sequence > local_sequence {
            return Err(FederationError::ReplicationCatchUpRemoteAhead {
                local_sequence,
                remote_sequence,
            });
        }
        // Cannot underflow: the remote was checked not to be ahead above.
        let lag = local_sequence - remote_sequence;
        let action = if lag == 0 {
            ReplicationCatchUpAction::AlreadyConverged
        } else {
            match config.catch_up_policy {
                CatchUpPolicy::SnapshotRequired => ReplicationCatchUpAction::Snapshot,
                CatchUpPolicy::SnapshotThenDelta => {
                    if remote_sequence == 0 || lag > 1 {
                        ReplicationCatchUpAction::SnapshotThenDelta
                    } else {
                        ReplicationCatchUpAction::DeltaOnly
                    }
                }
                CatchUpPolicy::LogOnly => ReplicationCatchUpAction::DeltaOnly,
            }
        };
        let plan = ReplicationCatchUpPlan {
            policy: config.catch_up_policy,
            action,
            local_sequence,
            remote_sequence,
            lag,
        };
        self.replication_runtime_mut("plan_replication_catch_up")?
            .last_catch_up = Some(plan.clone());
        Ok(plan)
    }

    /// Decodes and verifies `transfer`, then applies the snapshot to `bridge`.
    ///
    /// # Errors
    ///
    /// Returns an error if any of the following holds:
    /// - the bridge is not a replication link or is closed;
    /// - the bytes fail to decode;
    /// - the decoded sequence or content hash disagrees with the transfer header;
    /// - applying the snapshot fails (`DistributedBridgeOperationFailed`).
    pub fn apply_replication_transfer(
        &mut self,
        bridge: &mut RegionBridge,
        transfer: &ReplicationTransfer,
    ) -> Result<RegionSnapshot, FederationError> {
        self.replication_config("apply_replication_transfer")?;
        self.ensure_not_closed("apply_replication_transfer")?;
        let snapshot = RegionSnapshot::from_bytes(&transfer.snapshot_bytes)?;
        if snapshot.sequence != transfer.sequence {
            return Err(FederationError::ReplicationTransferSequenceMismatch {
                expected: transfer.sequence,
                actual: snapshot.sequence,
            });
        }
        let actual_hash = snapshot.content_hash();
        if actual_hash != transfer.snapshot_hash {
            return Err(FederationError::ReplicationTransferHashMismatch {
                expected: transfer.snapshot_hash,
                actual: actual_hash,
            });
        }
        bridge.apply_snapshot(&snapshot).map_err(|error| {
            FederationError::DistributedBridgeOperationFailed {
                operation: "apply_snapshot".to_owned(),
                message: error.to_string(),
            }
        })?;
        self.replication_runtime_mut("apply_replication_transfer")?
            .last_applied_sequence = Some(snapshot.sequence);
        Ok(snapshot)
    }

    /// Retains a replay artifact, then applies the retention policy and the replay-depth cap.
    ///
    /// # Errors
    ///
    /// Returns `RoleOperationMismatch` for non-edge-replay bridges and `BridgeNotOperational`
    /// if the bridge is closed.
    pub fn retain_replay_artifact(
        &mut self,
        artifact: ReplayArtifactRecord,
    ) -> Result<(), FederationError> {
        let config = self.edge_replay_config("retain_replay_artifact")?.clone();
        self.ensure_not_closed("retain_replay_artifact")?;
        let runtime = self.edge_replay_runtime_mut("retain_replay_artifact")?;
        runtime.retained_artifacts.push(artifact);
        trim_replay_artifacts(runtime, &config);
        Ok(())
    }

    /// Removes every retained artifact whose id equals `artifact_id`.
    ///
    /// This applies only under `TraceRetention::UntilAcknowledged`; any other policy returns
    /// `Ok(false)` without touching the buffer. Returns whether anything was removed.
    ///
    /// # Errors
    ///
    /// Returns `RoleOperationMismatch` for non-edge-replay bridges and `BridgeNotOperational`
    /// if the bridge is closed.
    pub fn acknowledge_replay_artifact(
        &mut self,
        artifact_id: &str,
    ) -> Result<bool, FederationError> {
        let config = self
            .edge_replay_config("acknowledge_replay_artifact")?
            .clone();
        self.ensure_not_closed("acknowledge_replay_artifact")?;
        if !matches!(config.trace_retention, TraceRetention::UntilAcknowledged) {
            return Ok(false);
        }
        let runtime = self.edge_replay_runtime_mut("acknowledge_replay_artifact")?;
        let before = runtime.retained_artifacts.len();
        runtime
            .retained_artifacts
            .retain(|artifact| artifact.artifact_id != artifact_id);
        Ok(runtime.retained_artifacts.len() != before)
    }

    /// Selects retained artifacts to ship according to the configured shipping policy.
    ///
    /// Artifacts are cloned, not removed. A non-empty selection increments `shipped_batches`.
    ///
    /// # Errors
    ///
    /// Returns `RoleOperationMismatch` for non-edge-replay bridges and `BridgeNotOperational`
    /// if the bridge is closed.
    pub fn plan_replay_shipping(&mut self) -> Result<ReplayShippingPlan, FederationError> {
        let config = self.edge_replay_config("plan_replay_shipping")?.clone();
        self.ensure_not_closed("plan_replay_shipping")?;
        // Copy the state before taking a mutable borrow of the runtime.
        let state = self.state;
        let runtime = self.edge_replay_runtime_mut("plan_replay_shipping")?;
        let artifacts = match config.evidence_shipping_policy {
            EvidenceShippingPolicy::OnReconnect => {
                if state == FederationBridgeState::Active {
                    runtime.retained_artifacts.clone()
                } else {
                    Vec::new()
                }
            }
            EvidenceShippingPolicy::PeriodicBatch => {
                let start = runtime
                    .retained_artifacts
                    .len()
                    .saturating_sub(Self::PERIODIC_REPLAY_BATCH_LIMIT);
                runtime.retained_artifacts[start..].to_vec()
            }
            EvidenceShippingPolicy::ContinuousMirror => runtime
                .retained_artifacts
                .last()
                .cloned()
                .into_iter()
                .collect(),
        };
        if !artifacts.is_empty() {
            runtime.shipped_batches = runtime.shipped_batches.saturating_add(1);
        }
        Ok(ReplayShippingPlan {
            policy: config.evidence_shipping_policy,
            artifacts,
        })
    }

    /// Fails with `BridgeNotOperational` if the bridge is closed.
    fn ensure_not_closed(&self, operation: &'static str) -> Result<(), FederationError> {
        if self.state == FederationBridgeState::Closed {
            return Err(FederationError::BridgeNotOperational {
                operation,
                state: self.state,
            });
        }
        Ok(())
    }

    /// Fails with `BridgeNotOperational` unless the bridge is `Active` or `Degraded`.
    fn ensure_operational_state(&self, operation: &'static str) -> Result<(), FederationError> {
        match self.state {
            FederationBridgeState::Active | FederationBridgeState::Degraded => Ok(()),
            state => Err(FederationError::BridgeNotOperational { operation, state }),
        }
    }

    /// Fails with `BridgeNotOperational` unless the bridge is `Active`.
    fn ensure_active_state(&self, operation: &'static str) -> Result<(), FederationError> {
        if self.state != FederationBridgeState::Active {
            return Err(FederationError::BridgeNotOperational {
                operation,
                state: self.state,
            });
        }
        Ok(())
    }

    /// Returns the role runtime, creating an empty one for the current role if it is absent.
    fn runtime_mut(&mut self) -> &mut FederationBridgeRuntime {
        // Borrow the two fields separately so the lazy initializer can read the role while the
        // runtime slot is borrowed mutably.
        let role = &self.role;
        self.runtime
            .get_or_insert_with(|| FederationBridgeRuntime::for_role(role))
    }

    /// Returns the leaf configuration, or `RoleOperationMismatch` for other roles.
    fn leaf_config(&self, operation: &'static str) -> Result<&LeafConfig, FederationError> {
        match &self.role {
            FederationRole::LeafFabric(config) => Ok(config),
            _ => Err(FederationError::RoleOperationMismatch {
                operation,
                expected: "leaf_fabric",
                actual: self.role.name(),
            }),
        }
    }

    /// Returns the gateway configuration, or `RoleOperationMismatch` for other roles.
    fn gateway_config(&self, operation: &'static str) -> Result<&GatewayConfig, FederationError> {
        match &self.role {
            FederationRole::GatewayFabric(config) => Ok(config),
            _ => Err(FederationError::RoleOperationMismatch {
                operation,
                expected: "gateway_fabric",
                actual: self.role.name(),
            }),
        }
    }

    /// Returns the replication configuration, or `RoleOperationMismatch` for other roles.
    fn replication_config(
        &self,
        operation: &'static str,
    ) -> Result<&ReplicationConfig, FederationError> {
        match &self.role {
            FederationRole::ReplicationLink(config) => Ok(config),
            _ => Err(FederationError::RoleOperationMismatch {
                operation,
                expected: "replication_link",
                actual: self.role.name(),
            }),
        }
    }

    /// Returns the edge-replay configuration, or `RoleOperationMismatch` for other roles.
    fn edge_replay_config(
        &self,
        operation: &'static str,
    ) -> Result<&EdgeReplayConfig, FederationError> {
        match &self.role {
            FederationRole::EdgeReplayLink(config) => Ok(config),
            _ => Err(FederationError::RoleOperationMismatch {
                operation,
                expected: "edge_replay_link",
                actual: self.role.name(),
            }),
        }
    }

    /// Returns the leaf runtime, or `RoleOperationMismatch` if the runtime is another variant.
    fn leaf_runtime_mut(
        &mut self,
        operation: &'static str,
    ) -> Result<&mut LeafBridgeRuntime, FederationError> {
        let actual = self.role.name();
        match self.runtime_mut() {
            FederationBridgeRuntime::Leaf(runtime) => Ok(runtime),
            _ => Err(FederationError::RoleOperationMismatch {
                operation,
                expected: "leaf_fabric",
                actual,
            }),
        }
    }

    /// Returns the gateway runtime, or `RoleOperationMismatch` if the runtime is another variant.
    fn gateway_runtime_mut(
        &mut self,
        operation: &'static str,
    ) -> Result<&mut GatewayBridgeRuntime, FederationError> {
        let actual = self.role.name();
        match self.runtime_mut() {
            FederationBridgeRuntime::Gateway(runtime) => Ok(runtime),
            _ => Err(FederationError::RoleOperationMismatch {
                operation,
                expected: "gateway_fabric",
                actual,
            }),
        }
    }

    /// Returns the replication runtime, or `RoleOperationMismatch` if the runtime is another
    /// variant.
    fn replication_runtime_mut(
        &mut self,
        operation: &'static str,
    ) -> Result<&mut ReplicationBridgeRuntime, FederationError> {
        let actual = self.role.name();
        match self.runtime_mut() {
            FederationBridgeRuntime::Replication(runtime) => Ok(runtime),
            _ => Err(FederationError::RoleOperationMismatch {
                operation,
                expected: "replication_link",
                actual,
            }),
        }
    }

    /// Returns the edge-replay runtime, or `RoleOperationMismatch` if the runtime is another
    /// variant.
    fn edge_replay_runtime_mut(
        &mut self,
        operation: &'static str,
    ) -> Result<&mut EdgeReplayBridgeRuntime, FederationError> {
        let actual = self.role.name();
        match self.runtime_mut() {
            FederationBridgeRuntime::EdgeReplay(runtime) => Ok(runtime),
            _ => Err(FederationError::RoleOperationMismatch {
                operation,
                expected: "edge_replay_link",
                actual,
            }),
        }
    }
}
/// Applies the configured retention policy to the retained replay artifacts.
///
/// Afterwards, the buffer is capped at `reconnection_replay_depth` by discarding the earliest
/// retained entries.
fn trim_replay_artifacts(runtime: &mut EdgeReplayBridgeRuntime, config: &EdgeReplayConfig) {
    let artifacts = &mut runtime.retained_artifacts;
    match &config.trace_retention {
        TraceRetention::LatestArtifacts { max_artifacts } => {
            let keep = usize::try_from(*max_artifacts).unwrap_or(usize::MAX);
            let excess = artifacts.len().saturating_sub(keep);
            artifacts.drain(..excess);
        }
        TraceRetention::DurationWindow { retention } => {
            // Anchor the window at the newest capture time, not the last-inserted artifact.
            // Artifacts can be retained out of capture order. Anchoring on a late-arriving
            // older artifact would shift the window backwards and let stale entries survive.
            if let Some(newest) = artifacts.iter().map(|artifact| artifact.captured_at).max() {
                artifacts
                    .retain(|artifact| newest.saturating_sub(artifact.captured_at) <= *retention);
            }
        }
        TraceRetention::UntilAcknowledged => {}
    }
    let depth_limit = usize::try_from(config.reconnection_replay_depth).unwrap_or(usize::MAX);
    let excess = artifacts.len().saturating_sub(depth_limit);
    artifacts.drain(..excess);
}
/// Verifies that every capability `morphism` requires is inside `capability_scope`.
///
/// The first missing capability, in the morphism's requirement order, is reported.
fn ensure_capability_scope(
    capability_scope: &BTreeSet<FabricCapability>,
    morphism: &Morphism,
) -> Result<(), FederationError> {
    let missing = morphism
        .capability_requirements
        .iter()
        .copied()
        .find(|required| !capability_scope.contains(required));
    match missing {
        Some(capability) => Err(FederationError::CapabilityScopeMissing { capability }),
        None => Ok(()),
    }
}
/// Restart policy for a distributed supervised node.
///
/// It is converted into a `SupervisionStrategy` when the node is compiled. Defaults to `Stop`.
#[derive(Debug, Clone, PartialEq, Eq, Default)]
pub enum DistributedRestartEnvelope {
    /// Maps to `SupervisionStrategy::Stop`; this is the default.
    #[default]
    Stop,
    /// Maps to `SupervisionStrategy::Restart`, with a `RestartConfig` built from these fields.
    /// `max_restarts` and `window` must be non-zero. `min_remaining_for_restart`, when set,
    /// must also be non-zero.
    Restart {
        max_restarts: u32,
        window: Duration,
        restart_cost: u64,
        min_remaining_for_restart: Option<Duration>,
        min_polls_for_restart: u32,
    },
    /// Maps to `SupervisionStrategy::Escalate`.
    Escalate,
}
impl DistributedRestartEnvelope {
    /// A restart envelope allowing `max_restarts` restarts within `window`.
    ///
    /// It has zero restart cost, no minimum remaining time, and no minimum poll count.
    #[must_use]
    pub fn restart(max_restarts: u32, window: Duration) -> Self {
        let (restart_cost, min_remaining_for_restart, min_polls_for_restart) = (0, None, 0);
        Self::Restart {
            max_restarts,
            window,
            restart_cost,
            min_remaining_for_restart,
            min_polls_for_restart,
        }
    }

    /// Sets the restart cost. `Stop` and `Escalate` are returned unchanged.
    #[must_use]
    pub fn with_restart_cost(mut self, restart_cost: u64) -> Self {
        if let Self::Restart {
            restart_cost: slot, ..
        } = &mut self
        {
            *slot = restart_cost;
        }
        self
    }

    /// Sets the minimum remaining time required to restart. `Stop` and `Escalate` are returned
    /// unchanged.
    #[must_use]
    pub fn with_min_remaining(mut self, min_remaining_for_restart: Duration) -> Self {
        if let Self::Restart {
            min_remaining_for_restart: slot,
            ..
        } = &mut self
        {
            *slot = Some(min_remaining_for_restart);
        }
        self
    }

    /// Sets the minimum poll count required to restart. `Stop` and `Escalate` are returned
    /// unchanged.
    #[must_use]
    pub fn with_min_polls(mut self, min_polls_for_restart: u32) -> Self {
        if let Self::Restart {
            min_polls_for_restart: slot,
            ..
        } = &mut self
        {
            *slot = min_polls_for_restart;
        }
        self
    }

    /// Lease TTL: the restart window when it is non-zero, otherwise 30 seconds.
    fn lease_ttl(&self) -> Duration {
        const FALLBACK_TTL: Duration = Duration::from_secs(30);
        if let Self::Restart { window, .. } = self {
            if !window.is_zero() {
                return *window;
            }
        }
        FALLBACK_TTL
    }

    /// Converts the envelope into a `SupervisionStrategy`, validating restart parameters.
    ///
    /// Checks, in order: a non-zero restart budget, a non-zero window, and a non-zero minimum
    /// remaining time when one is set.
    fn to_supervision_strategy(&self) -> Result<SupervisionStrategy, FederationError> {
        let (max_restarts, window, restart_cost, min_remaining, min_polls) = match *self {
            Self::Stop => return Ok(SupervisionStrategy::Stop),
            Self::Escalate => return Ok(SupervisionStrategy::Escalate),
            Self::Restart {
                max_restarts,
                window,
                restart_cost,
                min_remaining_for_restart,
                min_polls_for_restart,
            } => (
                max_restarts,
                window,
                restart_cost,
                min_remaining_for_restart,
                min_polls_for_restart,
            ),
        };

        if max_restarts == 0 {
            return Err(FederationError::ZeroRestartBudget);
        }
        if window.is_zero() {
            return Err(FederationError::ZeroDuration {
                field: "distributed_supervision.restart.window".to_owned(),
            });
        }
        if matches!(min_remaining, Some(duration) if duration.is_zero()) {
            return Err(FederationError::ZeroDuration {
                field: "distributed_supervision.restart.min_remaining_for_restart".to_owned(),
            });
        }

        let base = RestartConfig::new(max_restarts, window)
            .with_restart_cost(restart_cost)
            .with_min_polls(min_polls);
        let config = match min_remaining {
            Some(duration) => base.with_min_remaining(duration),
            None => base,
        };
        Ok(SupervisionStrategy::Restart(config))
    }
}
/// Declarative description of one node in a distributed supervision tree.
#[derive(Debug, Clone, PartialEq)]
pub struct DistributedSupervisionNodeSpec {
    /// Identity of the supervised node.
    pub node_id: NodeId,
    /// Tenant/service namespace the node's subjects live under.
    pub namespace: NamespaceKernel,
    /// Namespace component parsed from `node_id`, used for mailbox and observability subjects.
    pub mailbox_component: NamespaceComponent,
    /// Namespace component naming the node's failure domain.
    pub failure_domain: NamespaceComponent,
    /// Mailbox capacity; `new` rejects zero.
    pub mailbox_capacity: usize,
    /// Restart policy applied to the node.
    pub restart_envelope: DistributedRestartEnvelope,
    /// Morphisms the node exports (empty unless set via `with_export_morphisms`).
    pub export_morphisms: Vec<Morphism>,
    /// Morphisms the node imports (empty unless set via `with_import_morphisms`).
    pub import_morphisms: Vec<Morphism>,
    /// Nodes this node monitors.
    pub monitor_targets: Vec<NodeId>,
    /// Nodes this node is linked to.
    pub link_targets: Vec<NodeId>,
    /// Nodes eligible to take over for this node.
    pub failover_targets: Vec<NodeId>,
}
impl DistributedSupervisionNodeSpec {
pub fn new(
node_id: impl Into<String>,
tenant: impl AsRef<str>,
service: impl AsRef<str>,
failure_domain: impl AsRef<str>,
mailbox_capacity: usize,
restart_envelope: DistributedRestartEnvelope,
) -> Result<Self, FederationError> {
if mailbox_capacity == 0 {
return Err(FederationError::ZeroMailboxCapacity);
}
let node_id = NodeId::new(node_id.into());
let mailbox_component = NamespaceComponent::parse(node_id.as_str())?;
let failure_domain = NamespaceComponent::parse(failure_domain)?;
Ok(Self {
node_id,
namespace: NamespaceKernel::new(tenant, service)?,
mailbox_component,
failure_domain,
mailbox_capacity,
restart_envelope,
export_morphisms: Vec::new(),
import_morphisms: Vec::new(),
monitor_targets: Vec::new(),
link_targets: Vec::new(),
failover_targets: Vec::new(),
})
}
#[must_use]
pub fn with_export_morphisms(mut self, export_morphisms: Vec<Morphism>) -> Self {
self.export_morphisms = export_morphisms;
self
}
#[must_use]
pub fn with_import_morphisms(mut self, import_morphisms: Vec<Morphism>) -> Self {
self.import_morphisms = import_morphisms;
self
}
#[must_use]
pub fn with_monitor_targets<I, S>(mut self, monitor_targets: I) -> Self
where
I: IntoIterator<Item = S>,
S: Into<String>,
{
self.monitor_targets = monitor_targets
.into_iter()
.map(|target| NodeId::new(target.into()))
.collect();
self
}
#[must_use]
pub fn with_link_targets<I, S>(mut self, link_targets: I) -> Self
where
I: IntoIterator<Item = S>,
S: Into<String>,
{
self.link_targets = link_targets
.into_iter()
.map(|target| NodeId::new(target.into()))
.collect();
self
}
#[must_use]
pub fn with_failover_targets<I, S>(mut self, failover_targets: I) -> Self
where
I: IntoIterator<Item = S>,
S: Into<String>,
{
self.failover_targets = failover_targets
.into_iter()
.map(|target| NodeId::new(target.into()))
.collect();
self
}
fn mailbox_subject(&self) -> Result<Subject, FederationError> {
Ok(self
.namespace
.mailbox_subject(self.mailbox_component.as_str())?)
}
fn registry_subject(&self) -> SubjectPattern {
SubjectPattern::from(&self.namespace.service_discovery_subject())
}
fn observability_subject(&self) -> Result<SubjectPattern, FederationError> {
Ok(SubjectPattern::from(
&self.namespace.observability_subject(format!(
"supervision-{}",
self.mailbox_component.as_str()
))?,
))
}
}
/// Per-node output of `DistributedSupervisionCompiler::compile`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct CompiledDistributedSupervisionNode {
    /// Node this entry was compiled from.
    pub node_id: NodeId,
    /// String form of the spec's failure domain.
    pub failure_domain: String,
    /// Mailbox capacity copied from the spec.
    pub mailbox_capacity: usize,
    /// The node's own mailbox subject, as a pattern.
    pub mailbox_subject: SubjectPattern,
    /// Mailbox subject after applying the spec's export morphisms in order.
    pub exported_mailbox_subject: SubjectPattern,
    /// Mailbox subject after applying the spec's import morphisms in order.
    pub imported_mailbox_subject: SubjectPattern,
    /// Strategy compiled from the spec's restart envelope.
    pub supervision_strategy: SupervisionStrategy,
}
/// Mailbox routing view of one node: its local, exported, and imported
/// subjects plus the classes of the morphisms that produced them.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct DistributedMailboxRoute {
    /// Node whose mailbox this route describes.
    pub node_id: NodeId,
    /// The node's own mailbox subject, as a pattern.
    pub mailbox_subject: SubjectPattern,
    /// Mailbox subject after the export morphisms.
    pub exported_mailbox_subject: SubjectPattern,
    /// Mailbox subject after the import morphisms.
    pub imported_mailbox_subject: SubjectPattern,
    /// Class of each export morphism, in declaration order.
    pub export_classes: Vec<MorphismClass>,
    /// Class of each import morphism, in declaration order.
    pub import_classes: Vec<MorphismClass>,
}
/// One directed monitor relation (`watcher` observes `monitored`).
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct DistributedMonitorPlan {
    /// Node that declared the monitor target.
    pub watcher: NodeId,
    /// Node being monitored.
    pub monitored: NodeId,
    /// Drain-family system subject with suffix
    /// `monitor.<watcher component>.<monitored component>`.
    pub notification_subject: SubjectPattern,
    /// Mailbox subject of the monitored node.
    pub monitored_mailbox_subject: SubjectPattern,
}
/// One undirected link between two nodes, emitted once per unordered pair.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct DistributedLinkPlan {
    /// Lesser of the two node ids (pair is canonically ordered).
    pub left_node: NodeId,
    /// Greater of the two node ids.
    pub right_node: NodeId,
    /// Drain-family system subject with suffix `link.<left>.<right>`.
    pub control_subject: SubjectPattern,
    /// Family of `control_subject`; the compiler always emits `Drain`.
    pub family: SystemSubjectFamily,
}
/// Registry lease declaration for one node.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct DistributedRegistryLeasePlan {
    /// Node holding the lease.
    pub node_id: NodeId,
    /// String form of the node's failure domain.
    pub failure_domain: String,
    /// Service-discovery subject of the node's namespace.
    pub registry_subject: SubjectPattern,
    /// Route-family system subject with suffix `registry-lease.<component>`.
    pub lease_subject: SubjectPattern,
    /// TTL taken from the node's restart envelope.
    pub lease_ttl: Duration,
}
/// Contract describing how work moves from `source_node` to `target_node` on
/// failover. The two nodes are guaranteed to be in different failure domains.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct DistributedFailoverHandoffContract {
    /// Node that declared the failover target.
    pub source_node: NodeId,
    /// Node that takes over.
    pub target_node: NodeId,
    /// String form of the source's failure domain.
    pub source_failure_domain: String,
    /// String form of the target's failure domain.
    pub target_failure_domain: String,
    /// Drain-family subject with suffix `handoff.<source>.<target>`.
    pub handoff_subject: SubjectPattern,
    /// Drain-family subject with suffix `failover.<source>.<target>`.
    pub drain_subject: SubjectPattern,
    /// The target's registry lease subject (`registry-lease.<target>`, Route family).
    pub registry_lease_subject: SubjectPattern,
    /// Replay-family subject with suffix `failover.<source>.<target>`.
    pub evidence_subject: SubjectPattern,
    /// Strategy compiled from the target's restart envelope.
    pub target_strategy: SupervisionStrategy,
}
/// Evidence/observability subjects for one node.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct DistributedEvidenceHook {
    /// Node the hook belongs to.
    pub node_id: NodeId,
    /// Namespace observability subject named `supervision-<component>`.
    pub observability_subject: SubjectPattern,
    /// Replay-family system subject with suffix `supervision.<component>`.
    pub replay_subject: SubjectPattern,
    /// Family of `replay_subject`; the compiler always emits `Replay`.
    pub family: SystemSubjectFamily,
}
/// Complete output of `DistributedSupervisionCompiler::compile`.
///
/// Per-node vectors (`nodes`, `mailbox_routes`, `registry_leases`,
/// `evidence_hooks`) contain one entry per node, ordered by node id. Relation
/// vectors follow the same node-id order of their declaring node.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct DistributedSupervisionPlan {
    pub nodes: Vec<CompiledDistributedSupervisionNode>,
    pub mailbox_routes: Vec<DistributedMailboxRoute>,
    pub monitor_plans: Vec<DistributedMonitorPlan>,
    pub link_plans: Vec<DistributedLinkPlan>,
    pub registry_leases: Vec<DistributedRegistryLeasePlan>,
    pub failover_handoffs: Vec<DistributedFailoverHandoffContract>,
    pub evidence_hooks: Vec<DistributedEvidenceHook>,
}
/// Accumulated output of the per-node compilation stage (one entry per node
/// in each vector).
#[derive(Debug)]
struct DistributedNodeCompilationPass {
    nodes: Vec<CompiledDistributedSupervisionNode>,
    mailbox_routes: Vec<DistributedMailboxRoute>,
    registry_leases: Vec<DistributedRegistryLeasePlan>,
    evidence_hooks: Vec<DistributedEvidenceHook>,
}
/// Everything compiled from a single node spec, before it is scattered into
/// the per-kind vectors of [`DistributedNodeCompilationPass`].
#[derive(Debug)]
struct DistributedNodeArtifacts {
    compiled_node: CompiledDistributedSupervisionNode,
    mailbox_route: DistributedMailboxRoute,
    registry_lease: DistributedRegistryLeasePlan,
    evidence_hook: DistributedEvidenceHook,
}
/// Accumulated output of the relation compilation stage (monitors, links,
/// failover handoffs).
#[derive(Debug)]
struct DistributedRelationCompilationPass {
    monitor_plans: Vec<DistributedMonitorPlan>,
    link_plans: Vec<DistributedLinkPlan>,
    failover_handoffs: Vec<DistributedFailoverHandoffContract>,
}
/// Stateless compiler from [`DistributedSupervisionNodeSpec`]s to a
/// [`DistributedSupervisionPlan`].
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub struct DistributedSupervisionCompiler;
impl DistributedSupervisionCompiler {
    /// Validates `nodes` and compiles them into a supervision plan.
    ///
    /// Runs three stages in order:
    /// 1. index and validate the specs;
    /// 2. compile per-node artifacts;
    /// 3. compile the monitor/link/failover relations between nodes.
    ///
    /// # Errors
    ///
    /// Returns the first [`FederationError`] raised by any stage.
    pub fn compile(
        nodes: &[DistributedSupervisionNodeSpec],
    ) -> Result<DistributedSupervisionPlan, FederationError> {
        let index = build_distributed_supervision_index(nodes)?;
        let per_node = compile_distributed_nodes(&index)?;
        let relations = compile_distributed_relations(&index)?;
        let plan = DistributedSupervisionPlan {
            nodes: per_node.nodes,
            mailbox_routes: per_node.mailbox_routes,
            registry_leases: per_node.registry_leases,
            evidence_hooks: per_node.evidence_hooks,
            monitor_plans: relations.monitor_plans,
            link_plans: relations.link_plans,
            failover_handoffs: relations.failover_handoffs,
        };
        Ok(plan)
    }
}
/// Validates the raw node list and indexes it by node id.
///
/// # Errors
///
/// Returns [`FederationError::EmptyDistributedSupervisionGraph`] for an empty
/// slice. Otherwise nodes are checked in input order, stopping at the first
/// failure:
/// - [`FederationError::DuplicateDistributedSupervisionNode`] when a node id repeats;
/// - [`FederationError::ZeroMailboxCapacity`], re-checked here because spec fields
///   are public and may bypass `DistributedSupervisionNodeSpec::new`;
/// - any morphism validation failure (export morphisms before import morphisms).
fn build_distributed_supervision_index(
    nodes: &[DistributedSupervisionNodeSpec],
) -> Result<BTreeMap<String, &DistributedSupervisionNodeSpec>, FederationError> {
    if nodes.is_empty() {
        return Err(FederationError::EmptyDistributedSupervisionGraph);
    }
    let mut index = BTreeMap::new();
    for spec in nodes {
        let id = spec.node_id.as_str();
        let previous = index.insert(id.to_owned(), spec);
        if previous.is_some() {
            return Err(FederationError::DuplicateDistributedSupervisionNode {
                node_id: id.to_owned(),
            });
        }
        if spec.mailbox_capacity == 0 {
            return Err(FederationError::ZeroMailboxCapacity);
        }
        let declared = spec
            .export_morphisms
            .iter()
            .chain(&spec.import_morphisms);
        for morphism in declared {
            morphism.validate()?;
        }
    }
    Ok(index)
}
/// First compilation stage: compiles every indexed node and scatters its
/// artifacts into per-kind vectors, in node-id order.
///
/// # Errors
///
/// Returns the first error raised while compiling any node.
fn compile_distributed_nodes(
    by_id: &BTreeMap<String, &DistributedSupervisionNodeSpec>,
) -> Result<DistributedNodeCompilationPass, FederationError> {
    let node_count = by_id.len();
    let mut pass = DistributedNodeCompilationPass {
        nodes: Vec::with_capacity(node_count),
        mailbox_routes: Vec::with_capacity(node_count),
        registry_leases: Vec::with_capacity(node_count),
        evidence_hooks: Vec::with_capacity(node_count),
    };
    by_id
        .values()
        .try_for_each(|&spec| -> Result<(), FederationError> {
            let artifacts = compile_distributed_node_artifacts(spec)?;
            pass.nodes.push(artifacts.compiled_node);
            pass.mailbox_routes.push(artifacts.mailbox_route);
            pass.registry_leases.push(artifacts.registry_lease);
            pass.evidence_hooks.push(artifacts.evidence_hook);
            Ok(())
        })?;
    Ok(pass)
}
/// Compiles one node spec into its compiled node, mailbox route, registry
/// lease, and evidence hook.
///
/// # Errors
///
/// Fallible derivations run in a fixed order, and the first failure is
/// returned:
/// 1. mailbox subject;
/// 2. export morphisms;
/// 3. import morphisms;
/// 4. restart strategy;
/// 5. lease subject;
/// 6. replay subject;
/// 7. observability subject.
fn compile_distributed_node_artifacts(
    node: &DistributedSupervisionNodeSpec,
) -> Result<DistributedNodeArtifacts, FederationError> {
    let local_subject = node.mailbox_subject()?;
    let exported = apply_morphisms_to_subject(&local_subject, &node.export_morphisms)?;
    let imported = apply_morphisms_to_subject(&local_subject, &node.import_morphisms)?;
    let strategy = node.restart_envelope.to_supervision_strategy()?;
    let component = node.mailbox_component.as_str();
    let lease_subject = system_subject_pattern(
        SystemSubjectFamily::Route,
        &["registry-lease", component],
    )?;
    let replay_subject = system_subject_pattern(
        SystemSubjectFamily::Replay,
        &["supervision", component],
    )?;
    let observability_subject = node.observability_subject()?;

    let local_pattern = SubjectPattern::from(&local_subject);
    let failure_domain = node.failure_domain.as_str();

    let compiled_node = CompiledDistributedSupervisionNode {
        node_id: node.node_id.clone(),
        failure_domain: failure_domain.to_owned(),
        mailbox_capacity: node.mailbox_capacity,
        mailbox_subject: local_pattern.clone(),
        exported_mailbox_subject: exported.clone(),
        imported_mailbox_subject: imported.clone(),
        supervision_strategy: strategy,
    };
    let mailbox_route = DistributedMailboxRoute {
        node_id: node.node_id.clone(),
        mailbox_subject: local_pattern,
        exported_mailbox_subject: exported,
        imported_mailbox_subject: imported,
        export_classes: node.export_morphisms.iter().map(|m| m.class).collect(),
        import_classes: node.import_morphisms.iter().map(|m| m.class).collect(),
    };
    let registry_lease = DistributedRegistryLeasePlan {
        node_id: node.node_id.clone(),
        failure_domain: failure_domain.to_owned(),
        registry_subject: node.registry_subject(),
        lease_subject,
        lease_ttl: node.restart_envelope.lease_ttl(),
    };
    let evidence_hook = DistributedEvidenceHook {
        node_id: node.node_id.clone(),
        observability_subject,
        replay_subject,
        family: SystemSubjectFamily::Replay,
    };
    Ok(DistributedNodeArtifacts {
        compiled_node,
        mailbox_route,
        registry_lease,
        evidence_hook,
    })
}
/// Second compilation stage: derives monitor, link, and failover artifacts
/// from each node's declared relations, visiting nodes in node-id order.
///
/// Link pairs are tracked across all nodes, so a link declared from both ends
/// is emitted only once.
///
/// # Errors
///
/// Returns the first relation error encountered.
fn compile_distributed_relations(
    by_id: &BTreeMap<String, &DistributedSupervisionNodeSpec>,
) -> Result<DistributedRelationCompilationPass, FederationError> {
    let mut monitor_plans = Vec::new();
    let mut link_plans = Vec::new();
    let mut failover_handoffs = Vec::new();
    let mut seen_link_pairs = BTreeSet::new();
    for &spec in by_id.values() {
        extend_monitor_plans(&mut monitor_plans, by_id, spec)?;
        extend_link_plans(&mut link_plans, &mut seen_link_pairs, by_id, spec)?;
        extend_failover_handoffs(&mut failover_handoffs, by_id, spec)?;
    }
    Ok(DistributedRelationCompilationPass {
        monitor_plans,
        link_plans,
        failover_handoffs,
    })
}
/// Appends one [`DistributedMonitorPlan`] per distinct monitor target of `node`.
///
/// # Errors
///
/// Fails on self-monitoring, unknown targets, or subject construction errors.
fn extend_monitor_plans(
    monitor_plans: &mut Vec<DistributedMonitorPlan>,
    by_id: &BTreeMap<String, &DistributedSupervisionNodeSpec>,
    node: &DistributedSupervisionNodeSpec,
) -> Result<(), FederationError> {
    let watcher_component = node.mailbox_component.as_str();
    dedup_node_targets(&node.monitor_targets)
        .into_iter()
        .try_for_each(|target_id| -> Result<(), FederationError> {
            let monitored = resolve_distributed_target(by_id, node, &target_id, "monitor")?;
            let notification_subject = system_subject_pattern(
                SystemSubjectFamily::Drain,
                &[
                    "monitor",
                    watcher_component,
                    monitored.mailbox_component.as_str(),
                ],
            )?;
            let monitored_mailbox = monitored.mailbox_subject()?;
            monitor_plans.push(DistributedMonitorPlan {
                watcher: node.node_id.clone(),
                monitored: monitored.node_id.clone(),
                notification_subject,
                monitored_mailbox_subject: SubjectPattern::from(&monitored_mailbox),
            });
            Ok(())
        })
}
/// Appends one [`DistributedLinkPlan`] per new unordered link pair declared by `node`.
///
/// `link_pairs` is shared across all nodes of the graph. A pair already present
/// (for example, the same link declared from the other end) is skipped.
///
/// # Errors
///
/// Fails on self-links, unknown targets, or when a node id is not a valid
/// namespace component for the control subject.
fn extend_link_plans(
    link_plans: &mut Vec<DistributedLinkPlan>,
    link_pairs: &mut BTreeSet<(String, String)>,
    by_id: &BTreeMap<String, &DistributedSupervisionNodeSpec>,
    node: &DistributedSupervisionNodeSpec,
) -> Result<(), FederationError> {
    for target_id in dedup_node_targets(&node.link_targets) {
        let target = resolve_distributed_target(by_id, node, &target_id, "link")?;
        let (left, right) = canonical_node_pair(&node.node_id, &target.node_id);
        if !link_pairs.insert((left.as_str().to_owned(), right.as_str().to_owned())) {
            // Already emitted while processing the other endpoint (or a duplicate).
            continue;
        }
        // Build the subject while `left`/`right` are borrowed, then move them into
        // the plan instead of cloning them a second time.
        let control_subject = system_subject_pattern(
            SystemSubjectFamily::Drain,
            &["link", node_component(&left)?, node_component(&right)?],
        )?;
        link_plans.push(DistributedLinkPlan {
            left_node: left,
            right_node: right,
            control_subject,
            family: SystemSubjectFamily::Drain,
        });
    }
    Ok(())
}
/// Appends one [`DistributedFailoverHandoffContract`] per distinct failover
/// target of `node`.
///
/// # Errors
///
/// Fails on any of the following:
/// - self-failover or an unknown target;
/// - [`FederationError::FailoverTargetSameFailureDomain`] when the target shares
///   `node`'s failure domain;
/// - subject construction errors;
/// - the target's restart envelope failing to compile.
fn extend_failover_handoffs(
    failover_handoffs: &mut Vec<DistributedFailoverHandoffContract>,
    by_id: &BTreeMap<String, &DistributedSupervisionNodeSpec>,
    node: &DistributedSupervisionNodeSpec,
) -> Result<(), FederationError> {
    let source = node.mailbox_component.as_str();
    for target_id in dedup_node_targets(&node.failover_targets) {
        let target = resolve_distributed_target(by_id, node, &target_id, "failover_target")?;
        // Failover is required to move work into a different failure domain.
        if node.failure_domain == target.failure_domain {
            return Err(FederationError::FailoverTargetSameFailureDomain {
                node_id: node.node_id.as_str().to_owned(),
                target: target.node_id.as_str().to_owned(),
                failure_domain: node.failure_domain.as_str().to_owned(),
            });
        }
        let destination = target.mailbox_component.as_str();
        let handoff_subject = system_subject_pattern(
            SystemSubjectFamily::Drain,
            &["handoff", source, destination],
        )?;
        let drain_subject = system_subject_pattern(
            SystemSubjectFamily::Drain,
            &["failover", source, destination],
        )?;
        let registry_lease_subject = system_subject_pattern(
            SystemSubjectFamily::Route,
            &["registry-lease", destination],
        )?;
        let evidence_subject = system_subject_pattern(
            SystemSubjectFamily::Replay,
            &["failover", source, destination],
        )?;
        let target_strategy = target.restart_envelope.to_supervision_strategy()?;
        failover_handoffs.push(DistributedFailoverHandoffContract {
            source_node: node.node_id.clone(),
            target_node: target.node_id.clone(),
            source_failure_domain: node.failure_domain.as_str().to_owned(),
            target_failure_domain: target.failure_domain.as_str().to_owned(),
            handoff_subject,
            drain_subject,
            registry_lease_subject,
            evidence_subject,
            target_strategy,
        });
    }
    Ok(())
}
/// Applies `morphisms`, in order, to the tokens of `subject` and returns the
/// rewritten subject as a pattern.
///
/// Each morphism is validated immediately before its transform runs.
///
/// # Errors
///
/// Propagates morphism validation/evaluation failures and the failure to
/// re-parse the rewritten token sequence as a subject.
fn apply_morphisms_to_subject(
    subject: &Subject,
    morphisms: &[Morphism],
) -> Result<SubjectPattern, FederationError> {
    let rewritten = morphisms.iter().try_fold(
        subject.tokens().to_vec(),
        |current, morphism| -> Result<_, FederationError> {
            morphism.validate()?;
            Ok(morphism.transform.apply_tokens(&current)?)
        },
    )?;
    let joined = rewritten.join(".");
    Ok(SubjectPattern::from(&Subject::parse(&joined)?))
}
/// Returns `targets` in ascending order with duplicates removed.
///
/// Relation compilation iterates this instead of the raw list, so repeated
/// targets yield a single plan and output order does not depend on
/// declaration order.
fn dedup_node_targets(targets: &[NodeId]) -> Vec<NodeId> {
    let mut unique = targets.to_vec();
    unique.sort();
    unique.dedup();
    unique
}
/// Looks up the spec that `source` refers to via `relation`.
///
/// # Errors
///
/// - [`FederationError::SelfReferentialDistributedRelation`] when `target` is
///   `source` itself (checked first).
/// - [`FederationError::UnknownDistributedSupervisionTarget`] when `target` is not
///   in `by_id`.
fn resolve_distributed_target<'a>(
    by_id: &BTreeMap<String, &'a DistributedSupervisionNodeSpec>,
    source: &DistributedSupervisionNodeSpec,
    target: &NodeId,
    relation: &'static str,
) -> Result<&'a DistributedSupervisionNodeSpec, FederationError> {
    if source.node_id == *target {
        return Err(FederationError::SelfReferentialDistributedRelation {
            node_id: source.node_id.as_str().to_owned(),
            relation,
        });
    }
    let Some(&resolved) = by_id.get(target.as_str()) else {
        return Err(FederationError::UnknownDistributedSupervisionTarget {
            node_id: source.node_id.as_str().to_owned(),
            target: target.as_str().to_owned(),
            relation,
        });
    };
    Ok(resolved)
}
/// Returns the two ids as an owned pair ordered `(lesser, greater)`, so that
/// `(a, b)` and `(b, a)` map to the same key.
fn canonical_node_pair(left: &NodeId, right: &NodeId) -> (NodeId, NodeId) {
    let (first, second) = if left <= right {
        (left, right)
    } else {
        (right, left)
    };
    (first.clone(), second.clone())
}
fn node_component(node: &NodeId) -> Result<&str, FederationError> {
NamespaceComponent::parse(node.as_str())?;
Ok(node.as_str())
}
fn system_subject_pattern(
family: SystemSubjectFamily,
suffix: &[&str],
) -> Result<SubjectPattern, FederationError> {
let mut raw = family.prefix();
for component in suffix {
NamespaceComponent::parse(component)?;
raw.push('.');
raw.push_str(component);
}
Ok(SubjectPattern::parse(&raw)?)
}
/// Errors raised by federation bridge configuration and lifecycle checks,
/// replication transfer/catch-up checks, and distributed supervision compilation.
///
/// Errors from lower layers are wrapped transparently:
/// - snapshot decoding;
/// - morphism validation and evaluation;
/// - subject-pattern parsing;
/// - namespace-kernel construction.
///
/// Their `#[from]` conversions are what let `?` lift them into this type.
#[derive(Debug, Clone, PartialEq, Eq, Error)]
pub enum FederationError {
    /// A duration setting was zero. `field` is the dotted path of the setting.
    #[error("duration at `{field}` must be greater than zero")]
    ZeroDuration {
        field: String,
    },
    /// The leaf configuration's offline buffer limit was zero.
    #[error("leaf offline buffer limit must be greater than zero")]
    ZeroOfflineBufferLimit,
    /// Leaf morphism constraints allow no morphism classes.
    #[error("leaf morphism constraints must allow at least one morphism class")]
    EmptyAllowedMorphismClasses,
    /// Leaf morphism constraints set the max expansion factor to zero.
    #[error("leaf morphism max expansion factor must be greater than zero")]
    ZeroMaxExpansionFactor,
    /// Leaf morphism constraints set the max fanout to zero.
    #[error("leaf morphism max fanout must be greater than zero")]
    ZeroMaxFanout,
    /// The gateway amplification limit was zero.
    #[error("gateway amplification limit must be greater than zero")]
    ZeroAmplificationLimit,
    /// A latest-artifacts trace retention policy allowed zero artifacts.
    #[error("trace-retention artifact limit must be greater than zero")]
    ZeroTraceArtifactLimit,
    /// Edge replay was configured with a reconnection replay depth of zero.
    #[error("reconnection replay depth must be greater than zero")]
    ZeroReplayDepth,
    /// A bridge was created with an empty capability scope.
    #[error("federation bridge capability scope must not be empty")]
    EmptyCapabilityScope,
    /// A bridge was created with neither local nor remote morphisms.
    #[error("federation bridge must declare at least one local or remote morphism")]
    EmptyMorphismSet,
    /// A morphism requires a capability that the bridge's scope lacks.
    #[error("bridge capability scope is missing required capability `{capability:?}`")]
    CapabilityScopeMissing {
        capability: FabricCapability,
    },
    /// A morphism class is not in the leaf's allowed set.
    #[error("leaf morphism constraints do not admit class `{class:?}`")]
    LeafMorphismClassNotAllowed {
        class: MorphismClass,
    },
    /// A morphism's expansion factor exceeds the leaf maximum.
    #[error("leaf morphism expansion factor {actual} exceeds configured max {max}")]
    LeafExpansionFactorExceeded {
        actual: u16,
        max: u16,
    },
    /// A morphism's fanout exceeds the leaf maximum.
    #[error("leaf morphism fanout {actual} exceeds configured max {max}")]
    LeafFanoutExceeded {
        actual: u16,
        max: u16,
    },
    /// A morphism's fanout exceeds the gateway amplification limit.
    #[error("gateway morphism fanout {actual} exceeds amplification limit {max}")]
    GatewayAmplificationExceeded {
        actual: u16,
        max: u16,
    },
    /// An operation was invoked on a bridge whose role does not support it.
    #[error("operation `{operation}` requires role `{expected}`, but bridge role is `{actual}`")]
    RoleOperationMismatch {
        operation: &'static str,
        expected: &'static str,
        actual: &'static str,
    },
    /// An operation was invoked while the bridge is in a state that forbids it.
    #[error("operation `{operation}` is not available while bridge state is `{state:?}`")]
    BridgeNotOperational {
        operation: &'static str,
        state: FederationBridgeState,
    },
    /// An edge replay bridge's scope lacks the observe-evidence capability.
    #[error("edge replay links require observe-evidence capability in scope")]
    EdgeReplayRequiresObserveEvidence,
    /// A distributed supervision node declared a zero mailbox capacity.
    #[error("distributed supervision mailbox capacity must be greater than zero")]
    ZeroMailboxCapacity,
    /// A restart envelope allowed zero restarts.
    #[error("distributed supervision restart envelope must allow at least one restart")]
    ZeroRestartBudget,
    /// The supervision compiler was given no nodes.
    #[error("distributed supervision graph must contain at least one node")]
    EmptyDistributedSupervisionGraph,
    /// The same node id appeared more than once in the compiler input.
    #[error("duplicate distributed supervision node `{node_id}`")]
    DuplicateDistributedSupervisionNode {
        node_id: String,
    },
    /// A relation names a node that is not part of the graph.
    ///
    /// The compiler passes `relation` as `"monitor"`, `"link"`, or
    /// `"failover_target"`.
    #[error(
        "distributed supervision node `{node_id}` references unknown {relation} target `{target}`"
    )]
    UnknownDistributedSupervisionTarget {
        node_id: String,
        target: String,
        relation: &'static str,
    },
    /// A node names itself as the target of a relation.
    #[error("distributed supervision node `{node_id}` must not declare a self-{relation}")]
    SelfReferentialDistributedRelation {
        node_id: String,
        relation: &'static str,
    },
    /// A failover target is in the same failure domain as its source.
    #[error(
        "distributed supervision failover from `{node_id}` to `{target}` must cross failure domains (current domain `{failure_domain}`)"
    )]
    FailoverTargetSameFailureDomain {
        node_id: String,
        target: String,
        failure_domain: String,
    },
    /// Activation was attempted on a closed bridge.
    #[error("cannot activate a closed federation bridge")]
    CannotActivateClosedBridge,
    /// Degradation was attempted on a closed bridge.
    #[error("cannot degrade a closed federation bridge")]
    CannotDegradeClosedBridge,
    /// Wrapped snapshot decoding failure.
    #[error(transparent)]
    SnapshotDecode(#[from] SnapshotError),
    /// A distributed bridge operation failed. `message` carries the cause text.
    #[error("distributed bridge operation `{operation}` failed: {message}")]
    DistributedBridgeOperationFailed {
        operation: String,
        message: String,
    },
    /// A replication transfer arrived with an unexpected sequence number.
    #[error("replication transfer sequence mismatch: expected {expected}, got {actual}")]
    ReplicationTransferSequenceMismatch {
        expected: u64,
        actual: u64,
    },
    /// A replication transfer's hash did not match the expected hash.
    #[error("replication transfer hash mismatch: expected {expected}, got {actual}")]
    ReplicationTransferHashMismatch {
        expected: u64,
        actual: u64,
    },
    /// Catch-up found the remote peer ahead of the local sequence, so it
    /// cannot be treated as converged.
    #[error(
        "replication catch-up cannot treat remote peer as converged when remote sequence {remote_sequence} exceeds local sequence {local_sequence}"
    )]
    ReplicationCatchUpRemoteAhead {
        local_sequence: u64,
        remote_sequence: u64,
    },
    /// Wrapped morphism validation failure.
    #[error(transparent)]
    MorphismValidation(#[from] MorphismValidationError),
    /// Wrapped morphism evaluation failure.
    #[error(transparent)]
    MorphismEvaluation(#[from] MorphismEvaluationError),
    /// Wrapped subject-pattern parsing failure.
    #[error(transparent)]
    SubjectPattern(#[from] SubjectPatternError),
    /// Wrapped namespace-kernel failure.
    #[error(transparent)]
    NamespaceKernel(#[from] NamespaceKernelError),
}
#[cfg(test)]
mod tests {
use super::super::morphism::{
ResponsePolicy, ReversibilityRequirement, SharingPolicy, SubjectTransform,
};
use super::*;
use crate::types::{Budget, RegionId, TaskId};
use crate::util::ArenaIndex;
fn derived_view_morphism() -> Morphism {
Morphism::default()
}
fn authoritative_morphism() -> Morphism {
Morphism {
class: MorphismClass::Authoritative,
reversibility: ReversibilityRequirement::Bijective,
capability_requirements: vec![FabricCapability::CarryAuthority],
response_policy: ResponsePolicy::ReplyAuthoritative,
..Morphism::default()
}
}
fn region_id(n: u32) -> RegionId {
RegionId::from_arena(ArenaIndex::new(n, 0))
}
fn task_id(n: u32) -> TaskId {
TaskId::from_arena(ArenaIndex::new(n, 0))
}
fn replay_artifact(
artifact_id: &str,
family: SystemSubjectFamily,
captured_secs: u64,
sequence: u64,
) -> ReplayArtifactRecord {
ReplayArtifactRecord {
artifact_id: artifact_id.to_owned(),
family,
captured_at: Duration::from_secs(captured_secs),
sequence,
bytes: 256,
}
}
fn distributed_node(
node_id: &str,
service: &str,
failure_domain: &str,
) -> DistributedSupervisionNodeSpec {
DistributedSupervisionNodeSpec::new(
node_id,
"acme",
service,
failure_domain,
64,
DistributedRestartEnvelope::restart(3, Duration::from_secs(45)),
)
.expect("distributed node should be valid")
}
fn rename_mailbox_prefix(service: &str, replacement: &str) -> Morphism {
let mut morphism = derived_view_morphism();
morphism.transform = SubjectTransform::RenamePrefix {
from: SubjectPattern::new(format!("tenant.acme.service.{service}.mailbox")),
to: SubjectPattern::new(format!("tenant.acme.service.{service}.{replacement}")),
};
morphism
}
#[test]
fn leaf_bridge_accepts_constrained_morphisms() {
let bridge = FederationBridge::new(
FederationRole::LeafFabric(LeafConfig::default()),
vec![derived_view_morphism()],
Vec::new(),
[FabricCapability::RewriteNamespace],
)
.expect("leaf bridge should accept bounded derived-view morphisms");
assert_eq!(bridge.role.name(), "leaf_fabric");
assert_eq!(bridge.state, FederationBridgeState::Provisioning);
}
#[test]
fn leaf_bridge_rejects_disallowed_authoritative_morphism() {
let err = FederationBridge::new(
FederationRole::LeafFabric(LeafConfig::default()),
vec![authoritative_morphism()],
Vec::new(),
[FabricCapability::CarryAuthority],
)
.expect_err("leaf bridge should reject authoritative morphisms");
assert_eq!(
err,
FederationError::LeafMorphismClassNotAllowed {
class: MorphismClass::Authoritative,
}
);
}
#[test]
fn gateway_config_rejects_zero_convergence_timeout() {
let role = FederationRole::GatewayFabric(GatewayConfig {
convergence_timeout: Duration::ZERO,
..GatewayConfig::default()
});
let err = role
.validate()
.expect_err("zero convergence timeout must be rejected");
assert_eq!(
err,
FederationError::ZeroDuration {
field: "role.gateway_fabric.convergence_timeout".to_owned(),
}
);
}
#[test]
fn gateway_bridge_rejects_morphism_fanout_above_limit() {
let mut morphism = derived_view_morphism();
morphism.quota_policy.max_fanout = 9;
let role = FederationRole::GatewayFabric(GatewayConfig {
amplification_limit: 4,
..GatewayConfig::default()
});
let err = FederationBridge::new(
role,
vec![morphism],
Vec::new(),
[FabricCapability::RewriteNamespace],
)
.expect_err("gateway should reject excessive fanout");
assert_eq!(
err,
FederationError::GatewayAmplificationExceeded { actual: 9, max: 4 }
);
}
#[test]
fn edge_replay_bridge_requires_observe_evidence_capability() {
let err = FederationBridge::new(
FederationRole::EdgeReplayLink(EdgeReplayConfig::default()),
vec![derived_view_morphism()],
Vec::new(),
[FabricCapability::RewriteNamespace],
)
.expect_err("edge replay should require evidence capability");
assert_eq!(err, FederationError::EdgeReplayRequiresObserveEvidence);
}
#[test]
fn bridge_lifecycle_moves_through_active_degraded_and_closed_states() {
let mut bridge = FederationBridge::new(
FederationRole::ReplicationLink(ReplicationConfig::default()),
vec![derived_view_morphism()],
Vec::new(),
[FabricCapability::RewriteNamespace],
)
.expect("replication bridge should be valid");
bridge.activate().expect("bridge should activate");
assert_eq!(bridge.state, FederationBridgeState::Active);
bridge
.mark_degraded()
.expect("bridge should enter degraded state");
assert_eq!(bridge.state, FederationBridgeState::Degraded);
bridge.activate().expect("bridge should reactivate");
assert_eq!(bridge.state, FederationBridgeState::Active);
bridge.close();
assert_eq!(bridge.state, FederationBridgeState::Closed);
assert_eq!(
bridge
.activate()
.expect_err("closed bridge must not reactivate"),
FederationError::CannotActivateClosedBridge
);
}
#[test]
fn morphism_constraints_default_allows_derived_view_and_egress() {
let mc = MorphismConstraints::default();
assert!(mc.allowed_classes.contains(&MorphismClass::DerivedView));
assert!(mc.allowed_classes.contains(&MorphismClass::Egress));
assert_eq!(mc.allowed_classes.len(), 2);
assert!(mc.validate().is_ok());
}
#[test]
fn morphism_constraints_rejects_empty_allowed_classes() {
let mc = MorphismConstraints {
allowed_classes: BTreeSet::new(),
..MorphismConstraints::default()
};
assert_eq!(
mc.validate().unwrap_err(),
FederationError::EmptyAllowedMorphismClasses
);
}
#[test]
fn morphism_constraints_rejects_zero_expansion_factor() {
let mc = MorphismConstraints {
max_expansion_factor: 0,
..MorphismConstraints::default()
};
assert_eq!(
mc.validate().unwrap_err(),
FederationError::ZeroMaxExpansionFactor
);
}
#[test]
fn morphism_constraints_rejects_zero_fanout() {
let mc = MorphismConstraints {
max_fanout: 0,
..MorphismConstraints::default()
};
assert_eq!(mc.validate().unwrap_err(), FederationError::ZeroMaxFanout);
}
#[test]
fn morphism_constraints_admits_within_bounds() {
let mc = MorphismConstraints::default();
let m = derived_view_morphism();
assert!(mc.admits(&m).is_ok());
}
#[test]
fn morphism_constraints_rejects_expansion_factor_exceeded() {
let mc = MorphismConstraints {
max_expansion_factor: 2,
..MorphismConstraints::default()
};
let mut m = derived_view_morphism();
m.quota_policy.max_expansion_factor = 5;
match mc.admits(&m) {
Err(FederationError::LeafExpansionFactorExceeded { actual, max }) => {
assert_eq!(actual, 5);
assert_eq!(max, 2);
}
other => panic!("expected LeafExpansionFactorExceeded, got {other:?}"),
}
}
#[test]
fn morphism_constraints_rejects_fanout_exceeded() {
let mc = MorphismConstraints {
max_fanout: 3,
..MorphismConstraints::default()
};
let mut m = derived_view_morphism();
m.quota_policy.max_fanout = 10;
match mc.admits(&m) {
Err(FederationError::LeafFanoutExceeded { actual, max }) => {
assert_eq!(actual, 10);
assert_eq!(max, 3);
}
other => panic!("expected LeafFanoutExceeded, got {other:?}"),
}
}
#[test]
fn leaf_config_default_validates() {
let config = LeafConfig::default();
assert!(config.validate().is_ok());
assert!(config.max_reconnect_backoff > Duration::ZERO);
assert!(config.offline_buffer_limit > 0);
}
#[test]
fn leaf_config_rejects_zero_reconnect_backoff() {
let config = LeafConfig {
max_reconnect_backoff: Duration::ZERO,
..LeafConfig::default()
};
match config.validate() {
Err(FederationError::ZeroDuration { field }) => {
assert!(field.contains("max_reconnect_backoff"));
}
other => panic!("expected ZeroDuration, got {other:?}"),
}
}
#[test]
fn leaf_config_rejects_zero_offline_buffer() {
let config = LeafConfig {
offline_buffer_limit: 0,
..LeafConfig::default()
};
assert_eq!(
config.validate().unwrap_err(),
FederationError::ZeroOfflineBufferLimit
);
}
#[test]
fn gateway_config_default_validates() {
let config = GatewayConfig::default();
assert!(config.validate().is_ok());
assert_eq!(
config.interest_propagation_policy,
InterestPropagationPolicy::DemandDriven
);
}
#[test]
fn gateway_config_rejects_zero_amplification_limit() {
let config = GatewayConfig {
amplification_limit: 0,
..GatewayConfig::default()
};
assert_eq!(
config.validate().unwrap_err(),
FederationError::ZeroAmplificationLimit
);
}
#[test]
fn replication_config_default_validates() {
let config = ReplicationConfig::default();
assert!(config.validate().is_ok());
assert_eq!(config.ordering_guarantee, OrderingGuarantee::PerSubject);
assert_eq!(config.catch_up_policy, CatchUpPolicy::SnapshotThenDelta);
}
#[test]
fn replication_config_rejects_zero_snapshot_interval() {
let config = ReplicationConfig {
snapshot_interval: Duration::ZERO,
..ReplicationConfig::default()
};
match config.validate() {
Err(FederationError::ZeroDuration { field }) => {
assert!(field.contains("snapshot_interval"));
}
other => panic!("expected ZeroDuration, got {other:?}"),
}
}
#[test]
fn trace_retention_default_validates() {
let retention = TraceRetention::default();
assert!(retention.validate().is_ok());
assert!(matches!(
retention,
TraceRetention::LatestArtifacts { max_artifacts: 128 }
));
}
#[test]
fn trace_retention_rejects_zero_artifacts() {
let retention = TraceRetention::LatestArtifacts { max_artifacts: 0 };
assert_eq!(
retention.validate().unwrap_err(),
FederationError::ZeroTraceArtifactLimit
);
}
#[test]
fn trace_retention_rejects_zero_duration_window() {
let retention = TraceRetention::DurationWindow {
retention: Duration::ZERO,
};
match retention.validate() {
Err(FederationError::ZeroDuration { .. }) => {}
other => panic!("expected ZeroDuration, got {other:?}"),
}
}
#[test]
fn trace_retention_until_acknowledged_validates() {
let retention = TraceRetention::UntilAcknowledged;
assert!(retention.validate().is_ok());
}
#[test]
fn edge_replay_config_default_validates() {
let config = EdgeReplayConfig::default();
assert!(config.validate().is_ok());
assert_eq!(
config.evidence_shipping_policy,
EvidenceShippingPolicy::OnReconnect
);
assert!(config.reconnection_replay_depth > 0);
}
#[test]
fn edge_replay_config_rejects_zero_replay_depth() {
let config = EdgeReplayConfig {
reconnection_replay_depth: 0,
..EdgeReplayConfig::default()
};
assert_eq!(
config.validate().unwrap_err(),
FederationError::ZeroReplayDepth
);
}
#[test]
fn all_role_names_are_distinct() {
let roles = [
FederationRole::LeafFabric(LeafConfig::default()),
FederationRole::GatewayFabric(GatewayConfig::default()),
FederationRole::ReplicationLink(ReplicationConfig::default()),
FederationRole::EdgeReplayLink(EdgeReplayConfig::default()),
];
let mut names: Vec<&str> = roles.iter().map(super::FederationRole::name).collect();
let orig = names.len();
names.sort_unstable();
names.dedup();
assert_eq!(names.len(), orig, "role names must be unique");
}
#[test]
fn all_default_role_configs_validate() {
let roles = [
FederationRole::LeafFabric(LeafConfig::default()),
FederationRole::GatewayFabric(GatewayConfig::default()),
FederationRole::ReplicationLink(ReplicationConfig::default()),
FederationRole::EdgeReplayLink(EdgeReplayConfig::default()),
];
for role in &roles {
assert!(
role.validate().is_ok(),
"role {} default config should validate",
role.name()
);
}
}
#[test]
fn bridge_rejects_empty_capability_scope() {
let err = FederationBridge::new(
FederationRole::ReplicationLink(ReplicationConfig::default()),
vec![derived_view_morphism()],
Vec::new(),
Vec::<FabricCapability>::new(),
)
.unwrap_err();
assert_eq!(err, FederationError::EmptyCapabilityScope);
}
#[test]
fn bridge_rejects_empty_morphism_set() {
let err = FederationBridge::new(
FederationRole::ReplicationLink(ReplicationConfig::default()),
Vec::new(),
Vec::new(),
[FabricCapability::RewriteNamespace],
)
.unwrap_err();
assert_eq!(err, FederationError::EmptyMorphismSet);
}
#[test]
fn bridge_rejects_missing_capability_for_morphism() {
let err = FederationBridge::new(
FederationRole::ReplicationLink(ReplicationConfig::default()),
vec![authoritative_morphism()],
Vec::new(),
[FabricCapability::RewriteNamespace],
)
.unwrap_err();
assert_eq!(
err,
FederationError::CapabilityScopeMissing {
capability: FabricCapability::CarryAuthority,
}
);
}
#[test]
fn bridge_accepts_morphisms_on_remote_side_only() {
let bridge = FederationBridge::new(
FederationRole::ReplicationLink(ReplicationConfig::default()),
Vec::new(),
vec![derived_view_morphism()],
[FabricCapability::RewriteNamespace],
)
.expect("remote-only morphisms should be accepted");
assert!(bridge.local_morphisms.is_empty());
assert_eq!(bridge.remote_morphisms.len(), 1);
}
#[test]
fn bridge_accepts_morphisms_on_both_sides() {
let bridge = FederationBridge::new(
FederationRole::ReplicationLink(ReplicationConfig::default()),
vec![derived_view_morphism()],
vec![derived_view_morphism()],
[FabricCapability::RewriteNamespace],
)
.expect("morphisms on both sides should be accepted");
assert_eq!(bridge.local_morphisms.len(), 1);
assert_eq!(bridge.remote_morphisms.len(), 1);
}
#[test]
fn edge_replay_bridge_succeeds_with_observe_evidence() {
let bridge = FederationBridge::new(
FederationRole::EdgeReplayLink(EdgeReplayConfig::default()),
vec![derived_view_morphism()],
Vec::new(),
[
FabricCapability::RewriteNamespace,
FabricCapability::ObserveEvidence,
],
)
.expect("edge replay with ObserveEvidence should succeed");
assert_eq!(bridge.role.name(), "edge_replay_link");
}
#[test]
fn bridge_starts_in_provisioning() {
let bridge = FederationBridge::new(
FederationRole::ReplicationLink(ReplicationConfig::default()),
vec![derived_view_morphism()],
Vec::new(),
[FabricCapability::RewriteNamespace],
)
.unwrap();
assert_eq!(bridge.state, FederationBridgeState::Provisioning);
}
#[test]
fn closed_bridge_cannot_be_degraded() {
    let mut bridge = FederationBridge::new(
        FederationRole::ReplicationLink(ReplicationConfig::default()),
        vec![derived_view_morphism()],
        Vec::new(),
        [FabricCapability::RewriteNamespace],
    )
    .unwrap();
    bridge.close();
    assert_eq!(bridge.state, FederationBridgeState::Closed);
    assert_eq!(
        bridge.mark_degraded().unwrap_err(),
        FederationError::CannotDegradeClosedBridge
    );
    // A rejected transition must leave the bridge in its terminal Closed state.
    assert_eq!(bridge.state, FederationBridgeState::Closed);
}
#[test]
fn degraded_bridge_can_be_reactivated() {
    // Degraded -> Active is a permitted transition.
    let mut link = FederationBridge::new(
        FederationRole::ReplicationLink(ReplicationConfig::default()),
        vec![derived_view_morphism()],
        Vec::new(),
        [FabricCapability::RewriteNamespace],
    )
    .unwrap();
    link.mark_degraded().unwrap();
    link.activate().expect("degraded bridge should reactivate");
    let after = link.state;
    assert_eq!(after, FederationBridgeState::Active);
}
#[test]
fn leaf_config_json_round_trip() {
    // Encoding then decoding the default config must yield an equal value.
    let original = LeafConfig::default();
    let encoded = serde_json::to_string(&original).expect("serialize");
    let decoded = serde_json::from_str::<LeafConfig>(&encoded).expect("deserialize");
    assert_eq!(original, decoded);
}
#[test]
fn gateway_config_json_round_trip() {
    // Encoding then decoding the default config must yield an equal value.
    let original = GatewayConfig::default();
    let encoded = serde_json::to_string(&original).expect("serialize");
    let decoded = serde_json::from_str::<GatewayConfig>(&encoded).expect("deserialize");
    assert_eq!(original, decoded);
}
#[test]
fn replication_config_json_round_trip() {
    // Encoding then decoding the default config must yield an equal value.
    let original = ReplicationConfig::default();
    let encoded = serde_json::to_string(&original).expect("serialize");
    let decoded = serde_json::from_str::<ReplicationConfig>(&encoded).expect("deserialize");
    assert_eq!(original, decoded);
}
#[test]
fn edge_replay_config_json_round_trip() {
    // Encoding then decoding the default config must yield an equal value.
    let original = EdgeReplayConfig::default();
    let encoded = serde_json::to_string(&original).expect("serialize");
    let decoded = serde_json::from_str::<EdgeReplayConfig>(&encoded).expect("deserialize");
    assert_eq!(original, decoded);
}
#[test]
fn federation_role_tagged_json_round_trip() {
    // Every role variant (with default config) must survive a JSON round trip.
    let roles = [
        FederationRole::LeafFabric(LeafConfig::default()),
        FederationRole::GatewayFabric(GatewayConfig::default()),
        FederationRole::ReplicationLink(ReplicationConfig::default()),
        FederationRole::EdgeReplayLink(EdgeReplayConfig::default()),
    ];
    roles.into_iter().for_each(|expected| {
        let encoded = serde_json::to_string(&expected).expect("serialize");
        let decoded: FederationRole = serde_json::from_str(&encoded).expect("deserialize");
        assert_eq!(expected, decoded);
    });
}
#[test]
fn bridge_state_json_round_trip() {
    // Every lifecycle state must survive a JSON round trip.
    let states = [
        FederationBridgeState::Provisioning,
        FederationBridgeState::Active,
        FederationBridgeState::Degraded,
        FederationBridgeState::Closed,
    ];
    states.into_iter().for_each(|expected| {
        let encoded = serde_json::to_string(&expected).expect("serialize");
        let decoded: FederationBridgeState = serde_json::from_str(&encoded).expect("deserialize");
        assert_eq!(expected, decoded);
    });
}
#[test]
fn federation_bridge_json_round_trip_preserves_config_and_resets_ephemeral_runtime() {
    let mut original = FederationBridge::new(
        FederationRole::LeafFabric(LeafConfig::default()),
        vec![derived_view_morphism()],
        Vec::new(),
        [FabricCapability::RewriteNamespace],
    )
    .unwrap();
    // Put something into the runtime buffer so it differs from a pristine runtime.
    original.mark_degraded().unwrap();
    let _ = original
        .queue_leaf_route(
            FederationDirection::LocalToRemote,
            SubjectPattern::new("tenant.audit.>"),
            1,
        )
        .unwrap();

    let encoded = serde_json::to_string(&original).expect("serialize bridge");
    let restored: FederationBridge = serde_json::from_str(&encoded).expect("deserialize bridge");

    // Configuration and lifecycle state survive the trip.
    assert_eq!(original.role, restored.role);
    assert_eq!(original.local_morphisms, restored.local_morphisms);
    assert_eq!(original.remote_morphisms, restored.remote_morphisms);
    assert_eq!(original.capability_scope, restored.capability_scope);
    assert_eq!(original.state, restored.state);

    // Runtime is not serialized; the restored bridge reports a default leaf runtime.
    assert!(restored.runtime.is_none());
    let pristine_runtime = FederationBridgeRuntime::Leaf(LeafBridgeRuntime::default());
    assert_eq!(restored.runtime(), pristine_runtime);
    assert_ne!(original.runtime(), restored.runtime());

    // Equality ignores the ephemeral runtime.
    assert_eq!(original, restored);
}
#[test]
fn federation_bridge_pristine_round_trip_preserves_normalized_equality() {
    // An untouched gateway bridge: its runtime view matches the restored default.
    let original = FederationBridge::new(
        FederationRole::GatewayFabric(GatewayConfig::default()),
        vec![derived_view_morphism()],
        Vec::new(),
        [FabricCapability::RewriteNamespace],
    )
    .unwrap();
    let encoded = serde_json::to_string(&original).expect("serialize bridge");
    let restored: FederationBridge = serde_json::from_str(&encoded).expect("deserialize bridge");
    assert!(restored.runtime.is_none());
    let (before, after) = (original.runtime(), restored.runtime());
    assert_eq!(before, after);
    assert_eq!(original, restored);
}
#[test]
fn federation_bridge_equality_ignores_ephemeral_runtime_state() {
    // Two identically configured leaf bridges, both degraded.
    let mut with_backlog = FederationBridge::new(
        FederationRole::LeafFabric(LeafConfig::default()),
        vec![derived_view_morphism()],
        Vec::new(),
        [FabricCapability::RewriteNamespace],
    )
    .unwrap();
    let mut untouched = FederationBridge::new(
        FederationRole::LeafFabric(LeafConfig::default()),
        vec![derived_view_morphism()],
        Vec::new(),
        [FabricCapability::RewriteNamespace],
    )
    .unwrap();
    with_backlog.mark_degraded().unwrap();
    untouched.mark_degraded().unwrap();

    // Only one of them buffers a route, so their runtimes diverge...
    let _ = with_backlog
        .queue_leaf_route(
            FederationDirection::LocalToRemote,
            SubjectPattern::new("tenant.audit.>"),
            1,
        )
        .unwrap();
    assert_ne!(with_backlog.runtime(), untouched.runtime());

    // ...yet the bridges still compare equal.
    assert_eq!(with_backlog, untouched);
}
#[test]
fn interest_propagation_all_variants_json_round_trip() {
    // Every propagation policy must survive a JSON round trip.
    let variants = [
        InterestPropagationPolicy::ExplicitSubscriptions,
        InterestPropagationPolicy::PrefixAnnouncements,
        InterestPropagationPolicy::DemandDriven,
    ];
    variants.into_iter().for_each(|expected| {
        let encoded = serde_json::to_string(&expected).expect("serialize");
        let decoded: InterestPropagationPolicy =
            serde_json::from_str(&encoded).expect("deserialize");
        assert_eq!(expected, decoded);
    });
}
#[test]
fn ordering_guarantee_all_variants_json_round_trip() {
    // Every ordering guarantee must survive a JSON round trip.
    let variants = [
        OrderingGuarantee::PerSubject,
        OrderingGuarantee::SnapshotConsistent,
        OrderingGuarantee::CheckpointBounded,
    ];
    variants.into_iter().for_each(|expected| {
        let encoded = serde_json::to_string(&expected).expect("serialize");
        let decoded: OrderingGuarantee = serde_json::from_str(&encoded).expect("deserialize");
        assert_eq!(expected, decoded);
    });
}
#[test]
fn catch_up_policy_all_variants_json_round_trip() {
    // Every catch-up policy must survive a JSON round trip.
    let variants = [
        CatchUpPolicy::SnapshotRequired,
        CatchUpPolicy::SnapshotThenDelta,
        CatchUpPolicy::LogOnly,
    ];
    variants.into_iter().for_each(|expected| {
        let encoded = serde_json::to_string(&expected).expect("serialize");
        let decoded: CatchUpPolicy = serde_json::from_str(&encoded).expect("deserialize");
        assert_eq!(expected, decoded);
    });
}
#[test]
fn trace_retention_all_variants_json_round_trip() {
    // Each retention variant, including the data-carrying ones, must round-trip through JSON.
    let one_hour = Duration::from_secs(3600);
    let variants = [
        TraceRetention::LatestArtifacts { max_artifacts: 42 },
        TraceRetention::DurationWindow {
            retention: one_hour,
        },
        TraceRetention::UntilAcknowledged,
    ];
    variants.into_iter().for_each(|expected| {
        let encoded = serde_json::to_string(&expected).expect("serialize");
        let decoded: TraceRetention = serde_json::from_str(&encoded).expect("deserialize");
        assert_eq!(expected, decoded);
    });
}
#[test]
fn bridge_states_have_consistent_ordering() {
    // Each lifecycle state must compare strictly less than the one listed after it.
    let lifecycle = [
        FederationBridgeState::Provisioning,
        FederationBridgeState::Active,
        FederationBridgeState::Degraded,
        FederationBridgeState::Closed,
    ];
    for step in lifecycle.windows(2) {
        assert!(step[0] < step[1]);
    }
}
#[test]
fn gateway_bridge_accepts_morphism_within_limit() {
    // A morphism whose fanout equals the gateway amplification limit is admitted.
    const LIMIT: u16 = 4;
    let mut at_limit = derived_view_morphism();
    at_limit.quota_policy.max_fanout = LIMIT;
    let gateway_config = GatewayConfig {
        amplification_limit: 4,
        ..GatewayConfig::default()
    };
    let gateway = FederationBridge::new(
        FederationRole::GatewayFabric(gateway_config),
        vec![at_limit],
        Vec::new(),
        [FabricCapability::RewriteNamespace],
    )
    .expect("gateway should accept morphism at limit boundary");
    assert_eq!(gateway.role.name(), "gateway_fabric");
}
#[test]
fn leaf_accepts_egress_morphism() {
    // Reshape a derived-view morphism into an irreversible, reply-stripping egress.
    let mut egress = derived_view_morphism();
    egress.class = MorphismClass::Egress;
    egress.response_policy = ResponsePolicy::StripReplies;
    egress.reversibility = ReversibilityRequirement::Irreversible;
    egress.sharing_policy = SharingPolicy::Federated;
    let leaf = FederationBridge::new(
        FederationRole::LeafFabric(LeafConfig::default()),
        vec![egress],
        Vec::new(),
        [FabricCapability::RewriteNamespace],
    )
    .expect("leaf should accept egress morphisms");
    let role_name = leaf.role.name();
    assert_eq!(role_name, "leaf_fabric");
}
#[test]
fn leaf_bridge_buffers_routes_until_reactivation() {
    // Offline buffer holds two routes; a third pushes out the oldest.
    let leaf_config = LeafConfig {
        offline_buffer_limit: 2,
        ..LeafConfig::default()
    };
    let mut leaf = FederationBridge::new(
        FederationRole::LeafFabric(leaf_config),
        vec![derived_view_morphism()],
        Vec::new(),
        [FabricCapability::RewriteNamespace],
    )
    .unwrap();
    leaf.mark_degraded().unwrap();

    let mut enqueue = |direction, pattern: SubjectPattern| {
        leaf.queue_leaf_route(direction, pattern, 1).unwrap()
    };
    let alpha = enqueue(
        FederationDirection::LocalToRemote,
        SubjectPattern::new("tenant.alpha.>"),
    );
    let beta = enqueue(
        FederationDirection::LocalToRemote,
        SubjectPattern::new("tenant.beta.>"),
    );
    let gamma = enqueue(
        FederationDirection::RemoteToLocal,
        SubjectPattern::new("tenant.gamma.>"),
    );

    assert!(matches!(
        alpha,
        LeafRouteDisposition::Buffered {
            buffered_entries: 1,
            dropped_entries: 0,
            ..
        }
    ));
    assert!(matches!(
        beta,
        LeafRouteDisposition::Buffered {
            buffered_entries: 2,
            dropped_entries: 0,
            ..
        }
    ));
    assert!(matches!(
        gamma,
        LeafRouteDisposition::Buffered {
            buffered_entries: 2,
            dropped_entries: 1,
            ..
        }
    ));

    // After reactivation the surviving routes drain in insertion order.
    leaf.activate().unwrap();
    let drained = leaf.drain_leaf_buffer().unwrap();
    let subjects: Vec<&str> = drained
        .routes
        .iter()
        .map(|route| route.subject.as_str())
        .collect();
    assert_eq!(drained.dropped_entries, 1);
    assert_eq!(subjects, vec!["tenant.beta.>", "tenant.gamma.>"]);
}
#[test]
fn gateway_bridge_applies_budgeted_interest_and_convergence() {
    let gateway_config = GatewayConfig {
        amplification_limit: 4,
        convergence_timeout: Duration::from_secs(5),
        ..GatewayConfig::default()
    };
    let mut gateway = FederationBridge::new(
        FederationRole::GatewayFabric(gateway_config),
        vec![derived_view_morphism()],
        Vec::new(),
        [FabricCapability::RewriteNamespace],
    )
    .unwrap();
    gateway.activate().unwrap();

    // A poll quota of 3 caps amplification below the configured limit of 4.
    let capped_budget = || ControlBudget {
        poll_quota: 3,
        ..ControlBudget::default()
    };
    let rejection = gateway
        .plan_gateway_interest(
            SystemSubjectFamily::Route,
            SubjectPattern::new("tenant.route.>"),
            5,
            capped_budget(),
        )
        .unwrap_err();
    assert_eq!(
        rejection,
        FederationError::GatewayAmplificationExceeded { actual: 5, max: 3 }
    );
    let admitted = gateway
        .plan_gateway_interest(
            SystemSubjectFamily::Route,
            SubjectPattern::new("tenant.route.>"),
            2,
            capped_budget(),
        )
        .unwrap();
    assert_eq!(admitted.admitted_amplification, 2);

    let forwarded = gateway
        .forward_gateway_advisory(
            SystemSubjectFamily::Replay,
            SubjectPattern::new("$SYS.FABRIC.REPLAY.>"),
            ControlBudget::default(),
        )
        .unwrap();
    assert_eq!(forwarded.family, SystemSubjectFamily::Replay);

    // Exceeding the 5s convergence timeout degrades the bridge.
    let outcome = gateway
        .reconcile_gateway_convergence(Duration::from_secs(6))
        .unwrap();
    assert!(outcome.timed_out);
    assert_eq!(gateway.state, FederationBridgeState::Degraded);

    let expected_interest = GatewayInterestRecord {
        family: SystemSubjectFamily::Route,
        pattern: SubjectPattern::new("tenant.route.>"),
    };
    match gateway.runtime() {
        FederationBridgeRuntime::Gateway(state) => {
            assert!(state.propagated_interests.contains(&expected_interest));
            assert_eq!(state.forwarded_advisories.len(), 1);
        }
        other => panic!("expected gateway runtime, got {other:?}"),
    }
}
#[test]
fn gateway_runtime_distinguishes_interest_family_from_pattern() {
    let mut gateway = FederationBridge::new(
        FederationRole::GatewayFabric(GatewayConfig::default()),
        vec![derived_view_morphism()],
        Vec::new(),
        [FabricCapability::RewriteNamespace],
    )
    .unwrap();
    gateway.activate().unwrap();

    // The same pattern under two families must be recorded as two distinct interests.
    for family in [SystemSubjectFamily::Route, SystemSubjectFamily::Replay] {
        gateway
            .plan_gateway_interest(
                family,
                SubjectPattern::new("tenant.shared.>"),
                1,
                ControlBudget::default(),
            )
            .unwrap();
    }

    match gateway.runtime() {
        FederationBridgeRuntime::Gateway(state) => {
            let recorded = state.propagated_interests.len();
            assert_eq!(recorded, 2);
        }
        other => panic!("expected gateway runtime, got {other:?}"),
    }
}
#[test]
fn replication_bridge_exports_and_applies_region_snapshots() {
    let mut link = FederationBridge::new(
        FederationRole::ReplicationLink(ReplicationConfig::default()),
        vec![derived_view_morphism()],
        Vec::new(),
        [FabricCapability::RewriteNamespace],
    )
    .unwrap();

    // Populate a source region with one task and one child region.
    let shared_region = region_id(10);
    let mut origin = RegionBridge::new_local(shared_region, None, Budget::new());
    origin.add_task(task_id(11)).unwrap();
    origin.add_child(region_id(12)).unwrap();

    let exported = link
        .export_replication_transfer(&mut origin, Time::from_secs(1))
        .unwrap();
    assert_eq!(exported.sequence, 1);

    // Applying the transfer to an empty replica reproduces the source contents.
    let mut replica = RegionBridge::new_local(shared_region, None, Budget::new());
    let receipt = link
        .apply_replication_transfer(&mut replica, &exported)
        .unwrap();
    assert_eq!(receipt.sequence, 1);
    assert_eq!(replica.local().task_ids(), vec![task_id(11)]);
    assert_eq!(replica.local().child_ids(), vec![region_id(12)]);

    match link.runtime() {
        FederationBridgeRuntime::Replication(state) => {
            assert_eq!(state.last_exported_sequence, Some(1));
            assert_eq!(state.last_applied_sequence, Some(1));
        }
        other => panic!("expected replication runtime, got {other:?}"),
    }
}
#[test]
fn replication_bridge_catch_up_plan_respects_policy() {
    // Builds a replication bridge that differs only in its catch-up policy.
    let bridge_with = |policy: CatchUpPolicy| {
        FederationBridge::new(
            FederationRole::ReplicationLink(ReplicationConfig {
                catch_up_policy: policy,
                ..ReplicationConfig::default()
            }),
            vec![derived_view_morphism()],
            Vec::new(),
            [FabricCapability::RewriteNamespace],
        )
        .unwrap()
    };
    let mut log_only = bridge_with(CatchUpPolicy::LogOnly);
    let mut snapshot_required = bridge_with(CatchUpPolicy::SnapshotRequired);

    // Same gap (local 10, remote 4) yields different actions per policy.
    let log_action = log_only.plan_replication_catch_up(10, 4).unwrap().action;
    let snapshot_action = snapshot_required
        .plan_replication_catch_up(10, 4)
        .unwrap()
        .action;
    assert_eq!(log_action, ReplicationCatchUpAction::DeltaOnly);
    assert_eq!(snapshot_action, ReplicationCatchUpAction::Snapshot);
}
#[test]
fn replication_bridge_rejects_remote_ahead_catch_up_plan() {
    let mut link = FederationBridge::new(
        FederationRole::ReplicationLink(ReplicationConfig::default()),
        vec![derived_view_morphism()],
        Vec::new(),
        [FabricCapability::RewriteNamespace],
    )
    .unwrap();

    // A remote sequence beyond the local one cannot be caught up from here.
    let expected = FederationError::ReplicationCatchUpRemoteAhead {
        local_sequence: 4,
        remote_sequence: 7,
    };
    assert_eq!(link.plan_replication_catch_up(4, 7).unwrap_err(), expected);

    // The rejected plan must not be recorded.
    match link.runtime() {
        FederationBridgeRuntime::Replication(state) => {
            assert_eq!(state.last_catch_up, None);
        }
        other => panic!("expected replication runtime, got {other:?}"),
    }
}
#[test]
fn replication_bridge_rejects_mismatched_transfer_metadata() {
    let mut federation = FederationBridge::new(
        FederationRole::ReplicationLink(ReplicationConfig::default()),
        vec![derived_view_morphism()],
        Vec::new(),
        [FabricCapability::RewriteNamespace],
    )
    .unwrap();
    let region = region_id(20);
    let mut source = RegionBridge::new_local(region, None, Budget::new());
    source.add_task(task_id(21)).unwrap();
    let mut transfer = federation
        .export_replication_transfer(&mut source, Time::from_secs(2))
        .unwrap();
    // Tamper with the transfer header so it no longer matches its payload.
    transfer.sequence += 1;
    let mut target = RegionBridge::new_local(region, None, Budget::new());
    let err = federation
        .apply_replication_transfer(&mut target, &transfer)
        .unwrap_err();
    assert_eq!(
        err,
        FederationError::ReplicationTransferSequenceMismatch {
            expected: 2,
            actual: 1,
        }
    );
    // A rejected transfer must not touch the target region or record an applied
    // sequence; the earlier export stays recorded.
    assert!(target.local().task_ids().is_empty());
    match federation.runtime() {
        FederationBridgeRuntime::Replication(runtime) => {
            assert_eq!(runtime.last_exported_sequence, Some(1));
            assert_eq!(runtime.last_applied_sequence, None);
        }
        other => panic!("expected replication runtime, got {other:?}"),
    }
}
#[test]
fn edge_replay_bridge_retains_latest_artifacts_and_ships_on_reconnect() {
    // Retain at most two artifacts and ship them only once the link reconnects.
    let edge_config = EdgeReplayConfig {
        trace_retention: TraceRetention::LatestArtifacts { max_artifacts: 2 },
        evidence_shipping_policy: EvidenceShippingPolicy::OnReconnect,
        reconnection_replay_depth: 2,
    };
    let mut edge = FederationBridge::new(
        FederationRole::EdgeReplayLink(edge_config),
        vec![derived_view_morphism()],
        Vec::new(),
        [
            FabricCapability::RewriteNamespace,
            FabricCapability::ObserveEvidence,
        ],
    )
    .unwrap();

    let incoming = vec![
        replay_artifact("artifact-a", SystemSubjectFamily::Replay, 1, 1),
        replay_artifact("artifact-b", SystemSubjectFamily::Replay, 2, 2),
        replay_artifact("artifact-c", SystemSubjectFamily::Replay, 3, 3),
    ];
    for artifact in incoming {
        edge.retain_replay_artifact(artifact).unwrap();
    }

    // Nothing ships while the bridge is still provisioning.
    let offline_plan = edge.plan_replay_shipping().unwrap();
    assert!(offline_plan.artifacts.is_empty());

    edge.activate().unwrap();
    let online_plan = edge.plan_replay_shipping().unwrap();
    let ids: Vec<&str> = online_plan
        .artifacts
        .iter()
        .map(|artifact| artifact.artifact_id.as_str())
        .collect();
    assert_eq!(ids, vec!["artifact-b", "artifact-c"]);

    match edge.runtime() {
        FederationBridgeRuntime::EdgeReplay(state) => {
            assert_eq!(state.retained_artifacts.len(), 2);
            assert_eq!(state.shipped_batches, 1);
        }
        other => panic!("expected edge replay runtime, got {other:?}"),
    }
}
#[test]
fn distributed_supervision_rejects_duplicate_node_ids() {
    // Two nodes share the id "alpha" even though their services differ.
    let nodes = [
        distributed_node("alpha", "orders", "zone-a"),
        distributed_node("alpha", "payments", "zone-b"),
    ];
    let failure = DistributedSupervisionCompiler::compile(&nodes)
        .expect_err("duplicate node ids must be rejected");
    let expected = FederationError::DuplicateDistributedSupervisionNode {
        node_id: "alpha".to_owned(),
    };
    assert_eq!(failure, expected);
}
#[test]
fn distributed_supervision_rejects_unknown_monitor_target() {
    // "alpha" monitors a node that is not part of the compiled set.
    let nodes = [distributed_node("alpha", "orders", "zone-a")
        .with_monitor_targets(["missing-node"])];
    let failure = DistributedSupervisionCompiler::compile(&nodes)
        .expect_err("unknown monitor targets must be rejected");
    let expected = FederationError::UnknownDistributedSupervisionTarget {
        node_id: "alpha".to_owned(),
        target: "missing-node".to_owned(),
        relation: "monitor",
    };
    assert_eq!(failure, expected);
}
#[test]
fn distributed_supervision_rejects_same_domain_failover() {
    // Both nodes live in zone-a, so failing over between them is rejected.
    let nodes = [
        distributed_node("alpha", "orders", "zone-a").with_failover_targets(["beta"]),
        distributed_node("beta", "billing", "zone-a"),
    ];
    let failure = DistributedSupervisionCompiler::compile(&nodes)
        .expect_err("failover must cross failure domains");
    let expected = FederationError::FailoverTargetSameFailureDomain {
        node_id: "alpha".to_owned(),
        target: "beta".to_owned(),
        failure_domain: "zone-a".to_owned(),
    };
    assert_eq!(failure, expected);
}
#[test]
fn distributed_supervision_compiles_mailbox_monitor_link_lease_and_evidence_plans() {
    // alpha (zone-a) exports its mailbox and monitors, links to, and fails over to beta.
    // beta (zone-b) links back to alpha and imports its own mailbox.
    let alpha = distributed_node("alpha", "orders", "zone-a")
        .with_export_morphisms(vec![rename_mailbox_prefix("orders", "fabric-egress")])
        .with_monitor_targets(["beta"])
        .with_link_targets(["beta"])
        .with_failover_targets(["beta"]);
    let beta = distributed_node("beta", "billing", "zone-b")
        .with_link_targets(["alpha"])
        .with_import_morphisms(vec![rename_mailbox_prefix("billing", "fabric-ingress")]);
    let compiled = DistributedSupervisionCompiler::compile(&[alpha, beta])
        .expect("distributed supervision plan should compile");

    // Plan cardinalities; the mutual link collapses into a single entry.
    assert_eq!(compiled.nodes.len(), 2);
    assert_eq!(compiled.mailbox_routes.len(), 2);
    assert_eq!(compiled.monitor_plans.len(), 1);
    assert_eq!(compiled.link_plans.len(), 1);
    assert_eq!(compiled.registry_leases.len(), 2);
    assert_eq!(compiled.failover_handoffs.len(), 1);
    assert_eq!(compiled.evidence_hooks.len(), 2);

    // Node entry for alpha: local and exported mailbox subjects plus restart strategy.
    let alpha_entry = compiled
        .nodes
        .iter()
        .find(|entry| entry.node_id.as_str() == "alpha")
        .expect("alpha node should exist");
    assert_eq!(
        alpha_entry.mailbox_subject.as_str(),
        "tenant.acme.service.orders.mailbox.alpha"
    );
    assert_eq!(
        alpha_entry.exported_mailbox_subject.as_str(),
        "tenant.acme.service.orders.fabric-egress.alpha"
    );
    assert!(matches!(
        &alpha_entry.supervision_strategy,
        SupervisionStrategy::Restart(config)
            if config.max_restarts == 3 && config.window == Duration::from_secs(45)
    ));

    // beta's imported mailbox subject.
    let beta_mailbox = compiled
        .mailbox_routes
        .iter()
        .find(|entry| entry.node_id.as_str() == "beta")
        .expect("beta route should exist");
    assert_eq!(
        beta_mailbox.imported_mailbox_subject.as_str(),
        "tenant.acme.service.billing.fabric-ingress.beta"
    );

    // Monitor: alpha watches beta.
    let watch = &compiled.monitor_plans[0];
    assert_eq!(watch.watcher.as_str(), "alpha");
    assert_eq!(watch.monitored.as_str(), "beta");
    assert_eq!(
        watch.notification_subject.as_str(),
        "$SYS.FABRIC.DRAIN.monitor.alpha.beta"
    );
    assert_eq!(
        watch.monitored_mailbox_subject.as_str(),
        "tenant.acme.service.billing.mailbox.beta"
    );

    // Link between alpha and beta on the drain family.
    let link_plan = &compiled.link_plans[0];
    assert_eq!(link_plan.left_node.as_str(), "alpha");
    assert_eq!(link_plan.right_node.as_str(), "beta");
    assert_eq!(
        link_plan.control_subject.as_str(),
        "$SYS.FABRIC.DRAIN.link.alpha.beta"
    );
    assert_eq!(link_plan.family, SystemSubjectFamily::Drain);

    // Registry lease for alpha.
    let alpha_registry = compiled
        .registry_leases
        .iter()
        .find(|entry| entry.node_id.as_str() == "alpha")
        .expect("alpha lease should exist");
    assert_eq!(
        alpha_registry.registry_subject.as_str(),
        "tenant.acme.service.orders.discover"
    );
    assert_eq!(
        alpha_registry.lease_subject.as_str(),
        "$SYS.FABRIC.ROUTE.registry-lease.alpha"
    );
    assert_eq!(alpha_registry.lease_ttl, Duration::from_secs(45));

    // Failover handoff from alpha (zone-a) to beta (zone-b).
    let handoff = &compiled.failover_handoffs[0];
    assert_eq!(handoff.source_node.as_str(), "alpha");
    assert_eq!(handoff.target_node.as_str(), "beta");
    assert_eq!(handoff.source_failure_domain, "zone-a");
    assert_eq!(handoff.target_failure_domain, "zone-b");
    assert_eq!(
        handoff.handoff_subject.as_str(),
        "$SYS.FABRIC.DRAIN.handoff.alpha.beta"
    );
    assert_eq!(
        handoff.drain_subject.as_str(),
        "$SYS.FABRIC.DRAIN.failover.alpha.beta"
    );
    assert_eq!(
        handoff.registry_lease_subject.as_str(),
        "$SYS.FABRIC.ROUTE.registry-lease.beta"
    );
    assert_eq!(
        handoff.evidence_subject.as_str(),
        "$SYS.FABRIC.REPLAY.failover.alpha.beta"
    );
    assert!(matches!(
        &handoff.target_strategy,
        SupervisionStrategy::Restart(_)
    ));

    // Evidence hook for alpha on the replay family.
    let alpha_evidence = compiled
        .evidence_hooks
        .iter()
        .find(|entry| entry.node_id.as_str() == "alpha")
        .expect("alpha evidence hook should exist");
    assert_eq!(
        alpha_evidence.observability_subject.as_str(),
        "tenant.acme.service.orders.telemetry.supervision-alpha"
    );
    assert_eq!(
        alpha_evidence.replay_subject.as_str(),
        "$SYS.FABRIC.REPLAY.supervision.alpha"
    );
    assert_eq!(alpha_evidence.family, SystemSubjectFamily::Replay);
}
#[test]
fn distributed_supervision_deduplicates_bidirectional_links() {
    // alpha and beta each declare a link to the other; only one plan should result.
    let nodes = [
        distributed_node("alpha", "orders", "zone-a").with_link_targets(["beta"]),
        distributed_node("beta", "billing", "zone-b").with_link_targets(["alpha"]),
    ];
    let compiled = DistributedSupervisionCompiler::compile(&nodes).expect("links should compile");
    assert_eq!(compiled.link_plans.len(), 1);
    let only_link = &compiled.link_plans[0];
    assert_eq!(
        only_link.control_subject.as_str(),
        "$SYS.FABRIC.DRAIN.link.alpha.beta"
    );
}
#[test]
fn default_enum_values_are_expected() {
    // Pin the `Default` variant of each policy/state enum (expected value on the left).
    assert_eq!(
        InterestPropagationPolicy::DemandDriven,
        InterestPropagationPolicy::default()
    );
    assert_eq!(OrderingGuarantee::PerSubject, OrderingGuarantee::default());
    assert_eq!(CatchUpPolicy::SnapshotThenDelta, CatchUpPolicy::default());
    assert_eq!(
        EvidenceShippingPolicy::OnReconnect,
        EvidenceShippingPolicy::default()
    );
    assert_eq!(
        FederationBridgeState::Provisioning,
        FederationBridgeState::default()
    );
}
}