use std::pin::Pin;
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::Arc;
use std::task::{Context, Poll};
use std::time::{Duration, SystemTime};
use futures::Stream;
use tokio::time::{interval, Interval};
use super::meshos::{
ice_proposal_signing_payload, simulate_ice_proposal, AdminEvent, BlastRadius, ChainId,
IceActionProposal, MeshOsEvent, MeshOsHandle, MeshOsHandleError, MeshOsRuntime, MeshOsSnapshot,
MeshOsSnapshotReader, NodeId,
};
use crate::adapter::net::behavior::aggregator::{AggregatorDaemon, SummaryAnnouncement};
use crate::adapter::net::identity::EntityKeypair;
use crate::adapter::net::subnet::SubnetId;
use crate::adapter::net::MeshNode;
use crate::adapter::net::{ChannelHash, Visibility};
/// Signing identity of the operator driving a [`DeckClient`].
///
/// The keypair is held behind an `Arc`, so cloning an identity shares the
/// same key material. The operator id is the keypair's `origin_hash()`,
/// computed once at construction.
#[derive(Clone, Debug)]
pub struct OperatorIdentity {
    /// Shared keypair used for all signatures produced by this identity.
    keypair: Arc<EntityKeypair>,
    /// Cached `origin_hash()` of `keypair`.
    operator_id: u64,
}
impl OperatorIdentity {
    /// Wraps an existing keypair; the operator id is its `origin_hash()`.
    pub fn from_keypair(keypair: EntityKeypair) -> Self {
        let shared = Arc::new(keypair);
        Self {
            operator_id: shared.origin_hash(),
            keypair: shared,
        }
    }

    /// Creates an identity backed by a freshly generated keypair.
    pub fn generate() -> Self {
        let fresh = EntityKeypair::generate();
        Self::from_keypair(fresh)
    }

    /// Operator id used to attribute commits and signatures.
    pub fn operator_id(&self) -> u64 {
        self.operator_id
    }

    /// Borrows the underlying signing keypair.
    pub fn keypair(&self) -> &EntityKeypair {
        &*self.keypair
    }
}
/// Error returned by every fallible deck operation.
///
/// `Display` renders as `<<deck-sdk-kind:KIND>>MESSAGE`, so callers that only
/// see the rendered string can still recover the machine-readable `kind`.
#[derive(Clone, Debug, thiserror::Error)]
#[error("<<deck-sdk-kind:{kind}>>{message}")]
pub struct DeckError {
    /// Stable, machine-readable discriminator (e.g. `"queue_full"`).
    pub kind: &'static str,
    /// Human-readable detail.
    pub message: String,
}
impl DeckError {
    /// Builds an error from a kind discriminator and a message.
    fn new(kind: &'static str, message: impl Into<String>) -> Self {
        let message: String = message.into();
        Self { kind, message }
    }
}
impl From<MeshOsHandleError> for DeckError {
    /// Maps a MeshOS handle failure onto a deck error kind:
    /// `LoopClosed` becomes `"loop_closed"` and `QueueFull` becomes `"queue_full"`.
    fn from(err: MeshOsHandleError) -> Self {
        let (kind, message) = match err {
            MeshOsHandleError::LoopClosed => ("loop_closed", "MeshOS loop has exited"),
            MeshOsHandleError::QueueFull => (
                "queue_full",
                "MeshOS source channel at capacity — back off + retry",
            ),
        };
        Self::new(kind, message)
    }
}
/// Error type of admin (non-ICE) commands; an alias of [`DeckError`].
pub type AdminError = DeckError;
/// Error type of ICE proposal operations; an alias of [`DeckError`].
pub type IceError = DeckError;
/// Receipt returned after a command has been handed to MeshOS.
///
/// The receipt is produced locally by the client once publishing succeeds.
/// `commit_id` comes from the client's own counter, and `committed_at` is the
/// client's wall clock at that moment.
#[derive(Clone, Debug)]
pub struct ChainCommit {
    /// Client-local, monotonically increasing id of this commit.
    commit_id: u64,
    /// Operator id of the identity that issued the command.
    operator_id: u64,
    /// Short name of the command kind (e.g. `"drain"`).
    event_kind: &'static str,
    /// Client wall-clock time at which the publish returned.
    committed_at: SystemTime,
}
impl ChainCommit {
    /// Client-local commit id.
    pub fn commit_id(&self) -> u64 {
        let Self { commit_id, .. } = *self;
        commit_id
    }

    /// Operator id that issued the command.
    pub fn operator_id(&self) -> u64 {
        let Self { operator_id, .. } = *self;
        operator_id
    }

    /// Command kind name.
    pub fn event_kind(&self) -> &'static str {
        let Self { event_kind, .. } = *self;
        event_kind
    }

    /// Local time at which the commit was recorded.
    pub fn committed_at(&self) -> SystemTime {
        let Self { committed_at, .. } = *self;
        committed_at
    }
}
/// Tunables for a [`DeckClient`].
#[derive(Clone, Debug)]
pub struct DeckClientConfig {
    /// How often the polling streams and `watch` re-read the snapshot.
    /// Consumers clamp it to at least 1 ms.
    pub snapshot_poll_interval: Duration,
    /// Minimum number of operator signatures an ICE commit must carry.
    pub ice_signature_threshold: usize,
}
impl Default for DeckClientConfig {
    /// 100 ms polling and a single-signature ICE threshold.
    fn default() -> Self {
        const DEFAULT_POLL_MS: u64 = 100;
        const DEFAULT_ICE_THRESHOLD: usize = 1;
        Self {
            snapshot_poll_interval: Duration::from_millis(DEFAULT_POLL_MS),
            ice_signature_threshold: DEFAULT_ICE_THRESHOLD,
        }
    }
}
/// Compact, comparable digest of a `MeshOsSnapshot`.
///
/// Produced by `build_status_summary`. [`StatusSummaryStream`] compares
/// successive values with `PartialEq` and only emits when they differ.
#[derive(Clone, Debug, Default, Eq, PartialEq)]
pub struct StatusSummary {
    /// Peer counts bucketed by reported health.
    pub peers: PeerCounts,
    /// Daemon counts bucketed by lifecycle and restart state.
    pub daemons: DaemonCounts,
    /// Number of entries in the snapshot's `replicas` map.
    pub replica_chains: usize,
    /// Number of entries in the snapshot's `avoid_list`.
    pub avoid_list_entries: usize,
    /// Length of the snapshot's `recently_emitted` collection.
    pub recently_emitted_count: usize,
    /// Length of the snapshot's `recent_failures` collection.
    pub recent_failure_count: usize,
    /// Number of records currently held in the snapshot's `admin_audit`.
    pub admin_audit_ring_depth: usize,
    /// Copied from the snapshot's `freeze_remaining_ms`
    /// (presumably `Some` while a cluster freeze is in effect — confirm).
    pub freeze_remaining_ms: Option<u64>,
    /// Whether the local node's maintenance state is reported as active.
    pub local_maintenance_active: bool,
}
/// Number of known peers in each health bucket.
#[derive(Clone, Copy, Debug, Default, Eq, PartialEq)]
pub struct PeerCounts {
    /// Peers reported `Healthy`.
    pub healthy: usize,
    /// Peers reported `Degraded`.
    pub degraded: usize,
    /// Peers reported `Unreachable`.
    pub unreachable: usize,
    /// Peers with no health report at all.
    pub unknown: usize,
}
/// Point-in-time counters of the mesh node's subnet gateway.
#[derive(Clone, Debug, Eq, PartialEq)]
pub struct GatewayStats {
    /// Subnet the gateway considers local.
    pub local_subnet: SubnetId,
    /// Value of the gateway's forwarded counter.
    pub forwarded: u64,
    /// Value of the gateway's dropped counter.
    pub dropped: u64,
    /// Peer subnets reported by the gateway.
    pub peer_subnets: Vec<SubnetId>,
    /// Number of export rules installed on the gateway.
    pub export_rules: u64,
}
/// One subnet together with the node ids known to belong to it.
#[derive(Clone, Debug, Eq, PartialEq)]
pub struct SubnetRollup {
    /// The subnet being described.
    pub subnet: SubnetId,
    /// Member node ids (deduplicated and sorted ascending when built by
    /// `DeckClient::subnets_with_members`).
    pub members: Vec<u64>,
    /// True when this is the mesh node's own local subnet.
    pub is_local: bool,
}
/// Configuration and latest output of the locally installed aggregator daemon.
#[derive(Clone, Debug)]
pub struct AggregatorSnapshot {
    /// Subnet the aggregator is configured to summarise.
    pub source_subnet: SubnetId,
    /// Configured fold kinds.
    pub fold_kinds: Vec<u16>,
    /// Aggregator generation at capture time.
    pub generation: u64,
    /// Configured interval between summaries.
    pub summary_interval: std::time::Duration,
    /// Latest summaries, shared (not copied) with the daemon.
    pub summaries: Arc<Vec<SummaryAnnouncement>>,
}
/// One replica within an aggregator registry group.
#[derive(Clone, Debug)]
pub struct AggregatorReplicaRow {
    /// Replica generation.
    pub generation: u64,
    /// Reported health; `DeckClient::aggregator_registry_snapshot` fills in
    /// `true` when no health entry exists for the replica.
    pub healthy: bool,
    /// Optional diagnostic attached to the health report.
    pub diagnostic: Option<String>,
    /// Node the replica is placed on, if a placement exists for it.
    pub placement_node_id: Option<u64>,
}
/// A named group of aggregator replicas from the mesh's aggregator registry.
#[derive(Clone, Debug)]
pub struct AggregatorRegistryGroupSnapshot {
    /// Registry entry name.
    pub name: String,
    /// Seed identifying the group.
    pub group_seed: [u8; 32],
    /// One row per replica, in the order the group reports them.
    pub replicas: Vec<AggregatorReplicaRow>,
}
/// Snapshot of every group in the mesh's aggregator registry.
#[derive(Clone, Debug, Default)]
pub struct AggregatorRegistrySnapshot {
    /// One entry per registry group.
    pub groups: Vec<AggregatorRegistryGroupSnapshot>,
}
/// Daemon counts by lifecycle and by restart state.
///
/// Each daemon lands in exactly one lifecycle bucket (`running` ..
/// `stopped`). It may additionally be counted in `backing_off` or
/// `crash_looping`, so the fields do not sum to the number of daemons.
#[derive(Clone, Copy, Debug, Default, Eq, PartialEq)]
pub struct DaemonCounts {
    /// Daemons in the `Running` lifecycle state.
    pub running: usize,
    /// Daemons in the `Starting` lifecycle state.
    pub starting: usize,
    /// Daemons in the `Stopping` lifecycle state.
    pub stopping: usize,
    /// Daemons in the `Stopped` lifecycle state.
    pub stopped: usize,
    /// Daemons whose restart state is `BackingOff`.
    pub backing_off: usize,
    /// Daemons whose restart state is `CrashLooping`.
    pub crash_looping: usize,
}
/// Folds a [`MeshOsSnapshot`] into the aggregate counters of a [`StatusSummary`].
///
/// * Peers are bucketed by `health`; a peer without a health report counts
///   as `unknown`.
/// * Daemons are bucketed by `lifecycle`. They are additionally counted as
///   `backing_off` / `crash_looping` from `restart_state`; `Idle` adds nothing.
/// * `local_maintenance_active` is true exactly when the local maintenance
///   state is `MaintenanceStateSnapshot::Active`.
/// * The remaining fields are collection lengths, plus `freeze_remaining_ms`
///   copied through unchanged.
fn build_status_summary(snap: &MeshOsSnapshot) -> StatusSummary {
    use super::meshos::{
        DaemonLifecycleSnapshot, MaintenanceStateSnapshot, PeerHealthSnapshot,
        RestartStateSnapshot,
    };

    let mut peers = PeerCounts::default();
    for peer in snap.peers.values() {
        let bucket = match peer.health {
            Some(PeerHealthSnapshot::Healthy) => &mut peers.healthy,
            Some(PeerHealthSnapshot::Degraded) => &mut peers.degraded,
            Some(PeerHealthSnapshot::Unreachable) => &mut peers.unreachable,
            None => &mut peers.unknown,
        };
        *bucket += 1;
    }

    let mut daemons = DaemonCounts::default();
    for daemon in snap.daemons.values() {
        let lifecycle_bucket = match daemon.lifecycle {
            DaemonLifecycleSnapshot::Running => &mut daemons.running,
            DaemonLifecycleSnapshot::Starting => &mut daemons.starting,
            DaemonLifecycleSnapshot::Stopping => &mut daemons.stopping,
            DaemonLifecycleSnapshot::Stopped => &mut daemons.stopped,
        };
        *lifecycle_bucket += 1;
        // Restart state is an independent dimension: a daemon may be counted
        // here in addition to its lifecycle bucket.
        match daemon.restart_state {
            RestartStateSnapshot::Idle => {}
            RestartStateSnapshot::BackingOff { .. } => daemons.backing_off += 1,
            RestartStateSnapshot::CrashLooping { .. } => daemons.crash_looping += 1,
        }
    }

    // Maintenance is "active" only in the `Active` state. Negating this match
    // would report every *other* state (including the idle one) as active.
    let local_maintenance_active = matches!(
        snap.local_maintenance,
        MaintenanceStateSnapshot::Active
    );

    StatusSummary {
        peers,
        daemons,
        replica_chains: snap.replicas.len(),
        avoid_list_entries: snap.avoid_list.len(),
        recently_emitted_count: snap.recently_emitted.len(),
        recent_failure_count: snap.recent_failures.len(),
        admin_audit_ring_depth: snap.admin_audit.len(),
        freeze_remaining_ms: snap.freeze_remaining_ms,
        local_maintenance_active,
    }
}
/// Operator-facing client over a running MeshOS instance.
///
/// Commands are published through `handle`; read-only views are taken from
/// `snapshot_reader`. The client is `Clone`, and clones share one commit
/// counter because `commit_seq` is an `Arc`. The operator registry, mesh node
/// and aggregator are optional attachments. They enable, respectively,
/// signed commits, subnet/gateway/channel views, and aggregator views.
#[derive(Clone)]
pub struct DeckClient {
    /// Handle used to publish events into MeshOS.
    handle: MeshOsHandle,
    /// Reader for the latest MeshOS snapshot.
    snapshot_reader: MeshOsSnapshotReader,
    /// Identity used to sign commands and attribute commits.
    identity: OperatorIdentity,
    /// Polling and ICE-threshold settings.
    config: DeckClientConfig,
    /// Source of client-local commit ids, shared between clones.
    commit_seq: Arc<AtomicU64>,
    /// When present, admin events are published signed and ICE signatures
    /// are verified against it.
    operator_registry: Option<Arc<OperatorRegistry>>,
    /// Optional mesh node backing the subnet/gateway/channel accessors.
    mesh: Option<Arc<MeshNode>>,
    /// Optional local aggregator daemon backing the `aggregator_*` accessors.
    aggregator: Option<Arc<AggregatorDaemon>>,
}
impl DeckClient {
pub fn new(
handle: MeshOsHandle,
snapshot_reader: MeshOsSnapshotReader,
identity: OperatorIdentity,
config: DeckClientConfig,
) -> Self {
Self {
handle,
snapshot_reader,
identity,
config,
commit_seq: Arc::new(AtomicU64::new(0)),
operator_registry: None,
mesh: None,
aggregator: None,
}
}
pub fn with_mesh(mut self, mesh: Arc<MeshNode>) -> Self {
self.mesh = Some(mesh);
self
}
pub fn with_aggregator(mut self, aggregator: Arc<AggregatorDaemon>) -> Self {
self.aggregator = Some(aggregator);
self
}
pub fn from_runtime(runtime: &MeshOsRuntime, identity: OperatorIdentity) -> Self {
Self::new(
runtime.handle_clone(),
runtime.snapshot_reader().clone(),
identity,
DeckClientConfig::default(),
)
}
pub fn with_config(mut self, config: DeckClientConfig) -> Self {
self.config = config;
self
}
pub fn with_operator_registry(mut self, registry: OperatorRegistry) -> Self {
self.operator_registry = Some(Arc::new(registry));
self
}
pub fn operator_registry(&self) -> Option<&OperatorRegistry> {
self.operator_registry.as_deref()
}
pub fn identity(&self) -> &OperatorIdentity {
&self.identity
}
pub fn admin(&self) -> AdminCommands<'_> {
AdminCommands { client: self }
}
pub fn ice(&self) -> IceCommands<'_> {
IceCommands { client: self }
}
pub fn audit(&self) -> AuditQuery<'_> {
AuditQuery::new(self)
}
pub fn subscribe_failures(&self, since_seq: u64) -> FailureStream {
FailureStream::new(
self.snapshot_reader.clone(),
self.config.snapshot_poll_interval,
since_seq,
)
}
pub fn subscribe_logs(&self, filter: LogFilter) -> LogStream {
LogStream::new(
self.snapshot_reader.clone(),
self.config.snapshot_poll_interval,
filter,
)
}
pub fn snapshots(&self) -> SnapshotStream {
SnapshotStream::new(
self.snapshot_reader.clone(),
self.config.snapshot_poll_interval,
)
}
pub fn status(&self) -> MeshOsSnapshot {
self.snapshot_reader.read()
}
pub fn status_summary_stream(&self) -> StatusSummaryStream {
StatusSummaryStream::new(
self.snapshot_reader.clone(),
self.config.snapshot_poll_interval,
)
}
pub fn status_summary(&self) -> StatusSummary {
build_status_summary(&self.snapshot_reader.load())
}
pub fn peers(&self) -> std::collections::BTreeMap<NodeId, super::meshos::PeerSnapshot> {
self.snapshot_reader.load().peers.clone()
}
pub fn local_subnet(&self) -> Option<SubnetId> {
self.mesh.as_ref().map(|m| m.local_subnet())
}
pub fn known_subnets(&self) -> Vec<(u64, SubnetId)> {
self.mesh
.as_ref()
.map(|m| m.known_subnets())
.unwrap_or_default()
}
pub fn subnets_with_members(&self, local_node_id: Option<u64>) -> Vec<SubnetRollup> {
let local = self.local_subnet();
let mut buckets: std::collections::BTreeMap<u32, std::collections::BTreeSet<u64>> =
std::collections::BTreeMap::new();
for (node_id, subnet) in self.known_subnets() {
buckets.entry(subnet.raw()).or_default().insert(node_id);
}
if let Some(local_subnet) = local {
let entry = buckets.entry(local_subnet.raw()).or_default();
if let Some(id) = local_node_id {
entry.insert(id);
}
}
buckets
.into_iter()
.map(|(raw, members)| {
let subnet = SubnetId::from_raw(raw);
SubnetRollup {
subnet,
members: members.into_iter().collect(),
is_local: local == Some(subnet),
}
})
.collect()
}
pub fn gateway_stats(&self) -> Option<GatewayStats> {
let gw = self.mesh.as_ref().and_then(|m| m.gateway())?;
Some(GatewayStats {
local_subnet: gw.local_subnet(),
forwarded: gw.forwarded_count(),
dropped: gw.dropped_count(),
peer_subnets: gw.peer_subnets(),
export_rules: gw.exports().len() as u64,
})
}
pub fn gateway_exports(&self) -> Vec<(u16, Vec<SubnetId>)> {
self.mesh
.as_ref()
.and_then(|m| m.gateway())
.map(|gw| gw.exports())
.unwrap_or_default()
}
pub fn channel_visibility(&self, channel_name: &str) -> Option<Visibility> {
let mesh = self.mesh.as_ref()?;
let registry = mesh.channel_configs()?;
let cfg = registry.get_by_name(channel_name)?;
Some(cfg.visibility)
}
pub fn channels(&self) -> Vec<(String, Visibility)> {
let Some(mesh) = self.mesh.as_ref() else {
return Vec::new();
};
let Some(registry) = mesh.channel_configs() else {
return Vec::new();
};
registry
.snapshot()
.into_iter()
.map(|(name, cfg)| (name, cfg.visibility))
.collect()
}
pub fn channel_wire_hash(&self, channel_name: &str) -> Option<u16> {
let mesh = self.mesh.as_ref()?;
let registry = mesh.channel_configs()?;
let cfg = registry.get_by_name(channel_name)?;
Some(cfg.channel_id.wire_hash())
}
pub fn channel_canonical_hash(&self, channel_name: &str) -> Option<ChannelHash> {
let mesh = self.mesh.as_ref()?;
let registry = mesh.channel_configs()?;
let cfg = registry.get_by_name(channel_name)?;
Some(cfg.channel_id.hash())
}
pub fn aggregator_installed(&self) -> bool {
self.aggregator.is_some()
}
pub fn aggregator_summaries(&self) -> Vec<SummaryAnnouncement> {
self.aggregator
.as_ref()
.map(|a| a.latest_summaries())
.unwrap_or_default()
}
pub fn aggregator_summaries_arc(&self) -> Arc<Vec<SummaryAnnouncement>> {
self.aggregator
.as_ref()
.map(|a| a.latest_summaries_arc())
.unwrap_or_else(|| Arc::new(Vec::new()))
}
pub fn aggregator_snapshot(&self) -> Option<AggregatorSnapshot> {
let agg = self.aggregator.as_ref()?;
let config = agg.config();
Some(AggregatorSnapshot {
source_subnet: config.source_subnet,
fold_kinds: config.fold_kinds.clone(),
generation: agg.generation(),
summary_interval: config.summary_interval,
summaries: agg.latest_summaries_arc(),
})
}
pub async fn aggregator_registry_snapshot(&self) -> Option<AggregatorRegistrySnapshot> {
let mesh = self.mesh.as_ref()?;
let registry = mesh.aggregator_registry()?;
let entries = registry.entries();
let mut groups = Vec::with_capacity(entries.len());
for entry in entries {
let snap = entry.snapshot().await;
let rows = snap
.replicas
.iter()
.enumerate()
.map(|(idx, replica)| {
let health = snap.healths.get(idx).cloned().unwrap_or(
crate::adapter::net::behavior::lifecycle::ReplicaHealth {
healthy: true,
diagnostic: None,
},
);
let placement_node_id = snap.placements.get(idx).map(|p| p.node_id);
AggregatorReplicaRow {
generation: replica.generation(),
healthy: health.healthy,
diagnostic: health.diagnostic,
placement_node_id,
}
})
.collect();
groups.push(AggregatorRegistryGroupSnapshot {
name: entry.name.clone(),
group_seed: entry.group_seed,
replicas: rows,
});
}
Some(AggregatorRegistrySnapshot { groups })
}
pub fn aggregator_generation(&self) -> u64 {
self.aggregator
.as_ref()
.map(|a| a.generation())
.unwrap_or(0)
}
pub fn aggregator_source_subnet(&self) -> Option<SubnetId> {
self.aggregator.as_ref().map(|a| a.config().source_subnet)
}
pub fn aggregator_fold_kinds(&self) -> Vec<u16> {
self.aggregator
.as_ref()
.map(|a| a.config().fold_kinds.clone())
.unwrap_or_default()
}
pub fn aggregator_summary_interval(&self) -> std::time::Duration {
self.aggregator
.as_ref()
.map(|a| a.config().summary_interval)
.unwrap_or_default()
}
pub fn daemons(&self) -> std::collections::BTreeMap<u64, super::meshos::DaemonSnapshot> {
self.snapshot_reader.load().daemons.clone()
}
pub fn replicas(&self) -> std::collections::BTreeMap<ChainId, super::meshos::ReplicaSnapshot> {
self.snapshot_reader.load().replicas.clone()
}
pub fn local_maintenance(&self) -> super::meshos::MaintenanceStateSnapshot {
self.snapshot_reader.load().local_maintenance.clone()
}
pub fn freeze_remaining_ms(&self) -> Option<u64> {
self.snapshot_reader.load().freeze_remaining_ms
}
pub fn recent_failures(&self) -> Vec<super::meshos::FailureRecord> {
self.snapshot_reader
.load()
.recent_failures
.iter()
.cloned()
.collect()
}
pub fn runtime_epoch_id(&self) -> u64 {
self.snapshot_reader.load().runtime_epoch_id
}
pub fn audit_head_seq(&self) -> u64 {
self.snapshot_reader
.load()
.admin_audit
.last()
.map(|r| r.seq)
.unwrap_or(0)
}
pub fn log_head_seq(&self) -> u64 {
self.snapshot_reader
.load()
.log_ring
.last()
.map(|r| r.seq)
.unwrap_or(0)
}
pub fn failure_head_seq(&self) -> u64 {
self.snapshot_reader
.load()
.recent_failures
.iter()
.next_back()
.map(|r| r.seq)
.unwrap_or(0)
}
pub fn recent_failures_since(&self, since_ms: u64) -> Vec<super::meshos::FailureRecord> {
self.snapshot_reader
.load()
.recent_failures
.iter()
.filter(|r| r.recorded_at_ms > since_ms)
.cloned()
.collect()
}
pub async fn watch<F>(&self, mut predicate: F) -> MeshOsSnapshot
where
F: FnMut(&MeshOsSnapshot) -> bool,
{
let snap = self.snapshot_reader.read();
if predicate(&snap) {
return snap;
}
let interval = self
.config
.snapshot_poll_interval
.max(Duration::from_millis(1));
loop {
tokio::time::sleep(interval).await;
let snap = self.snapshot_reader.read();
if predicate(&snap) {
return snap;
}
}
}
pub async fn watch_timeout<F>(
&self,
predicate: F,
timeout: Duration,
) -> Result<MeshOsSnapshot, DeckError>
where
F: FnMut(&MeshOsSnapshot) -> bool,
{
tokio::time::timeout(timeout, self.watch(predicate))
.await
.map_err(|_| {
DeckError::new(
"watch_timeout",
format!(
"no snapshot matched the predicate within {} ms",
timeout.as_millis()
),
)
})
}
fn next_commit_id(&self) -> u64 {
self.commit_seq.fetch_add(1, Ordering::Relaxed) + 1
}
async fn publish_admin(
&self,
event: AdminEvent,
kind: &'static str,
) -> Result<ChainCommit, AdminError> {
let wire_event = if self.operator_registry.is_some() {
let issued_at_ms = super::meshos::now_ms_since_unix_epoch();
let signature = self.identity.sign_admin_event(&event, issued_at_ms);
MeshOsEvent::SignedAdminCommit {
event,
signature,
issued_at_ms,
}
} else {
MeshOsEvent::AdminEvent(event)
};
self.handle
.publish(wire_event)
.await
.map_err(AdminError::from)?;
Ok(ChainCommit {
commit_id: self.next_commit_id(),
operator_id: self.identity.operator_id,
event_kind: kind,
committed_at: SystemTime::now(),
})
}
async fn publish_signed_ice(
&self,
proposal: IceActionProposal,
signatures: Vec<OperatorSignature>,
issued_at_ms: u64,
blast_hash: super::meshos::BlastRadiusHash,
kind: &'static str,
) -> Result<ChainCommit, IceError> {
self.handle
.publish(MeshOsEvent::SignedIceCommit {
proposal,
signatures,
issued_at_ms,
blast_hash,
})
.await
.map_err(IceError::from)?;
Ok(ChainCommit {
commit_id: self.next_commit_id(),
operator_id: self.identity.operator_id,
event_kind: kind,
committed_at: SystemTime::now(),
})
}
}
/// Admin (non-ICE) commands, obtained from [`DeckClient::admin`].
///
/// Each method publishes one [`AdminEvent`] through the client and returns
/// the client-generated [`ChainCommit`] receipt.
pub struct AdminCommands<'a> {
    client: &'a DeckClient,
}
impl AdminCommands<'_> {
    /// Publishes `AdminEvent::Drain` for `node` with the given `drain_for`.
    pub async fn drain(
        &self,
        node: NodeId,
        drain_for: Duration,
    ) -> Result<ChainCommit, AdminError> {
        let event = AdminEvent::Drain { node, drain_for };
        self.client.publish_admin(event, "drain").await
    }

    /// Publishes `AdminEvent::EnterMaintenance` for `node`.
    pub async fn enter_maintenance(
        &self,
        node: NodeId,
        drain_for: Option<Duration>,
    ) -> Result<ChainCommit, AdminError> {
        let event = AdminEvent::EnterMaintenance { node, drain_for };
        self.client.publish_admin(event, "enter_maintenance").await
    }

    /// Publishes `AdminEvent::ExitMaintenance` for `node`.
    pub async fn exit_maintenance(&self, node: NodeId) -> Result<ChainCommit, AdminError> {
        let event = AdminEvent::ExitMaintenance { node };
        self.client.publish_admin(event, "exit_maintenance").await
    }

    /// Publishes `AdminEvent::Cordon` for `node`.
    pub async fn cordon(&self, node: NodeId) -> Result<ChainCommit, AdminError> {
        let event = AdminEvent::Cordon { node };
        self.client.publish_admin(event, "cordon").await
    }

    /// Publishes `AdminEvent::Uncordon` for `node`.
    pub async fn uncordon(&self, node: NodeId) -> Result<ChainCommit, AdminError> {
        let event = AdminEvent::Uncordon { node };
        self.client.publish_admin(event, "uncordon").await
    }

    /// Publishes `AdminEvent::DropReplicas` for the given chains on `node`.
    pub async fn drop_replicas(
        &self,
        node: NodeId,
        chains: Vec<ChainId>,
    ) -> Result<ChainCommit, AdminError> {
        let event = AdminEvent::DropReplicas { node, chains };
        self.client.publish_admin(event, "drop_replicas").await
    }

    /// Publishes `AdminEvent::InvalidatePlacement` for `node`.
    pub async fn invalidate_placement(&self, node: NodeId) -> Result<ChainCommit, AdminError> {
        let event = AdminEvent::InvalidatePlacement { node };
        self.client.publish_admin(event, "invalidate_placement").await
    }

    /// Publishes `AdminEvent::RestartAllDaemons` for `node`.
    pub async fn restart_all_daemons(&self, node: NodeId) -> Result<ChainCommit, AdminError> {
        let event = AdminEvent::RestartAllDaemons { node };
        self.client.publish_admin(event, "restart_all_daemons").await
    }

    /// Publishes `AdminEvent::ClearAvoidList` for `node`.
    pub async fn clear_avoid_list(&self, node: NodeId) -> Result<ChainCommit, AdminError> {
        let event = AdminEvent::ClearAvoidList { node };
        self.client.publish_admin(event, "clear_avoid_list").await
    }
}
pub use super::meshos::{OperatorRegistry, OperatorSignature, VerifyError};
impl OperatorIdentity {
    /// Signs an ICE proposal bound to its issue time and blast-radius hash.
    pub fn sign_proposal(
        &self,
        proposal: &IceActionProposal,
        issued_at_ms: u64,
        blast_hash: &super::meshos::BlastRadiusHash,
    ) -> OperatorSignature {
        let signer = self.keypair();
        OperatorSignature::sign(signer, proposal, issued_at_ms, blast_hash)
    }

    /// Signs an admin event bound to its issue time.
    pub fn sign_admin_event(&self, event: &AdminEvent, issued_at_ms: u64) -> OperatorSignature {
        let signer = self.keypair();
        OperatorSignature::sign_admin(signer, event, issued_at_ms)
    }
}
/// Converts a signature-verification failure into an [`IceError`], keeping
/// the verifier's own kind string and its `Display` text as the message.
fn verify_error_to_ice(err: VerifyError) -> IceError {
    IceError::new(err.kind(), err.to_string())
}
/// Builder for ICE proposals, obtained from [`DeckClient::ice`].
///
/// Each method only builds an [`IceProposal`] stamped with the current time.
/// Nothing is published until the proposal is simulated and committed.
pub struct IceCommands<'a> {
    client: &'a DeckClient,
}
impl<'a> IceCommands<'a> {
    /// Proposes freezing the cluster for `ttl`.
    pub fn freeze_cluster(&self, ttl: Duration) -> IceProposal<'a> {
        let action = IceActionProposal::FreezeCluster { ttl };
        IceProposal::new(self.client, action)
    }

    /// Proposes flushing avoid lists within `scope`.
    pub fn flush_avoid_lists(&self, scope: super::meshos::AvoidScope) -> IceProposal<'a> {
        let action = IceActionProposal::FlushAvoidLists { scope };
        IceProposal::new(self.client, action)
    }

    /// Proposes forcibly evicting `victim`'s replica of `chain`.
    pub fn force_evict_replica(&self, chain: ChainId, victim: NodeId) -> IceProposal<'a> {
        let action = IceActionProposal::ForceEvictReplica { chain, victim };
        IceProposal::new(self.client, action)
    }

    /// Proposes forcibly restarting `daemon`.
    pub fn force_restart_daemon(&self, daemon: super::meshos::DaemonRef) -> IceProposal<'a> {
        let action = IceActionProposal::ForceRestartDaemon { daemon };
        IceProposal::new(self.client, action)
    }

    /// Proposes forcing a cutover of `chain` to `target`.
    pub fn force_cutover(&self, chain: ChainId, target: NodeId) -> IceProposal<'a> {
        let action = IceActionProposal::ForceCutover { chain, target };
        IceProposal::new(self.client, action)
    }

    /// Proposes killing `migration`.
    pub fn kill_migration(&self, migration: super::meshos::MigrationId) -> IceProposal<'a> {
        let action = IceActionProposal::KillMigration { migration };
        IceProposal::new(self.client, action)
    }

    /// Proposes thawing the cluster.
    pub fn thaw_cluster(&self) -> IceProposal<'a> {
        let action = IceActionProposal::ThawCluster;
        IceProposal::new(self.client, action)
    }
}
/// An unsimulated ICE action, stamped with the time it was created.
pub struct IceProposal<'a> {
    client: &'a DeckClient,
    action: IceActionProposal,
    issued_at_ms: u64,
}
impl<'a> IceProposal<'a> {
    /// Wraps `action`, recording the current Unix time in milliseconds.
    fn new(client: &'a DeckClient, action: IceActionProposal) -> Self {
        let issued_at_ms = super::meshos::now_ms_since_unix_epoch();
        Self {
            client,
            action,
            issued_at_ms,
        }
    }

    /// The proposed action.
    pub fn action(&self) -> &IceActionProposal {
        &self.action
    }

    /// Unix time (ms) at which the proposal was created.
    pub fn issued_at_ms(&self) -> u64 {
        self.issued_at_ms
    }

    /// Computes the blast radius of the action against the current snapshot.
    ///
    /// This never actually fails; the `Result` return type leaves room for
    /// future fallible simulation.
    pub async fn simulate(self) -> Result<SimulatedIceProposal<'a>, IceError> {
        let Self {
            client,
            action,
            issued_at_ms,
        } = self;
        let snapshot = client.snapshot_reader.read();
        let blast = simulate_ice_proposal(&snapshot, &action);
        Ok(SimulatedIceProposal {
            client,
            action,
            issued_at_ms,
            blast,
        })
    }
}
/// An ICE proposal with its simulated blast radius, ready to be signed and
/// committed.
pub struct SimulatedIceProposal<'a> {
    client: &'a DeckClient,
    action: IceActionProposal,
    issued_at_ms: u64,
    blast: BlastRadius,
}
impl<'a> SimulatedIceProposal<'a> {
    /// Simulated blast radius.
    pub fn blast_radius(&self) -> &BlastRadius {
        &self.blast
    }

    /// The proposed action.
    pub fn action(&self) -> &IceActionProposal {
        &self.action
    }

    /// Unix time (ms) at which the proposal was created.
    pub fn issued_at_ms(&self) -> u64 {
        self.issued_at_ms
    }

    /// Hash of the blast radius.
    ///
    /// Signers pass this to `OperatorIdentity::sign_proposal` together with
    /// the action and `issued_at_ms`.
    pub fn blast_hash(&self) -> super::meshos::BlastRadiusHash {
        super::meshos::blast_radius_hash(&self.blast)
    }

    /// Commits the proposal.
    ///
    /// # Errors
    /// * `insufficient_signatures` when fewer than `ice_signature_threshold`
    ///   signatures are supplied. With a registry, it is also returned when
    ///   fewer than that many *distinct* operator ids signed.
    /// * The verifier's kind when any signature fails registry verification.
    /// * Publish failures converted from `MeshOsHandleError`.
    ///
    /// Without an operator registry, signatures are only counted, not
    /// verified. The action is then published as a plain admin event.
    pub async fn commit(self, signatures: &[OperatorSignature]) -> Result<ChainCommit, IceError> {
        let blast_hash = self.blast_hash();
        let required = self.client.config.ice_signature_threshold;
        if signatures.len() < required {
            return Err(IceError::new(
                "insufficient_signatures",
                format!(
                    "ICE commit requires {} operator signatures; got {}",
                    required,
                    signatures.len()
                ),
            ));
        }

        let Some(registry) = self.client.operator_registry.as_ref() else {
            let kind = self.action.kind();
            let event = self.action.to_admin_event();
            return self.client.publish_admin(event, kind).await;
        };

        let payload = ice_proposal_signing_payload(&self.action, self.issued_at_ms, &blast_hash);
        let mut distinct_signers: std::collections::BTreeSet<u64> =
            std::collections::BTreeSet::new();
        for signature in signatures {
            registry
                .verify(signature, &payload)
                .map_err(verify_error_to_ice)?;
            distinct_signers.insert(signature.operator_id);
        }
        // Repeated signatures from the same operator must not count twice.
        if distinct_signers.len() < required {
            return Err(IceError::new(
                "insufficient_signatures",
                format!(
                    "ICE commit requires {} distinct operator signatures; got {} distinct",
                    required,
                    distinct_signers.len()
                ),
            ));
        }

        let kind = self.action.kind();
        self.client
            .publish_signed_ice(
                self.action,
                signatures.to_vec(),
                self.issued_at_ms,
                blast_hash,
                kind,
            )
            .await
    }
}
pub struct AuditQuery<'a> {
client: &'a DeckClient,
limit: Option<usize>,
operator_filter: Option<u64>,
time_range: Option<(u64, u64)>,
force_only: bool,
since_seq: Option<u64>,
}
impl<'a> AuditQuery<'a> {
fn new(client: &'a DeckClient) -> Self {
Self {
client,
limit: None,
operator_filter: None,
time_range: None,
force_only: false,
since_seq: None,
}
}
pub fn recent(mut self, limit: usize) -> Self {
self.limit = Some(limit);
self
}
pub fn by_operator(mut self, op_id: u64) -> Self {
self.operator_filter = Some(op_id);
self
}
pub fn between(mut self, start_ms: u64, end_ms: u64) -> Self {
self.time_range = Some((start_ms, end_ms));
self
}
pub fn force_only(mut self) -> Self {
self.force_only = true;
self
}
pub fn since(mut self, since_seq: u64) -> Self {
self.since_seq = Some(since_seq);
self
}
pub fn collect(self) -> Vec<super::meshos::AdminAuditRecord> {
let snap = self.client.snapshot_reader.read();
let mut matched: Vec<super::meshos::AdminAuditRecord> = snap
.admin_audit
.iter()
.filter(|r| {
if let Some(since) = self.since_seq {
if r.seq <= since {
return false;
}
}
if let Some(op_id) = self.operator_filter {
if !r.operator_ids.contains(&op_id) {
return false;
}
}
if let Some((start, end)) = self.time_range {
if r.committed_at_ms < start || r.committed_at_ms > end {
return false;
}
}
if self.force_only && !r.event.is_ice() {
return false;
}
true
})
.cloned()
.collect();
matched.reverse();
if let Some(limit) = self.limit {
matched.truncate(limit);
}
matched
}
pub fn stream(self) -> AuditStream {
AuditStream::new(
self.client.snapshot_reader.clone(),
self.client.config.snapshot_poll_interval,
AuditFilter {
operator: self.operator_filter,
time_range: self.time_range,
force_only: self.force_only,
},
self.since_seq.unwrap_or(0),
)
}
}
/// Record-level predicate for admin-audit queries and streams.
#[derive(Clone, Debug)]
struct AuditFilter {
    /// Operator id that must appear in `operator_ids`.
    operator: Option<u64>,
    /// Inclusive `(start_ms, end_ms)` bounds on `committed_at_ms`.
    time_range: Option<(u64, u64)>,
    /// When set, only ICE events pass.
    force_only: bool,
}
impl AuditFilter {
    /// True when `record` satisfies every configured constraint.
    fn matches(&self, record: &super::meshos::AdminAuditRecord) -> bool {
        self.operator
            .map_or(true, |op_id| record.operator_ids.contains(&op_id))
            && self.time_range.map_or(true, |(start, end)| {
                (start..=end).contains(&record.committed_at_ms)
            })
            && (!self.force_only || record.event.is_ice())
    }
}
/// Stream of new admin-audit records that pass an [`AuditFilter`]-style
/// predicate, produced by `AuditQuery::stream`.
pub struct AuditStream {
    /// Snapshot source re-read on every tick.
    reader: super::meshos::MeshOsSnapshotReader,
    /// Poll timer.
    interval: Interval,
    /// Predicate applied to each new record.
    filter: AuditFilter,
    /// Highest `seq` already examined (matching or not).
    last_seq: u64,
    /// Matching records not yet yielded.
    queued: std::collections::VecDeque<super::meshos::AdminAuditRecord>,
}
impl AuditStream {
    /// Creates the stream. The poll interval is clamped to at least 1 ms
    /// because tokio's `interval` rejects a zero period.
    fn new(
        reader: super::meshos::MeshOsSnapshotReader,
        poll_interval: Duration,
        filter: AuditFilter,
        initial_seq_watermark: u64,
    ) -> Self {
        let ticker = interval(poll_interval.max(Duration::from_millis(1)));
        Self {
            reader,
            interval: ticker,
            filter,
            last_seq: initial_seq_watermark,
            queued: std::collections::VecDeque::new(),
        }
    }
}
impl Stream for AuditStream {
    type Item = Result<super::meshos::AdminAuditRecord, DeckError>;

    /// Yields the next audit record newer than the watermark that passes the
    /// filter.
    ///
    /// Buffered records are returned first. Otherwise the stream waits for
    /// the next interval tick, re-reads the snapshot, and advances the
    /// watermark past every new record, matching or not. Matching records are
    /// queued.
    ///
    /// When a tick yields nothing, the loop polls the interval again instead
    /// of returning `Pending` immediately. `Interval::poll_tick` only
    /// schedules a wakeup when it itself returns `Pending`. Returning
    /// `Pending` right after a `Ready` tick would leave no wakeup registered
    /// and stall the stream forever.
    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        loop {
            if let Some(record) = self.queued.pop_front() {
                return Poll::Ready(Some(Ok(record)));
            }
            if self.interval.poll_tick(cx).is_pending() {
                return Poll::Pending;
            }
            let snap = self.reader.read();
            let watermark = self.last_seq;
            let mut max_seq = watermark;
            for record in snap.admin_audit.iter().filter(|r| r.seq > watermark) {
                max_seq = max_seq.max(record.seq);
                if self.filter.matches(record) {
                    self.queued.push_back(record.clone());
                }
            }
            self.last_seq = max_seq;
        }
    }
}
/// Filter for [`LogStream`] / `DeckClient::subscribe_logs`.
///
/// Each `Some` field narrows the match. `since_seq` is not tested per record;
/// it seeds the stream's watermark, so only records with a larger `seq` are
/// considered.
#[derive(Clone, Debug, Default)]
pub struct LogFilter {
    /// Records whose level compares below this are rejected.
    pub min_level: Option<super::meshos::LogLevel>,
    /// Only records carrying exactly this daemon id (records without one never match).
    pub daemon_id: Option<u64>,
    /// Only records carrying exactly this node id (records without one never match).
    pub node_id: Option<NodeId>,
    /// Starting watermark for the stream.
    pub since_seq: Option<u64>,
}
impl LogFilter {
    /// An empty filter that accepts every record.
    pub fn new() -> Self {
        Self::default()
    }

    /// Sets the minimum level.
    pub fn min_level(self, level: super::meshos::LogLevel) -> Self {
        Self {
            min_level: Some(level),
            ..self
        }
    }

    /// Restricts to one daemon id.
    pub fn with_daemon(self, daemon_id: u64) -> Self {
        Self {
            daemon_id: Some(daemon_id),
            ..self
        }
    }

    /// Restricts to one node id.
    pub fn with_node(self, node_id: NodeId) -> Self {
        Self {
            node_id: Some(node_id),
            ..self
        }
    }

    /// Sets the starting watermark.
    pub fn since(self, since_seq: u64) -> Self {
        Self {
            since_seq: Some(since_seq),
            ..self
        }
    }

    /// True when `record` passes the level, daemon and node constraints.
    fn matches(&self, record: &super::meshos::LogRecord) -> bool {
        let level_ok = self.min_level.map_or(true, |min| !(record.level < min));
        level_ok
            && self
                .daemon_id
                .map_or(true, |id| record.daemon_id == Some(id))
            && self
                .node_id
                .map_or(true, |node| record.node_id == Some(node))
    }
}
/// Stream of new log records accepted by a [`LogFilter`].
pub struct LogStream {
    /// Snapshot source re-read on every tick.
    reader: super::meshos::MeshOsSnapshotReader,
    /// Poll timer.
    interval: Interval,
    /// Record predicate.
    filter: LogFilter,
    /// Highest `seq` already examined.
    last_seq: u64,
    /// Matching records not yet yielded.
    queued: std::collections::VecDeque<super::meshos::LogRecord>,
}
impl LogStream {
    /// Creates the stream. The watermark starts at `filter.since_seq`
    /// (0 when unset), and the interval is clamped to at least 1 ms.
    fn new(
        reader: super::meshos::MeshOsSnapshotReader,
        poll_interval: Duration,
        filter: LogFilter,
    ) -> Self {
        let watermark = filter.since_seq.unwrap_or_default();
        let ticker = interval(poll_interval.max(Duration::from_millis(1)));
        Self {
            reader,
            interval: ticker,
            filter,
            last_seq: watermark,
            queued: std::collections::VecDeque::new(),
        }
    }
}
impl Stream for LogStream {
    type Item = Result<super::meshos::LogRecord, DeckError>;

    /// Yields the next new log record accepted by the filter.
    ///
    /// On each tick, the watermark advances past every new record, matching
    /// or not. When a tick yields nothing, the task wakes itself so the
    /// interval is polled again and a timer wakeup gets registered.
    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        if let Some(buffered) = self.queued.pop_front() {
            return Poll::Ready(Some(Ok(buffered)));
        }
        if self.interval.poll_tick(cx).is_pending() {
            return Poll::Pending;
        }
        let snap = self.reader.read();
        let floor = self.last_seq;
        let mut highest = floor;
        for record in snap.log_ring.iter().filter(|r| r.seq > floor) {
            highest = highest.max(record.seq);
            if self.filter.matches(record) {
                self.queued.push_back(record.clone());
            }
        }
        self.last_seq = highest;
        match self.queued.pop_front() {
            Some(next) => Poll::Ready(Some(Ok(next))),
            None => {
                cx.waker().wake_by_ref();
                Poll::Pending
            }
        }
    }
}
/// Stream of failure records newer than a starting watermark.
pub struct FailureStream {
    /// Snapshot source re-read on every tick.
    reader: super::meshos::MeshOsSnapshotReader,
    /// Poll timer.
    interval: Interval,
    /// Highest `seq` already yielded or queued.
    last_seq: u64,
    /// Records not yet yielded.
    queued: std::collections::VecDeque<super::meshos::FailureRecord>,
}
impl FailureStream {
    /// Creates the stream with the given watermark; the interval is clamped
    /// to at least 1 ms.
    fn new(
        reader: super::meshos::MeshOsSnapshotReader,
        poll_interval: Duration,
        initial_seq_watermark: u64,
    ) -> Self {
        let ticker = interval(poll_interval.max(Duration::from_millis(1)));
        Self {
            reader,
            interval: ticker,
            last_seq: initial_seq_watermark,
            queued: std::collections::VecDeque::new(),
        }
    }
}
impl Stream for FailureStream {
    type Item = Result<super::meshos::FailureRecord, DeckError>;

    /// Yields the next failure record with `seq` above the watermark.
    ///
    /// When a tick yields nothing, the task wakes itself so the interval is
    /// polled again and a timer wakeup gets registered.
    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        if let Some(buffered) = self.queued.pop_front() {
            return Poll::Ready(Some(Ok(buffered)));
        }
        if self.interval.poll_tick(cx).is_pending() {
            return Poll::Pending;
        }
        let snap = self.reader.read();
        let floor = self.last_seq;
        let mut highest = floor;
        for record in snap.recent_failures.iter().filter(|r| r.seq > floor) {
            highest = highest.max(record.seq);
            self.queued.push_back(record.clone());
        }
        self.last_seq = highest;
        match self.queued.pop_front() {
            Some(next) => Poll::Ready(Some(Ok(next))),
            None => {
                cx.waker().wake_by_ref();
                Poll::Pending
            }
        }
    }
}
/// Stream that yields a full snapshot on every poll tick.
pub struct SnapshotStream {
    /// Snapshot source.
    reader: MeshOsSnapshotReader,
    /// Poll timer.
    interval: Interval,
}
impl SnapshotStream {
    /// Creates the stream; the interval is clamped to at least 1 ms.
    fn new(reader: MeshOsSnapshotReader, poll_interval: Duration) -> Self {
        let ticker = interval(poll_interval.max(Duration::from_millis(1)));
        Self {
            reader,
            interval: ticker,
        }
    }
}
impl Stream for SnapshotStream {
    type Item = Result<MeshOsSnapshot, DeckError>;

    /// Yields the current snapshot each time the interval ticks. It never
    /// ends and never errors.
    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        if self.interval.poll_tick(cx).is_pending() {
            return Poll::Pending;
        }
        let current = self.reader.read();
        Poll::Ready(Some(Ok(current)))
    }
}
/// Stream of [`StatusSummary`] values, emitted only when the summary changes.
pub struct StatusSummaryStream {
    /// Snapshot source.
    reader: super::meshos::MeshOsSnapshotReader,
    /// Poll timer.
    interval: Interval,
    /// Last summary yielded, used to suppress duplicates.
    last_emitted: Option<StatusSummary>,
}
impl StatusSummaryStream {
    /// Creates the stream; the interval is clamped to at least 1 ms.
    fn new(reader: super::meshos::MeshOsSnapshotReader, poll_interval: Duration) -> Self {
        let ticker = interval(poll_interval.max(Duration::from_millis(1)));
        Self {
            reader,
            interval: ticker,
            last_emitted: None,
        }
    }
}
impl Stream for StatusSummaryStream {
    type Item = Result<StatusSummary, DeckError>;

    /// Yields a new summary whenever it differs from the previously emitted
    /// one. The first tick always emits.
    ///
    /// If a tick produces an unchanged summary, the loop polls the interval
    /// again rather than returning `Pending` immediately.
    /// `Interval::poll_tick` only registers a wakeup when it returns
    /// `Pending`, so returning right after a `Ready` tick would leave no
    /// wakeup scheduled and the stream would stall permanently.
    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        loop {
            if self.interval.poll_tick(cx).is_pending() {
                return Poll::Pending;
            }
            let summary = build_status_summary(&self.reader.read());
            if self.last_emitted.as_ref() != Some(&summary) {
                self.last_emitted = Some(summary.clone());
                return Poll::Ready(Some(Ok(summary)));
            }
        }
    }
}
#[cfg(test)]
mod tests {
use super::*;
use crate::adapter::net::behavior::meshos::{
LoggingDispatcher, MaintenanceTransition, MeshOsAction, MeshOsConfig,
};
fn fast_config() -> MeshOsConfig {
MeshOsConfig::default()
.with_this_node(42)
.with_tick_interval(Duration::from_millis(10))
.with_event_queue_capacity(64)
.with_action_queue_capacity(64)
}
#[tokio::test]
async fn operator_identity_id_matches_keypair_origin_hash() {
let kp = EntityKeypair::generate();
let origin = kp.origin_hash();
let identity = OperatorIdentity::from_keypair(kp);
assert_eq!(identity.operator_id(), origin);
}
/// Without `with_mesh`, every subnet/gateway/channel accessor reports nothing:
/// `None` or an empty collection.
#[tokio::test]
async fn deck_subnet_and_gateway_accessors_default_to_empty_without_mesh() {
let dispatcher = Arc::new(LoggingDispatcher::new());
let runtime = MeshOsRuntime::start(fast_config(), dispatcher);
let deck = DeckClient::from_runtime(&runtime, OperatorIdentity::generate());
assert_eq!(deck.local_subnet(), None);
assert!(deck.known_subnets().is_empty());
assert!(deck.gateway_stats().is_none());
assert!(deck.gateway_exports().is_empty());
assert_eq!(deck.channel_visibility("any/name"), None);
assert!(deck.channels().is_empty());
assert_eq!(deck.channel_wire_hash("any/name"), None);
let _ = runtime.shutdown().await;
}
/// With a `MeshNode` attached via `with_mesh`, the deck reports the mesh's
/// subnet, fresh (all-zero) gateway stats, and the registered
/// `internal/metrics` channel with its visibility, wire hash and canonical hash.
#[tokio::test]
async fn deck_with_mesh_surfaces_local_subnet_and_gateway_stats() {
use crate::adapter::net::{
ChannelConfig, ChannelConfigRegistry, ChannelId, MeshNodeConfig, SubnetId, Visibility,
};
use std::net::SocketAddr;
let dispatcher = Arc::new(LoggingDispatcher::new());
let runtime = MeshOsRuntime::start(fast_config(), dispatcher);
// Port 0: let the OS pick a free port.
let addr: SocketAddr = "127.0.0.1:0".parse().unwrap();
let mut mesh_cfg = MeshNodeConfig::new(addr, [0x17u8; 32]);
mesh_cfg = mesh_cfg.with_subnet(SubnetId::new(&[3, 7]));
let mut mesh = crate::adapter::net::MeshNode::new(EntityKeypair::generate(), mesh_cfg)
.await
.expect("MeshNode::new");
// Register exactly one channel so `channels()` has a known single entry.
let registry = Arc::new(ChannelConfigRegistry::new());
let metrics_id = ChannelId::parse("internal/metrics").expect("channel id");
registry.insert(
ChannelConfig::new(metrics_id.clone()).with_visibility(Visibility::SubnetLocal),
);
mesh.set_channel_configs(registry);
let mesh = Arc::new(mesh);
let deck = DeckClient::from_runtime(&runtime, OperatorIdentity::generate())
.with_mesh(mesh.clone());
assert_eq!(deck.local_subnet(), Some(SubnetId::new(&[3, 7])));
let stats = deck.gateway_stats().expect("gateway installed");
assert_eq!(stats.local_subnet, SubnetId::new(&[3, 7]));
assert_eq!(stats.forwarded, 0);
assert_eq!(stats.dropped, 0);
assert_eq!(stats.export_rules, 0);
assert!(stats.peer_subnets.is_empty());
assert_eq!(
deck.channel_visibility("internal/metrics"),
Some(Visibility::SubnetLocal),
);
let channels = deck.channels();
assert_eq!(channels.len(), 1);
assert_eq!(channels[0].0, "internal/metrics");
assert_eq!(channels[0].1, Visibility::SubnetLocal);
assert_eq!(
deck.channel_wire_hash("internal/metrics"),
Some(metrics_id.wire_hash()),
);
assert_eq!(
deck.channel_canonical_hash("internal/metrics"),
Some(metrics_id.hash()),
);
let _ = runtime.shutdown().await;
}
/// `DeckError`'s `Display` output embeds the `<<deck-sdk-kind:KIND>>` envelope
/// ahead of the message.
#[tokio::test]
async fn deck_error_display_carries_kind_discriminator() {
    let rendered = DeckError::new("unknown_node", "node 99 is not in the cluster").to_string();
    let envelope = "<<deck-sdk-kind:unknown_node>>";
    assert!(
        rendered.contains(envelope),
        "expected discriminator envelope, got {rendered:?}",
    );
}
/// `enter_maintenance` returns a commit stamped with the caller's operator id,
/// event kind `"enter_maintenance"` and a commit id >= 1. After an ~80ms settle,
/// the runtime snapshot must show local maintenance no longer `Active`.
#[tokio::test]
async fn admin_enter_maintenance_publishes_admin_event_and_returns_commit() {
let dispatcher = Arc::new(LoggingDispatcher::new());
let runtime = MeshOsRuntime::start(fast_config(), dispatcher);
let identity = OperatorIdentity::generate();
let deck = DeckClient::from_runtime(&runtime, identity.clone());
let commit = deck
.admin()
.enter_maintenance(42, None)
.await
.expect("commit");
assert_eq!(commit.operator_id(), identity.operator_id());
assert_eq!(commit.event_kind(), "enter_maintenance");
assert!(commit.commit_id() >= 1);
// Give the 10ms-tick loop (see `fast_config`) time to process the event.
tokio::time::sleep(Duration::from_millis(80)).await;
let snap = runtime.snapshot();
assert!(
!matches!(
snap.local_maintenance,
crate::adapter::net::behavior::meshos::MaintenanceStateSnapshot::Active
),
"local maintenance should have transitioned out of Active, got {:?}",
snap.local_maintenance,
);
let _ = runtime.shutdown().await;
}
/// `drop_replicas` commits with event kind `"drop_replicas"`.
/// NOTE(review): despite the name, the supplied chain ids are not asserted on.
#[tokio::test]
async fn admin_drop_replicas_publishes_with_supplied_chain_ids() {
let dispatcher = Arc::new(LoggingDispatcher::new());
let runtime = MeshOsRuntime::start(fast_config(), dispatcher);
let deck = DeckClient::from_runtime(&runtime, OperatorIdentity::generate());
let commit = deck
.admin()
.drop_replicas(42, vec![1, 2, 3])
.await
.expect("commit");
assert_eq!(commit.event_kind(), "drop_replicas");
let _ = runtime.shutdown().await;
}
/// Two consecutive admin commits from one client get strictly increasing
/// commit ids.
#[tokio::test]
async fn commit_ids_increment_monotonically_per_client() {
let dispatcher = Arc::new(LoggingDispatcher::new());
let runtime = MeshOsRuntime::start(fast_config(), dispatcher);
let deck = DeckClient::from_runtime(&runtime, OperatorIdentity::generate());
let a = deck.admin().cordon(42).await.unwrap();
let b = deck.admin().uncordon(42).await.unwrap();
assert!(b.commit_id() > a.commit_id());
let _ = runtime.shutdown().await;
}
/// With a 20ms poll interval, `snapshots()` yields two consecutive snapshots.
/// Nothing is changed between them, so their `local_maintenance` values match.
#[tokio::test]
async fn snapshot_stream_yields_a_snapshot_per_poll_interval() {
use futures::StreamExt;
let dispatcher = Arc::new(LoggingDispatcher::new());
let runtime = MeshOsRuntime::start(fast_config(), dispatcher);
let deck = DeckClient::from_runtime(&runtime, OperatorIdentity::generate()).with_config(
DeckClientConfig {
snapshot_poll_interval: Duration::from_millis(20),
..DeckClientConfig::default()
},
);
let mut stream = deck.snapshots();
let first = stream.next().await.expect("first").expect("ok");
let second = stream.next().await.expect("second").expect("ok");
assert_eq!(first.local_maintenance, second.local_maintenance);
let _ = runtime.shutdown().await;
}
/// After `enter_maintenance`, the snapshot stream must surface a non-`Active`
/// `local_maintenance` within its first 20 items.
#[tokio::test]
async fn snapshot_stream_observes_admin_command_aftermath() {
use futures::StreamExt;
let dispatcher = Arc::new(LoggingDispatcher::new());
let runtime = MeshOsRuntime::start(fast_config(), dispatcher);
let deck = DeckClient::from_runtime(&runtime, OperatorIdentity::generate()).with_config(
DeckClientConfig {
snapshot_poll_interval: Duration::from_millis(15),
..DeckClientConfig::default()
},
);
let _ = deck.admin().enter_maintenance(42, None).await.unwrap();
let mut stream = deck.snapshots();
let mut saw_transition = false;
// Bounded scan: at most 20 snapshots (roughly 300ms at a 15ms poll interval).
for _ in 0..20 {
let snap = stream.next().await.expect("next").expect("ok");
if !matches!(
snap.local_maintenance,
crate::adapter::net::behavior::meshos::MaintenanceStateSnapshot::Active
) {
saw_transition = true;
break;
}
}
assert!(
saw_transition,
"stream should have surfaced a non-Active local_maintenance after enter_maintenance",
);
let _ = runtime.shutdown().await;
}
/// Admin commands issued after the runtime has shut down fail with error kind
/// `"loop_closed"`.
#[tokio::test]
async fn admin_commit_after_runtime_shutdown_returns_loop_closed_error() {
let dispatcher = Arc::new(LoggingDispatcher::new());
let runtime = MeshOsRuntime::start(fast_config(), dispatcher);
let deck = DeckClient::from_runtime(&runtime, OperatorIdentity::generate());
let _ = runtime.shutdown().await;
let err = deck
.admin()
.cordon(42)
.await
.expect_err("publish after shutdown should fail");
assert_eq!(err.kind, "loop_closed");
}
// Never called. NOTE(review): appears to exist only so the
// `MaintenanceTransition` / `MeshOsAction` imports count as used (hence
// `#[allow(dead_code)]`); confirm against the rest of this module.
#[allow(dead_code)]
fn _ensure_action_types_are_in_scope() -> (MaintenanceTransition, MeshOsAction) {
(
MaintenanceTransition::EnteringMaintenance,
MeshOsAction::CommitMaintenanceTransition {
node: 0,
target: MaintenanceTransition::EnteringMaintenance,
},
)
}
/// With `ice_signature_threshold: 2`, committing a simulated freeze with a single
/// signature fails with error kind `"insufficient_signatures"`.
#[tokio::test]
async fn ice_proposal_commit_with_insufficient_signatures_fails() {
let dispatcher = Arc::new(LoggingDispatcher::new());
let runtime = MeshOsRuntime::start(fast_config(), dispatcher);
let deck = DeckClient::from_runtime(&runtime, OperatorIdentity::generate()).with_config(
DeckClientConfig {
snapshot_poll_interval: Duration::from_millis(100),
ice_signature_threshold: 2,
},
);
let proposal = deck.ice().freeze_cluster(Duration::from_secs(10));
let simulated = proposal.simulate().await.expect("simulate");
// Sign over the simulated action, issue time and blast hash: one signature only.
let sig = deck.identity().sign_proposal(
simulated.action(),
simulated.issued_at_ms(),
&simulated.blast_hash(),
);
let err = simulated
.commit(&[sig])
.await
.expect_err("under-threshold commit should fail");
assert_eq!(err.kind, "insufficient_signatures");
let _ = runtime.shutdown().await;
}
/// Simulating a 30s freeze reports a 30s `estimated_drain_delay`. Committing it
/// with the deck's own single signature yields event kind `"freeze_cluster"`,
/// and after an ~80ms settle the snapshot has `freeze_remaining_ms` set.
#[tokio::test]
async fn ice_freeze_proposal_simulate_then_commit_lands_freeze_on_loop() {
let dispatcher = Arc::new(LoggingDispatcher::new());
let runtime = MeshOsRuntime::start(fast_config(), dispatcher);
let deck = DeckClient::from_runtime(&runtime, OperatorIdentity::generate());
let proposal = deck.ice().freeze_cluster(Duration::from_secs(30));
let simulated = proposal.simulate().await.expect("simulate");
assert_eq!(
simulated.blast_radius().estimated_drain_delay,
Some(Duration::from_secs(30))
);
let sig = deck.identity().sign_proposal(
simulated.action(),
simulated.issued_at_ms(),
&simulated.blast_hash(),
);
let commit = simulated.commit(&[sig]).await.expect("commit");
assert_eq!(commit.event_kind(), "freeze_cluster");
tokio::time::sleep(Duration::from_millis(80)).await;
let snap = runtime.snapshot();
assert!(
snap.freeze_remaining_ms.is_some(),
"freeze_remaining_ms should be set after committed freeze",
);
let _ = runtime.shutdown().await;
}
/// Simulating a thaw on a cluster that was never frozen produces the
/// `BlastWarning::ThawHasNoFreezeToCancel` warning.
#[tokio::test]
async fn ice_thaw_proposal_simulate_warns_no_op_when_unfrozen() {
let dispatcher = Arc::new(LoggingDispatcher::new());
let runtime = MeshOsRuntime::start(fast_config(), dispatcher);
let deck = DeckClient::from_runtime(&runtime, OperatorIdentity::generate());
let proposal = deck.ice().thaw_cluster();
let simulated = proposal.simulate().await.expect("simulate");
assert!(simulated.blast_radius().warnings.iter().any(|w| matches!(
w,
crate::adapter::net::behavior::meshos::BlastWarning::ThawHasNoFreezeToCancel
)));
let _ = runtime.shutdown().await;
}
/// Arbitrary fixed blast-radius hash (every byte `0x01`) for the signature
/// tests that sign and verify without running a simulation.
const TEST_BLAST_HASH: super::super::meshos::BlastRadiusHash =
[1u8; super::super::meshos::BLAST_RADIUS_HASH_LEN];
/// Compile-time check only; never called. It compiles only if the proposal
/// types are `Send + Sync` and every stream type is `Send`.
fn _assert_proposal_send_sync_static_check() {
fn _assert_send<T: Send>() {}
fn _assert_send_sync<T: Send + Sync>() {}
_assert_send_sync::<IceProposal<'static>>();
_assert_send_sync::<SimulatedIceProposal<'static>>();
_assert_send::<SnapshotStream>();
_assert_send::<StatusSummaryStream>();
_assert_send::<AuditStream>();
_assert_send::<LogStream>();
_assert_send::<FailureStream>();
}
/// A proposal signature is tagged with the signer's operator id and is 64 bytes
/// long.
#[tokio::test]
async fn operator_signature_carries_issuing_operator_id() {
    let signer = OperatorIdentity::generate();
    let freeze = IceActionProposal::FreezeCluster {
        ttl: Duration::from_secs(60),
    };
    let issued_at = super::super::meshos::now_ms_since_unix_epoch();
    let sig = signer.sign_proposal(&freeze, issued_at, &TEST_BLAST_HASH);
    assert_eq!(sig.operator_id, signer.operator_id());
    assert_eq!(sig.signature.len(), 64);
}
/// A registered operator's signature verifies against the payload rebuilt from
/// the same proposal, timestamp and blast hash.
#[tokio::test]
async fn operator_registry_verifies_a_well_formed_signature() {
    let signer = OperatorIdentity::generate();
    let mut registry = OperatorRegistry::new();
    registry.register(signer.keypair());
    let freeze = IceActionProposal::FreezeCluster {
        ttl: Duration::from_secs(60),
    };
    let issued_at = super::super::meshos::now_ms_since_unix_epoch();
    let sig = signer.sign_proposal(&freeze, issued_at, &TEST_BLAST_HASH);
    let payload = ice_proposal_signing_payload(&freeze, issued_at, &TEST_BLAST_HASH);
    registry.verify(&sig, &payload).expect("valid signature");
}
/// An empty registry rejects even a correctly signed payload with error kind
/// `"not_authorized"`.
#[tokio::test]
async fn operator_registry_rejects_unknown_operator() {
    let empty_registry = OperatorRegistry::new();
    let stranger = OperatorIdentity::generate();
    let thaw = IceActionProposal::ThawCluster;
    let issued_at = super::super::meshos::now_ms_since_unix_epoch();
    let sig = stranger.sign_proposal(&thaw, issued_at, &TEST_BLAST_HASH);
    let payload = ice_proposal_signing_payload(&thaw, issued_at, &TEST_BLAST_HASH);
    let outcome = empty_registry.verify(&sig, &payload);
    let err = outcome.expect_err("unregistered operator should not verify");
    assert_eq!(err.kind(), "not_authorized");
}
/// Flipping one bit of an otherwise valid signature makes verification fail
/// with error kind `"signature_invalid"`.
#[tokio::test]
async fn operator_registry_rejects_tampered_signature_bytes() {
    let signer = OperatorIdentity::generate();
    let mut registry = OperatorRegistry::new();
    registry.register(signer.keypair());
    let freeze = IceActionProposal::FreezeCluster {
        ttl: Duration::from_secs(10),
    };
    let issued_at = super::super::meshos::now_ms_since_unix_epoch();
    let mut sig = signer.sign_proposal(&freeze, issued_at, &TEST_BLAST_HASH);
    // Corrupt the lowest bit of the first signature byte.
    sig.signature[0] ^= 0x01;
    let payload = ice_proposal_signing_payload(&freeze, issued_at, &TEST_BLAST_HASH);
    let outcome = registry.verify(&sig, &payload);
    let err = outcome.expect_err("tampered signature should not verify");
    assert_eq!(err.kind(), "signature_invalid");
}
/// A signature over one proposal (10s freeze) does not verify against the
/// payload of a different proposal (60s freeze) with the same timestamp.
#[tokio::test]
async fn operator_registry_rejects_signature_for_wrong_payload() {
    let signer = OperatorIdentity::generate();
    let mut registry = OperatorRegistry::new();
    registry.register(signer.keypair());
    let short_freeze = IceActionProposal::FreezeCluster {
        ttl: Duration::from_secs(10),
    };
    let long_freeze = IceActionProposal::FreezeCluster {
        ttl: Duration::from_secs(60),
    };
    let issued_at = super::super::meshos::now_ms_since_unix_epoch();
    let sig = signer.sign_proposal(&short_freeze, issued_at, &TEST_BLAST_HASH);
    let mismatched_payload =
        ice_proposal_signing_payload(&long_freeze, issued_at, &TEST_BLAST_HASH);
    let outcome = registry.verify(&sig, &mismatched_payload);
    let err = outcome.expect_err("cross-proposal signature should not verify");
    assert_eq!(err.kind(), "signature_invalid");
}
/// A signature buffer of the wrong length (32 bytes; real signatures are 64)
/// is rejected with error kind `"signature_invalid"`.
#[tokio::test]
async fn operator_registry_rejects_wrong_length_signature() {
    let signer = OperatorIdentity::generate();
    let mut registry = OperatorRegistry::new();
    registry.register(signer.keypair());
    let thaw = IceActionProposal::ThawCluster;
    let truncated = OperatorSignature {
        operator_id: signer.operator_id(),
        signature: vec![0; 32],
    };
    let issued_at = super::super::meshos::now_ms_since_unix_epoch();
    let payload = ice_proposal_signing_payload(&thaw, issued_at, &TEST_BLAST_HASH);
    let outcome = registry.verify(&truncated, &payload);
    let err = outcome.expect_err("wrong-length signature should not verify");
    assert_eq!(err.kind(), "signature_invalid");
}
/// With a registry holding both operators and a threshold of 2, a commit that
/// carries one tampered signature fails with error kind `"signature_invalid"`
/// rather than being counted toward the threshold.
#[tokio::test]
async fn ice_commit_with_registry_rejects_an_unverified_signature() {
let dispatcher = Arc::new(LoggingDispatcher::new());
let runtime = MeshOsRuntime::start(fast_config(), dispatcher);
let op_a = OperatorIdentity::generate();
let op_b = OperatorIdentity::generate();
let mut registry = OperatorRegistry::new();
registry.register(op_a.keypair());
registry.register(op_b.keypair());
let deck = DeckClient::new(
runtime.handle_clone(),
runtime.snapshot_reader().clone(),
op_a.clone(),
DeckClientConfig {
snapshot_poll_interval: Duration::from_millis(100),
ice_signature_threshold: 2,
},
)
.with_operator_registry(registry);
let proposal = deck.ice().freeze_cluster(Duration::from_secs(15));
let simulated = proposal.simulate().await.expect("simulate");
let sig_a = op_a.sign_proposal(
simulated.action(),
simulated.issued_at_ms(),
&simulated.blast_hash(),
);
let mut sig_b = op_b.sign_proposal(
simulated.action(),
simulated.issued_at_ms(),
&simulated.blast_hash(),
);
// Corrupt op_b's signature so that only op_a's remains valid.
sig_b.signature[3] ^= 0xFF;
let err = simulated
.commit(&[sig_a, sig_b])
.await
.expect_err("commit with tampered sig should fail");
assert_eq!(err.kind, "signature_invalid");
let _ = runtime.shutdown().await;
}
/// Flushing avoid lists scoped to peer 5: simulation warns
/// `AvoidFlushRecoversPeer { peer: 5 }`, and the signed commit reports event
/// kind `"flush_avoid_lists"`.
#[tokio::test]
async fn ice_flush_avoid_lists_proposal_simulate_and_commit_round_trips() {
use super::super::meshos::AvoidScope;
let dispatcher = Arc::new(LoggingDispatcher::new());
let runtime = MeshOsRuntime::start(fast_config(), dispatcher);
let deck = DeckClient::from_runtime(&runtime, OperatorIdentity::generate());
let proposal = deck.ice().flush_avoid_lists(AvoidScope::OnPeer { peer: 5 });
let simulated = proposal.simulate().await.expect("simulate");
assert!(simulated.blast_radius().warnings.iter().any(|w| matches!(
w,
crate::adapter::net::behavior::meshos::BlastWarning::AvoidFlushRecoversPeer { peer: 5 }
)));
let sig = deck.identity().sign_proposal(
simulated.action(),
simulated.issued_at_ms(),
&simulated.blast_hash(),
);
let commit = simulated.commit(&[sig]).await.expect("commit");
assert_eq!(commit.event_kind(), "flush_avoid_lists");
let _ = runtime.shutdown().await;
}
/// The status-summary stream's first item arrives promptly (within a 2s bound)
/// and describes an idle cluster: no freeze and no local maintenance.
#[tokio::test]
async fn status_summary_stream_emits_initial_summary_immediately() {
use futures::StreamExt;
let dispatcher = Arc::new(LoggingDispatcher::new());
let runtime = MeshOsRuntime::start(fast_config(), dispatcher);
let deck = DeckClient::from_runtime(&runtime, OperatorIdentity::generate()).with_config(
DeckClientConfig {
snapshot_poll_interval: Duration::from_millis(10),
..DeckClientConfig::default()
},
);
let mut stream = deck.status_summary_stream();
let first = tokio::time::timeout(Duration::from_secs(2), stream.next())
.await
.expect("first timed out")
.expect("first closed")
.expect("first ok");
assert!(first.freeze_remaining_ms.is_none());
assert!(!first.local_maintenance_active);
let _ = runtime.shutdown().await;
}
/// After the first summary, an unchanged cluster yields no further item within
/// 80ms, so duplicate summaries are suppressed.
/// NOTE(review): assumes nothing in the idle runtime perturbs the summary
/// during that window.
#[tokio::test]
async fn status_summary_stream_dedups_unchanged_summaries() {
use futures::StreamExt;
let dispatcher = Arc::new(LoggingDispatcher::new());
let runtime = MeshOsRuntime::start(fast_config(), dispatcher);
let deck = DeckClient::from_runtime(&runtime, OperatorIdentity::generate()).with_config(
DeckClientConfig {
snapshot_poll_interval: Duration::from_millis(10),
..DeckClientConfig::default()
},
);
let mut stream = deck.status_summary_stream();
let _ = tokio::time::timeout(Duration::from_secs(2), stream.next())
.await
.expect("first")
.expect("closed")
.expect("ok");
// ~8 poll ticks at 10ms with no state change: the timeout must win.
let second = tokio::time::timeout(Duration::from_millis(80), stream.next()).await;
assert!(
second.is_err(),
"stream should not re-emit unchanged summary"
);
let _ = runtime.shutdown().await;
}
/// Committing a freeze after the first summary makes the stream emit a new
/// summary with `freeze_remaining_ms` set and at least one audit-ring entry.
#[tokio::test]
async fn status_summary_stream_re_emits_on_freeze_state_change() {
use futures::StreamExt;
let dispatcher = Arc::new(LoggingDispatcher::new());
let runtime = MeshOsRuntime::start(fast_config(), dispatcher);
let deck = DeckClient::from_runtime(&runtime, OperatorIdentity::generate()).with_config(
DeckClientConfig {
snapshot_poll_interval: Duration::from_millis(10),
..DeckClientConfig::default()
},
);
let mut stream = deck.status_summary_stream();
let first = tokio::time::timeout(Duration::from_secs(2), stream.next())
.await
.expect("first")
.expect("closed")
.expect("ok");
assert!(first.freeze_remaining_ms.is_none());
let p = deck
.ice()
.freeze_cluster(Duration::from_secs(30))
.simulate()
.await
.expect("simulate");
let sig = deck
.identity()
.sign_proposal(p.action(), p.issued_at_ms(), &p.blast_hash());
p.commit(&[sig]).await.expect("freeze");
let after_freeze = tokio::time::timeout(Duration::from_secs(2), stream.next())
.await
.expect("after_freeze timed out")
.expect("after_freeze closed")
.expect("after_freeze ok");
assert!(after_freeze.freeze_remaining_ms.is_some());
assert!(after_freeze.admin_audit_ring_depth >= 1);
let _ = runtime.shutdown().await;
}
/// On a fresh runtime, `status_summary()` reports default peer and daemon counts,
/// zero replica chains, zero recently emitted actions, no freeze, and no local
/// maintenance.
#[tokio::test]
async fn status_summary_reflects_steady_state_idle_cluster() {
let dispatcher = Arc::new(LoggingDispatcher::new());
let runtime = MeshOsRuntime::start(fast_config(), dispatcher);
let deck = DeckClient::from_runtime(&runtime, OperatorIdentity::generate());
let summary = deck.status_summary();
assert_eq!(summary.peers, PeerCounts::default());
assert_eq!(summary.daemons, DaemonCounts::default());
assert_eq!(summary.replica_chains, 0);
assert_eq!(summary.recently_emitted_count, 0);
assert!(summary.freeze_remaining_ms.is_none());
assert!(!summary.local_maintenance_active);
let _ = runtime.shutdown().await;
}
/// After a committed freeze and a 60ms settle, `status_summary()` shows the
/// freeze and at least one audit-ring entry.
#[tokio::test]
async fn status_summary_flags_freeze_after_freeze_cluster_commit() {
let dispatcher = Arc::new(LoggingDispatcher::new());
let runtime = MeshOsRuntime::start(fast_config(), dispatcher);
let deck = DeckClient::from_runtime(&runtime, OperatorIdentity::generate());
let p = deck
.ice()
.freeze_cluster(Duration::from_secs(30))
.simulate()
.await
.expect("simulate");
let sig = deck
.identity()
.sign_proposal(p.action(), p.issued_at_ms(), &p.blast_hash());
p.commit(&[sig]).await.expect("freeze");
tokio::time::sleep(Duration::from_millis(60)).await;
let summary = deck.status_summary();
assert!(summary.freeze_remaining_ms.is_some());
assert!(summary.admin_audit_ring_depth >= 1);
let _ = runtime.shutdown().await;
}
/// After `enter_maintenance` on the local node (42) and a 60ms settle,
/// `status_summary().local_maintenance_active` is true.
#[tokio::test]
async fn status_summary_flags_local_maintenance_after_enter_maintenance() {
let dispatcher = Arc::new(LoggingDispatcher::new());
let runtime = MeshOsRuntime::start(fast_config(), dispatcher);
let deck = DeckClient::from_runtime(&runtime, OperatorIdentity::generate());
deck.admin()
.enter_maintenance(42, None)
.await
.expect("commit");
tokio::time::sleep(Duration::from_millis(60)).await;
let summary = deck.status_summary();
assert!(
summary.local_maintenance_active,
"local_maintenance_active should flip on after enter_maintenance",
);
let _ = runtime.shutdown().await;
}
/// The dispatcher is primed to reject once. After `enter_maintenance`,
/// `subscribe_failures(0)` yields a record with `seq > 0` whose reason mentions
/// `"first"`.
#[tokio::test]
async fn subscribe_failures_yields_seeded_dispatcher_rejection() {
use crate::adapter::net::behavior::meshos::DispatchError;
use futures::StreamExt;
let dispatcher = Arc::new(LoggingDispatcher::new());
dispatcher.fail_next(DispatchError::drop("first"));
let runtime = MeshOsRuntime::start(fast_config(), Arc::clone(&dispatcher));
let deck = DeckClient::from_runtime(&runtime, OperatorIdentity::generate()).with_config(
DeckClientConfig {
snapshot_poll_interval: Duration::from_millis(15),
..DeckClientConfig::default()
},
);
// Subscribe before triggering the dispatch so the failure cannot be missed.
let mut stream = deck.subscribe_failures(0);
deck.admin().enter_maintenance(42, None).await.unwrap();
let record = tokio::time::timeout(Duration::from_secs(2), stream.next())
.await
.expect("timed out")
.expect("closed")
.expect("ok");
assert!(record.seq > 0);
assert!(record.reason.contains("first"));
let _ = runtime.shutdown().await;
}
/// Once the seeded failure's seq is known, `subscribe_failures(seq_seen)` yields
/// nothing within 60ms: records at or below the watermark are not replayed.
#[tokio::test]
async fn subscribe_failures_since_seq_drops_already_seen() {
use crate::adapter::net::behavior::meshos::DispatchError;
use futures::StreamExt;
let dispatcher = Arc::new(LoggingDispatcher::new());
dispatcher.fail_next(DispatchError::drop("first"));
let runtime = MeshOsRuntime::start(fast_config(), Arc::clone(&dispatcher));
let deck = DeckClient::from_runtime(&runtime, OperatorIdentity::generate()).with_config(
DeckClientConfig {
snapshot_poll_interval: Duration::from_millis(15),
..DeckClientConfig::default()
},
);
deck.admin().enter_maintenance(42, None).await.unwrap();
// Poll for up to 2s until the seeded failure shows up; remember its seq.
let deadline = std::time::Instant::now() + Duration::from_secs(2);
let mut seq_seen = 0u64;
while std::time::Instant::now() < deadline {
let all = deck.recent_failures();
if let Some(r) = all.last() {
seq_seen = r.seq;
break;
}
tokio::time::sleep(Duration::from_millis(20)).await;
}
assert!(seq_seen > 0);
let mut stream = deck.subscribe_failures(seq_seen);
let parked = tokio::time::timeout(Duration::from_millis(60), stream.next()).await;
assert!(parked.is_err(), "no new failures means parked stream");
let _ = runtime.shutdown().await;
}
/// `recent_failures()` surfaces the primed dispatcher rejection within 2s, and
/// its reason contains the seeded message.
#[tokio::test]
async fn recent_failures_surfaces_dispatcher_rejections() {
use crate::adapter::net::behavior::meshos::DispatchError;
let dispatcher = Arc::new(LoggingDispatcher::new());
dispatcher.fail_next(DispatchError::drop("synthetic rejection"));
let runtime = MeshOsRuntime::start(fast_config(), Arc::clone(&dispatcher));
let deck = DeckClient::from_runtime(&runtime, OperatorIdentity::generate());
deck.admin()
.enter_maintenance(42, None)
.await
.expect("commit");
// Poll for up to 2s until at least one failure record is visible.
let deadline = std::time::Instant::now() + Duration::from_secs(2);
let mut got: Vec<crate::adapter::net::behavior::meshos::FailureRecord> = Vec::new();
while std::time::Instant::now() < deadline {
got = deck.recent_failures();
if !got.is_empty() {
break;
}
tokio::time::sleep(Duration::from_millis(20)).await;
}
assert!(
!got.is_empty(),
"recent_failures should reflect the seeded dispatcher rejection",
);
assert!(got[0].reason.contains("synthetic rejection"));
let _ = runtime.shutdown().await;
}
/// `recent_failures_since(cutoff)` returns only records strictly newer than
/// `cutoff`, so the seeded failure recorded at the cutoff is excluded.
#[tokio::test]
async fn recent_failures_since_drops_records_at_or_below_cutoff() {
    use crate::adapter::net::behavior::meshos::DispatchError;
    let dispatcher = Arc::new(LoggingDispatcher::new());
    dispatcher.fail_next(DispatchError::drop("first failure"));
    let runtime = MeshOsRuntime::start(fast_config(), Arc::clone(&dispatcher));
    let deck = DeckClient::from_runtime(&runtime, OperatorIdentity::generate());
    deck.admin()
        .enter_maintenance(42, None)
        .await
        .expect("commit");
    // Poll for up to 2s until the seeded dispatcher rejection is recorded.
    let deadline = std::time::Instant::now() + Duration::from_secs(2);
    let mut all: Vec<crate::adapter::net::behavior::meshos::FailureRecord> = Vec::new();
    while std::time::Instant::now() < deadline {
        all = deck.recent_failures();
        if !all.is_empty() {
            break;
        }
        tokio::time::sleep(Duration::from_millis(20)).await;
    }
    assert!(!all.is_empty(), "seed failure should land");
    let cutoff = all[0].recorded_at_ms;
    let after = deck.recent_failures_since(cutoff);
    assert!(
        after.iter().all(|r| r.recorded_at_ms > cutoff),
        "since filter should drop records at the cutoff",
    );
    // The recorded `reason` may wrap the dispatcher's message (sibling tests
    // match it with `contains`), so an exact `!=` comparison could pass
    // vacuously even if the seeded failure leaked through the filter.
    assert!(after.iter().all(|r| !r.reason.contains("first failure")));
    let _ = runtime.shutdown().await;
}
/// Each per-field accessor agrees with the corresponding field of `status()`.
/// NOTE(review): the accessors and `status()` are read separately, so equality
/// assumes the idle runtime does not change state between reads.
#[tokio::test]
async fn per_field_accessors_match_full_snapshot_contents() {
let dispatcher = Arc::new(LoggingDispatcher::new());
let runtime = MeshOsRuntime::start(fast_config(), dispatcher);
let deck = DeckClient::from_runtime(&runtime, OperatorIdentity::generate());
let snap = deck.status();
assert_eq!(deck.peers(), snap.peers);
assert_eq!(deck.daemons(), snap.daemons);
assert_eq!(deck.replicas(), snap.replicas);
assert_eq!(deck.local_maintenance(), snap.local_maintenance);
assert_eq!(deck.freeze_remaining_ms(), snap.freeze_remaining_ms);
let _ = runtime.shutdown().await;
}
/// `status()` shows `Active` local maintenance on a fresh runtime. After a
/// committed freeze and a 60ms settle, it shows the freeze.
#[tokio::test]
async fn status_returns_freshest_snapshot_synchronously() {
let dispatcher = Arc::new(LoggingDispatcher::new());
let runtime = MeshOsRuntime::start(fast_config(), dispatcher);
let deck = DeckClient::from_runtime(&runtime, OperatorIdentity::generate());
let s = deck.status();
assert!(matches!(
s.local_maintenance,
crate::adapter::net::behavior::meshos::MaintenanceStateSnapshot::Active
));
let p = deck
.ice()
.freeze_cluster(Duration::from_secs(20))
.simulate()
.await
.expect("simulate");
let sig = deck
.identity()
.sign_proposal(p.action(), p.issued_at_ms(), &p.blast_hash());
p.commit(&[sig]).await.expect("commit");
tokio::time::sleep(Duration::from_millis(60)).await;
let s = deck.status();
assert!(s.freeze_remaining_ms.is_some());
let _ = runtime.shutdown().await;
}
/// `watch` resolves within 50ms when the predicate already holds on the current
/// snapshot (an unfrozen cluster).
#[tokio::test]
async fn watch_resolves_immediately_when_predicate_already_true() {
let dispatcher = Arc::new(LoggingDispatcher::new());
let runtime = MeshOsRuntime::start(fast_config(), dispatcher);
let deck = DeckClient::from_runtime(&runtime, OperatorIdentity::generate());
let snap = tokio::time::timeout(
Duration::from_millis(50),
deck.watch(|s| s.freeze_remaining_ms.is_none()),
)
.await
.expect("watch should not block when predicate already holds");
assert!(snap.freeze_remaining_ms.is_none());
let _ = runtime.shutdown().await;
}
/// A spawned watcher waits for `freeze_remaining_ms` to become `Some`. The main
/// task then commits a freeze ICE proposal (not an admin command, despite the
/// test name), and the watcher must resolve within 2s.
#[tokio::test]
async fn watch_resolves_when_predicate_becomes_true_after_admin_commit() {
let dispatcher = Arc::new(LoggingDispatcher::new());
let runtime = MeshOsRuntime::start(fast_config(), dispatcher);
let deck = DeckClient::from_runtime(&runtime, OperatorIdentity::generate()).with_config(
DeckClientConfig {
snapshot_poll_interval: Duration::from_millis(10),
..DeckClientConfig::default()
},
);
// NOTE(review): despite its name, `deck_handle` is a clone of the snapshot
// reader, not of the MeshOS handle.
let deck_handle = deck.snapshot_reader.clone();
// Build a second client from the same handle, reader, identity and config so
// it can be moved into the spawned task.
let watcher = {
let identity = deck.identity().clone();
let config = deck.config.clone();
let handle = deck.handle.clone();
let client = DeckClient::new(handle, deck_handle.clone(), identity, config);
tokio::spawn(async move { client.watch(|s| s.freeze_remaining_ms.is_some()).await })
};
// Let the watcher start polling before the state change lands.
tokio::time::sleep(Duration::from_millis(40)).await;
let p = deck
.ice()
.freeze_cluster(Duration::from_secs(15))
.simulate()
.await
.expect("simulate");
let sig = deck
.identity()
.sign_proposal(p.action(), p.issued_at_ms(), &p.blast_hash());
p.commit(&[sig]).await.expect("commit");
let snap = tokio::time::timeout(Duration::from_secs(2), watcher)
.await
.expect("watcher should resolve")
.expect("join");
assert!(snap.freeze_remaining_ms.is_some());
let _ = runtime.shutdown().await;
}
/// `watch_timeout` fails with error kind `"watch_timeout"` when the predicate
/// never holds within the 80ms budget.
#[tokio::test]
async fn watch_timeout_returns_watch_timeout_error_when_predicate_never_holds() {
let dispatcher = Arc::new(LoggingDispatcher::new());
let runtime = MeshOsRuntime::start(fast_config(), dispatcher);
let deck = DeckClient::from_runtime(&runtime, OperatorIdentity::generate()).with_config(
DeckClientConfig {
snapshot_poll_interval: Duration::from_millis(10),
..DeckClientConfig::default()
},
);
let err = deck
.watch_timeout(
|s| s.freeze_remaining_ms.is_some(),
Duration::from_millis(80),
)
.await
.expect_err("predicate never holds, should time out");
assert_eq!(err.kind, "watch_timeout");
let _ = runtime.shutdown().await;
}
/// Three admin commits produce three audit records. `since(middle_seq)` keeps
/// only the single record whose seq is greater than the middle one.
#[tokio::test]
async fn audit_since_filter_drops_records_at_or_below_watermark() {
let dispatcher = Arc::new(LoggingDispatcher::new());
let runtime = MeshOsRuntime::start(fast_config(), dispatcher);
let deck = DeckClient::from_runtime(&runtime, OperatorIdentity::generate());
deck.admin().cordon(42).await.unwrap();
deck.admin().uncordon(42).await.unwrap();
deck.admin().invalidate_placement(42).await.unwrap();
tokio::time::sleep(Duration::from_millis(80)).await;
let all = deck.audit().collect();
assert_eq!(all.len(), 3);
let middle_seq = all[1].seq;
let after_middle = deck.audit().since(middle_seq).collect();
assert_eq!(after_middle.len(), 1, "since should keep only seq > middle");
assert!(after_middle[0].seq > middle_seq);
let _ = runtime.shutdown().await;
}
/// `audit().since(seq).stream()` starts after `seq`: it yields the one record
/// newer than the middle commit, then parks (nothing within 40ms).
#[tokio::test]
async fn audit_stream_since_seeds_initial_watermark() {
use futures::StreamExt;
let dispatcher = Arc::new(LoggingDispatcher::new());
let runtime = MeshOsRuntime::start(fast_config(), dispatcher);
let deck = DeckClient::from_runtime(&runtime, OperatorIdentity::generate()).with_config(
DeckClientConfig {
snapshot_poll_interval: Duration::from_millis(15),
..DeckClientConfig::default()
},
);
deck.admin().cordon(42).await.unwrap();
deck.admin().uncordon(42).await.unwrap();
deck.admin().invalidate_placement(42).await.unwrap();
tokio::time::sleep(Duration::from_millis(80)).await;
let all = deck.audit().collect();
assert_eq!(all.len(), 3);
let middle_seq = all[1].seq;
let mut stream = deck.audit().since(middle_seq).stream();
let next = tokio::time::timeout(Duration::from_secs(2), stream.next())
.await
.expect("timed out")
.expect("closed")
.expect("ok");
assert!(next.seq > middle_seq);
let parked = tokio::time::timeout(Duration::from_millis(40), stream.next()).await;
assert!(
parked.is_err(),
"stream should park after watermark catches up"
);
let _ = runtime.shutdown().await;
}
/// Three log lines are published and allowed to land. A stream filtered with
/// `LogFilter::since(middle_seq)` skips the first two and yields `"msg 2"`.
#[tokio::test]
async fn log_filter_since_seeds_stream_watermark() {
use crate::adapter::net::behavior::meshos::{LogLine, MeshOsEvent};
use futures::StreamExt;
let dispatcher = Arc::new(LoggingDispatcher::new());
let runtime = MeshOsRuntime::start(fast_config(), dispatcher);
let deck = DeckClient::from_runtime(&runtime, OperatorIdentity::generate()).with_config(
DeckClientConfig {
snapshot_poll_interval: Duration::from_millis(15),
..DeckClientConfig::default()
},
);
for i in 0..3 {
runtime
.handle()
.publish(MeshOsEvent::LogLine(LogLine::info(
None,
format!("msg {i}"),
)))
.await
.unwrap();
}
tokio::time::sleep(Duration::from_millis(80)).await;
let snap = runtime.snapshot();
assert_eq!(snap.log_ring.len(), 3);
let middle_seq = snap.log_ring[1].seq;
let mut stream = deck.subscribe_logs(LogFilter::new().since(middle_seq));
let next = tokio::time::timeout(Duration::from_secs(2), stream.next())
.await
.expect("timed out")
.expect("closed")
.expect("ok");
assert!(next.seq > middle_seq);
assert_eq!(next.message, "msg 2");
let _ = runtime.shutdown().await;
}
/// Three published log lines (Info, Warn, Error) come back through an
/// unfiltered stream with increasing seq numbers, their original levels, and
/// node id 42 (the `with_this_node` value from `fast_config`).
#[tokio::test]
async fn subscribe_logs_yields_published_log_lines_in_seq_order() {
use crate::adapter::net::behavior::meshos::{LogLevel, LogLine, MeshOsEvent};
use futures::StreamExt;
let dispatcher = Arc::new(LoggingDispatcher::new());
let runtime = MeshOsRuntime::start(fast_config(), dispatcher);
let deck = DeckClient::from_runtime(&runtime, OperatorIdentity::generate()).with_config(
DeckClientConfig {
snapshot_poll_interval: Duration::from_millis(15),
..DeckClientConfig::default()
},
);
let mut stream = deck.subscribe_logs(LogFilter::new());
for (i, level) in [LogLevel::Info, LogLevel::Warn, LogLevel::Error]
.into_iter()
.enumerate()
{
runtime
.handle()
.publish(MeshOsEvent::LogLine(LogLine {
level,
daemon_id: Some(7),
message: format!("msg {}", i),
}))
.await
.unwrap();
}
let r1 = tokio::time::timeout(Duration::from_secs(2), stream.next())
.await
.expect("r1 timed out")
.expect("r1 closed")
.expect("r1 ok");
let r2 = tokio::time::timeout(Duration::from_secs(2), stream.next())
.await
.expect("r2 timed out")
.expect("r2 closed")
.expect("r2 ok");
let r3 = tokio::time::timeout(Duration::from_secs(2), stream.next())
.await
.expect("r3 timed out")
.expect("r3 closed")
.expect("r3 ok");
assert!(r1.seq < r2.seq);
assert!(r2.seq < r3.seq);
assert_eq!(r1.level, LogLevel::Info);
assert_eq!(r3.level, LogLevel::Error);
assert_eq!(r1.node_id, Some(42));
let _ = runtime.shutdown().await;
}
/// With `min_level(Warn)`, the info line is filtered out and the warn line is
/// the first item delivered.
#[tokio::test]
async fn subscribe_logs_min_level_filter_drops_below_threshold() {
use crate::adapter::net::behavior::meshos::{LogLevel, LogLine, MeshOsEvent};
use futures::StreamExt;
let dispatcher = Arc::new(LoggingDispatcher::new());
let runtime = MeshOsRuntime::start(fast_config(), dispatcher);
let deck = DeckClient::from_runtime(&runtime, OperatorIdentity::generate()).with_config(
DeckClientConfig {
snapshot_poll_interval: Duration::from_millis(15),
..DeckClientConfig::default()
},
);
let mut stream = deck.subscribe_logs(LogFilter::new().min_level(LogLevel::Warn));
runtime
.handle()
.publish(MeshOsEvent::LogLine(LogLine::info(None, "info dropped")))
.await
.unwrap();
runtime
.handle()
.publish(MeshOsEvent::LogLine(LogLine::warn(None, "warn kept")))
.await
.unwrap();
let next = tokio::time::timeout(Duration::from_secs(2), stream.next())
.await
.expect("next timed out")
.expect("next closed")
.expect("next ok");
assert_eq!(next.level, LogLevel::Warn);
assert_eq!(next.message, "warn kept");
let _ = runtime.shutdown().await;
}
/// A `with_daemon(7)` log filter must skip a line tagged with daemon 99 and deliver
/// the line tagged with daemon 7.
#[tokio::test]
async fn subscribe_logs_with_daemon_filter_keeps_only_matching_daemon() {
    use crate::adapter::net::behavior::meshos::{LogLine, MeshOsEvent};
    use futures::StreamExt;

    let logger = Arc::new(LoggingDispatcher::new());
    let rt = MeshOsRuntime::start(fast_config(), logger);
    let config = DeckClientConfig {
        snapshot_poll_interval: Duration::from_millis(15),
        ..DeckClientConfig::default()
    };
    let client = DeckClient::from_runtime(&rt, OperatorIdentity::generate()).with_config(config);
    let mut lines = client.subscribe_logs(LogFilter::new().with_daemon(7));

    // The non-matching daemon publishes first so the filter has something to drop.
    for (daemon, text) in [(99, "other daemon"), (7, "target daemon")] {
        rt.handle()
            .publish(MeshOsEvent::LogLine(LogLine::info(Some(daemon), text)))
            .await
            .unwrap();
    }

    let delivered = tokio::time::timeout(Duration::from_secs(2), lines.next())
        .await
        .expect("next timed out")
        .expect("next closed")
        .expect("next ok");
    assert_eq!(delivered.daemon_id, Some(7));
    assert_eq!(delivered.message, "target daemon");
    let _ = rt.shutdown().await;
}
/// The audit stream stays pending while nothing has been committed. It then yields one
/// record per admin commit (cordon, uncordon, invalidate_placement), with strictly
/// increasing `seq`.
#[tokio::test]
async fn audit_stream_emits_one_record_per_signed_commit_in_order() {
    use futures::StreamExt;

    let logger = Arc::new(LoggingDispatcher::new());
    let rt = MeshOsRuntime::start(fast_config(), logger);
    let config = DeckClientConfig {
        snapshot_poll_interval: Duration::from_millis(15),
        ..DeckClientConfig::default()
    };
    let client = DeckClient::from_runtime(&rt, OperatorIdentity::generate()).with_config(config);
    let mut audit_stream = client.audit().stream();

    // No commits yet, so a short wait must time out rather than yield.
    let idle = tokio::time::timeout(Duration::from_millis(40), audit_stream.next()).await;
    assert!(idle.is_err(), "stream should park when no records");

    client.admin().cordon(42).await.unwrap();
    client.admin().uncordon(42).await.unwrap();
    client.admin().invalidate_placement(42).await.unwrap();

    let mut records = Vec::with_capacity(3);
    for (timed_out, closed, ok) in [
        ("r1 timed out", "r1 closed", "r1 ok"),
        ("r2 timed out", "r2 closed", "r2 ok"),
        ("r3 timed out", "r3 closed", "r3 ok"),
    ] {
        let record = tokio::time::timeout(Duration::from_secs(2), audit_stream.next())
            .await
            .expect(timed_out)
            .expect(closed)
            .expect(ok);
        records.push(record);
    }
    for pair in records.windows(2) {
        assert!(pair[0].seq < pair[1].seq);
    }
    let _ = rt.shutdown().await;
}
/// A record the audit stream has already delivered must not be re-emitted on later
/// polls. Only the next commit produces a new record, with a higher `seq`.
#[tokio::test]
async fn audit_stream_dedups_already_seen_records_across_polls() {
    use futures::StreamExt;

    let logger = Arc::new(LoggingDispatcher::new());
    let rt = MeshOsRuntime::start(fast_config(), logger);
    let config = DeckClientConfig {
        snapshot_poll_interval: Duration::from_millis(10),
        ..DeckClientConfig::default()
    };
    let client = DeckClient::from_runtime(&rt, OperatorIdentity::generate()).with_config(config);

    // The commit lands before subscribing; the test expects the stream to surface it first.
    client.admin().cordon(42).await.unwrap();
    let mut audit_stream = client.audit().stream();
    let cordon_record = tokio::time::timeout(Duration::from_secs(2), audit_stream.next())
        .await
        .expect("first timed out")
        .expect("first closed")
        .expect("first ok");

    // 50 ms spans several 10 ms snapshot-poll intervals, presumably enough for the
    // stream to re-read the same ring contents more than once.
    let repeat = tokio::time::timeout(Duration::from_millis(50), audit_stream.next()).await;
    assert!(
        repeat.is_err(),
        "stream should not re-emit seen record"
    );

    client.admin().uncordon(42).await.unwrap();
    let uncordon_record = tokio::time::timeout(Duration::from_secs(2), audit_stream.next())
        .await
        .expect("second timed out")
        .expect("second closed")
        .expect("second ok");
    assert!(uncordon_record.seq > cordon_record.seq);
    let _ = rt.shutdown().await;
}
/// A `force_only()` audit stream, tailing live commits, must skip an ordinary cordon
/// and yield the ICE thaw commit that follows it.
#[tokio::test]
async fn audit_stream_applies_force_only_filter_in_tail_mode() {
    use futures::StreamExt;

    let logger = Arc::new(LoggingDispatcher::new());
    let rt = MeshOsRuntime::start(fast_config(), logger);
    let config = DeckClientConfig {
        snapshot_poll_interval: Duration::from_millis(10),
        ..DeckClientConfig::default()
    };
    let client = DeckClient::from_runtime(&rt, OperatorIdentity::generate()).with_config(config);
    let mut ice_stream = client.audit().force_only().stream();

    // Ordinary admin commit: must be filtered out of the force-only view.
    client.admin().cordon(42).await.unwrap();

    // ICE commit, signed by the deck's own operator: must come through.
    let thaw = client
        .ice()
        .thaw_cluster()
        .simulate()
        .await
        .expect("simulate");
    let signature = client
        .identity()
        .sign_proposal(thaw.action(), thaw.issued_at_ms(), &thaw.blast_hash());
    thaw.commit(&[signature]).await.unwrap();

    let ice_record = tokio::time::timeout(Duration::from_secs(2), ice_stream.next())
        .await
        .expect("next timed out")
        .expect("next closed")
        .expect("next ok");
    assert!(ice_record.event.is_ice());
    let _ = rt.shutdown().await;
}
/// On a freshly started runtime with no commits, `audit().recent(10)` yields nothing.
#[tokio::test]
async fn audit_query_returns_empty_when_no_ice_commits_observed() {
    let logger = Arc::new(LoggingDispatcher::new());
    let rt = MeshOsRuntime::start(fast_config(), logger);
    let client = DeckClient::from_runtime(&rt, OperatorIdentity::generate());
    let newest_ten = client.audit().recent(10).collect();
    assert!(newest_ten.is_empty());
    let _ = rt.shutdown().await;
}
/// Publishes three `SignedIceCommit` freeze events (ttl 10s, 20s, 30s, in that order).
/// Checks that the audit query lists them newest first and that `recent(1)` keeps only
/// the newest.
#[tokio::test]
async fn audit_query_returns_recent_entries_newest_first() {
    use crate::adapter::net::behavior::meshos::{IceActionProposal, MeshOsEvent};
    let dispatcher = Arc::new(LoggingDispatcher::new());
    let runtime = MeshOsRuntime::start(fast_config(), dispatcher);
    let deck = DeckClient::from_runtime(&runtime, OperatorIdentity::generate());
    for ttl_secs in [10, 20, 30] {
        runtime
            .handle()
            .publish(MeshOsEvent::SignedIceCommit {
                proposal: IceActionProposal::FreezeCluster {
                    ttl: Duration::from_secs(ttl_secs),
                },
                signatures: Vec::new(),
                issued_at_ms: super::super::meshos::now_ms_since_unix_epoch(),
                blast_hash: TEST_BLAST_HASH,
            })
            .await
            .unwrap();
    }
    // `publish` hands events to the MeshOS loop's source channel. Poll (bounded) until
    // all three commits reach the audit ring; a fixed sleep can be too short under load.
    tokio::time::timeout(Duration::from_secs(2), async {
        while deck.audit().collect().len() < 3 {
            tokio::time::sleep(Duration::from_millis(10)).await;
        }
    })
    .await
    .expect("audit ring did not reach 3 entries in time");
    let all = deck.audit().collect();
    assert_eq!(all.len(), 3, "ring should hold all three entries");
    assert!(matches!(
        all[0].event,
        AdminEvent::FreezeCluster { ttl } if ttl == Duration::from_secs(30)
    ));
    assert!(matches!(
        all[1].event,
        AdminEvent::FreezeCluster { ttl } if ttl == Duration::from_secs(20)
    ));
    assert!(matches!(
        all[2].event,
        AdminEvent::FreezeCluster { ttl } if ttl == Duration::from_secs(10)
    ));
    let recent_one = deck.audit().recent(1).collect();
    assert_eq!(recent_one.len(), 1);
    assert!(matches!(
        recent_one[0].event,
        AdminEvent::FreezeCluster { ttl } if ttl == Duration::from_secs(30)
    ));
    let _ = runtime.shutdown().await;
}
/// Publishes one freeze commit signed by operator A and one thaw commit signed by
/// operator B. Checks that `by_operator(A)` returns only A's freeze.
#[tokio::test]
async fn audit_query_filters_by_operator_id() {
    use crate::adapter::net::behavior::meshos::{IceActionProposal, MeshOsEvent};
    let dispatcher = Arc::new(LoggingDispatcher::new());
    let runtime = MeshOsRuntime::start(fast_config(), dispatcher);
    let op_a = OperatorIdentity::generate();
    let op_b = OperatorIdentity::generate();
    let deck = DeckClient::from_runtime(&runtime, op_a.clone());
    let proposal_a = IceActionProposal::FreezeCluster {
        ttl: Duration::from_secs(10),
    };
    let ts_a = super::super::meshos::now_ms_since_unix_epoch();
    let sig_a = OperatorSignature::sign(op_a.keypair(), &proposal_a, ts_a, &TEST_BLAST_HASH);
    runtime
        .handle()
        .publish(MeshOsEvent::SignedIceCommit {
            proposal: proposal_a,
            signatures: vec![sig_a],
            issued_at_ms: ts_a,
            blast_hash: TEST_BLAST_HASH,
        })
        .await
        .unwrap();
    let proposal_b = IceActionProposal::ThawCluster;
    let ts_b = super::super::meshos::now_ms_since_unix_epoch();
    let sig_b = OperatorSignature::sign(op_b.keypair(), &proposal_b, ts_b, &TEST_BLAST_HASH);
    runtime
        .handle()
        .publish(MeshOsEvent::SignedIceCommit {
            proposal: proposal_b,
            signatures: vec![sig_b],
            issued_at_ms: ts_b,
            blast_hash: TEST_BLAST_HASH,
        })
        .await
        .unwrap();
    // Wait (bounded) until both commits are in the ring, so the filter below really
    // has a second operator's entry to exclude. A fixed sleep can be too short under load.
    tokio::time::timeout(Duration::from_secs(2), async {
        while deck.audit().collect().len() < 2 {
            tokio::time::sleep(Duration::from_millis(10)).await;
        }
    })
    .await
    .expect("audit ring did not reach 2 entries in time");
    let filtered = deck.audit().by_operator(op_a.operator_id()).collect();
    assert_eq!(filtered.len(), 1);
    assert!(matches!(
        filtered[0].event,
        AdminEvent::FreezeCluster { .. }
    ));
    let _ = runtime.shutdown().await;
}
/// Commits one ordinary admin action (cordon) and one ICE action (thaw). The unfiltered
/// query must return both; `force_only()` must return only the ICE commit.
#[tokio::test]
async fn audit_query_force_only_drops_ordinary_admin_keeps_ice() {
    let dispatcher = Arc::new(LoggingDispatcher::new());
    let runtime = MeshOsRuntime::start(fast_config(), dispatcher);
    let deck = DeckClient::from_runtime(&runtime, OperatorIdentity::generate());
    deck.admin().cordon(42).await.expect("cordon");
    let thaw = deck
        .ice()
        .thaw_cluster()
        .simulate()
        .await
        .expect("simulate");
    let sig =
        deck.identity()
            .sign_proposal(thaw.action(), thaw.issued_at_ms(), &thaw.blast_hash());
    thaw.commit(&[sig]).await.expect("thaw");
    // Wait (bounded) for both commits to land in the audit ring instead of sleeping
    // a fixed interval that can be too short under load.
    tokio::time::timeout(Duration::from_secs(2), async {
        while deck.audit().collect().len() < 2 {
            tokio::time::sleep(Duration::from_millis(10)).await;
        }
    })
    .await
    .expect("audit ring did not reach 2 entries in time");
    let baseline = deck.audit().collect();
    assert_eq!(
        baseline.len(),
        2,
        "ring should hold both ordinary and ICE commits"
    );
    let force_only = deck.audit().force_only().collect();
    assert_eq!(force_only.len(), 1, "force_only should drop Cordon");
    assert!(force_only[0].event.is_ice());
    let _ = runtime.shutdown().await;
}
/// With an `AdminVerifier` installed on the runtime and the deck holding a registry that
/// knows its operator, an ordinary admin commit (cordon) must be recorded as `Accepted`
/// and attributed to that operator.
#[tokio::test]
async fn admin_commit_routes_through_signed_path_when_registry_installed() {
    let dispatcher = Arc::new(LoggingDispatcher::new());
    let identity = OperatorIdentity::generate();
    let mut registry = OperatorRegistry::new();
    registry.register(identity.keypair());
    // NOTE(review): the `1` is presumably the verifier's required signature count — confirm.
    let verifier = Arc::new(crate::adapter::net::behavior::meshos::AdminVerifier::new(
        Arc::new(registry.clone()),
        1,
    ));
    let runtime = MeshOsRuntime::start_with_all(
        fast_config(),
        dispatcher,
        Default::default(),
        Default::default(),
        Arc::new(crate::adapter::net::compute::DaemonRegistry::new()),
        None,
        Some(verifier),
    );
    let deck =
        DeckClient::from_runtime(&runtime, identity.clone()).with_operator_registry(registry);
    let commit = deck.admin().cordon(42).await.expect("commit");
    assert_eq!(commit.event_kind(), "cordon");
    // Wait (bounded) for the commit to reach the audit ring instead of sleeping a fixed
    // interval that can be too short under load.
    tokio::time::timeout(Duration::from_secs(2), async {
        while deck.audit().collect().is_empty() {
            tokio::time::sleep(Duration::from_millis(10)).await;
        }
    })
    .await
    .expect("audit ring never received the cordon commit");
    let entries = deck.audit().collect();
    assert_eq!(entries.len(), 1);
    assert!(matches!(
        entries[0].outcome,
        crate::adapter::net::behavior::meshos::VerificationOutcome::Accepted
    ));
    assert_eq!(entries[0].operator_ids, vec![identity.operator_id()]);
    let _ = runtime.shutdown().await;
}
/// Without an operator registry, an admin commit takes the unsigned path. Its audit
/// entry is `Unverified` and carries no operator ids.
#[tokio::test]
async fn admin_commit_falls_back_to_unsigned_when_no_registry_installed() {
    let dispatcher = Arc::new(LoggingDispatcher::new());
    let runtime = MeshOsRuntime::start(fast_config(), dispatcher);
    let deck = DeckClient::from_runtime(&runtime, OperatorIdentity::generate());
    deck.admin().cordon(42).await.expect("commit");
    // Wait (bounded) for the commit to reach the audit ring instead of sleeping a fixed
    // interval that can be too short under load.
    tokio::time::timeout(Duration::from_secs(2), async {
        while deck.audit().collect().is_empty() {
            tokio::time::sleep(Duration::from_millis(10)).await;
        }
    })
    .await
    .expect("audit ring never received the cordon commit");
    let entries = deck.audit().collect();
    assert_eq!(entries.len(), 1);
    assert!(matches!(
        entries[0].outcome,
        crate::adapter::net::behavior::meshos::VerificationOutcome::Unverified
    ));
    assert!(entries[0].operator_ids.is_empty());
    let _ = runtime.shutdown().await;
}
/// Two unsigned admin commits (cordon, drop_replicas) must both be recorded as
/// `Unverified` with empty operator ids.
#[tokio::test]
async fn audit_ring_records_unsigned_admin_with_unverified_outcome() {
    let dispatcher = Arc::new(LoggingDispatcher::new());
    let runtime = MeshOsRuntime::start(fast_config(), dispatcher);
    let deck = DeckClient::from_runtime(&runtime, OperatorIdentity::generate());
    deck.admin().cordon(42).await.expect("cordon");
    deck.admin()
        .drop_replicas(42, vec![1, 2])
        .await
        .expect("drop_replicas");
    // Wait (bounded) for both commits to land in the audit ring instead of sleeping a
    // fixed interval that can be too short under load.
    tokio::time::timeout(Duration::from_secs(2), async {
        while deck.audit().collect().len() < 2 {
            tokio::time::sleep(Duration::from_millis(10)).await;
        }
    })
    .await
    .expect("audit ring did not reach 2 entries in time");
    let entries = deck.audit().collect();
    assert_eq!(entries.len(), 2);
    for entry in &entries {
        assert!(matches!(
            entry.outcome,
            crate::adapter::net::behavior::meshos::VerificationOutcome::Unverified
        ));
        assert!(entry.operator_ids.is_empty());
    }
    let _ = runtime.shutdown().await;
}
/// A single thaw commit issued "now" must be excluded by a `between(0, 1)` window and
/// included by a ±10 s window around the current wall-clock time.
#[tokio::test]
async fn audit_query_between_filters_outside_window() {
    use crate::adapter::net::behavior::meshos::{IceActionProposal, MeshOsEvent};
    let dispatcher = Arc::new(LoggingDispatcher::new());
    let runtime = MeshOsRuntime::start(fast_config(), dispatcher);
    let deck = DeckClient::from_runtime(&runtime, OperatorIdentity::generate());
    runtime
        .handle()
        .publish(MeshOsEvent::SignedIceCommit {
            proposal: IceActionProposal::ThawCluster,
            signatures: Vec::new(),
            issued_at_ms: super::super::meshos::now_ms_since_unix_epoch(),
            blast_hash: TEST_BLAST_HASH,
        })
        .await
        .unwrap();
    // Wait (bounded) until the commit is actually in the ring. Otherwise the empty-window
    // assertion below could pass trivially simply because nothing has landed yet.
    tokio::time::timeout(Duration::from_secs(2), async {
        while deck.audit().collect().is_empty() {
            tokio::time::sleep(Duration::from_millis(10)).await;
        }
    })
    .await
    .expect("audit ring never received the thaw commit");
    let past_only = deck.audit().between(0, 1).collect();
    assert!(past_only.is_empty());
    let now_ms = std::time::SystemTime::now()
        .duration_since(std::time::UNIX_EPOCH)
        .unwrap()
        .as_millis() as u64;
    let around_now = deck
        .audit()
        .between(now_ms - 10_000, now_ms + 10_000)
        .collect();
    assert_eq!(around_now.len(), 1);
    let _ = runtime.shutdown().await;
}
/// `force_restart_daemon` simulates with the named daemon as the only affected daemon.
/// Signed by the deck's operator, it then commits as a `force_restart_daemon` event.
#[tokio::test]
async fn ice_force_restart_daemon_proposal_round_trips() {
    let logger = Arc::new(LoggingDispatcher::new());
    let rt = MeshOsRuntime::start(fast_config(), logger);
    let client = DeckClient::from_runtime(&rt, OperatorIdentity::generate());
    let telemetry = super::super::meshos::DaemonRef {
        id: 7,
        name: "telemetry".into(),
    };
    let expected_daemons = vec![telemetry.clone()];

    let simulated = client
        .ice()
        .force_restart_daemon(telemetry)
        .simulate()
        .await
        .expect("simulate");
    assert_eq!(simulated.blast_radius().affected_daemons, expected_daemons);

    let signature = client.identity().sign_proposal(
        simulated.action(),
        simulated.issued_at_ms(),
        &simulated.blast_hash(),
    );
    let committed = simulated.commit(&[signature]).await.expect("commit");
    assert_eq!(committed.event_kind(), "force_restart_daemon");
    let _ = rt.shutdown().await;
}
/// A signed `kill_migration(123)` commits as a `kill_migration` event and then appears in
/// the `force_only()` audit view as `KillMigration { migration: 123 }`.
#[tokio::test]
async fn ice_kill_migration_proposal_round_trips_and_audits() {
    let dispatcher = Arc::new(LoggingDispatcher::new());
    let runtime = MeshOsRuntime::start(fast_config(), dispatcher);
    let deck = DeckClient::from_runtime(&runtime, OperatorIdentity::generate());
    let proposal = deck.ice().kill_migration(123);
    let simulated = proposal.simulate().await.expect("simulate");
    let sig = deck.identity().sign_proposal(
        simulated.action(),
        simulated.issued_at_ms(),
        &simulated.blast_hash(),
    );
    let commit = simulated.commit(&[sig]).await.expect("commit");
    assert_eq!(commit.event_kind(), "kill_migration");
    // Poll (bounded) the force-only view until the commit shows up, instead of sleeping
    // a fixed interval that can be too short under load.
    tokio::time::timeout(Duration::from_secs(2), async {
        while !deck
            .audit()
            .force_only()
            .collect()
            .iter()
            .any(|r| matches!(r.event, AdminEvent::KillMigration { migration: 123 }))
        {
            tokio::time::sleep(Duration::from_millis(10)).await;
        }
    })
    .await
    .expect("kill_migration commit never appeared in the force_only audit view");
    let _ = runtime.shutdown().await;
}
/// `force_cutover(100, 42)` reports replica 100 and node 42 in its blast radius and,
/// once signed by the deck's operator, commits as a `force_cutover` event.
#[tokio::test]
async fn ice_force_cutover_proposal_round_trips() {
    let logger = Arc::new(LoggingDispatcher::new());
    let rt = MeshOsRuntime::start(fast_config(), logger);
    let client = DeckClient::from_runtime(&rt, OperatorIdentity::generate());

    let simulated = client
        .ice()
        .force_cutover(100, 42)
        .simulate()
        .await
        .expect("simulate");
    {
        let radius = simulated.blast_radius();
        assert_eq!(radius.affected_replicas, vec![100]);
        assert_eq!(radius.affected_nodes, vec![42]);
    }

    let signature = client.identity().sign_proposal(
        simulated.action(),
        simulated.issued_at_ms(),
        &simulated.blast_hash(),
    );
    let committed = simulated.commit(&[signature]).await.expect("commit");
    assert_eq!(committed.event_kind(), "force_cutover");
    let _ = rt.shutdown().await;
}
/// `force_evict_replica(100, 7)` reports replica 100 and node 7 in its blast radius and,
/// once signed by the deck's operator, commits as a `force_evict_replica` event.
#[tokio::test]
async fn ice_force_evict_replica_proposal_round_trips() {
    let logger = Arc::new(LoggingDispatcher::new());
    let rt = MeshOsRuntime::start(fast_config(), logger);
    let client = DeckClient::from_runtime(&rt, OperatorIdentity::generate());

    let simulated = client
        .ice()
        .force_evict_replica(100, 7)
        .simulate()
        .await
        .expect("simulate");
    {
        let radius = simulated.blast_radius();
        assert_eq!(radius.affected_replicas, vec![100]);
        assert_eq!(radius.affected_nodes, vec![7]);
    }

    let signature = client.identity().sign_proposal(
        simulated.action(),
        simulated.issued_at_ms(),
        &simulated.blast_hash(),
    );
    let committed = simulated.commit(&[signature]).await.expect("commit");
    assert_eq!(committed.event_kind(), "force_evict_replica");
    let _ = rt.shutdown().await;
}
/// With an ICE signature threshold of 2 and both operators registered, a freeze
/// proposal signed by both operators must commit as a `freeze_cluster` event.
#[tokio::test]
async fn ice_commit_with_registry_accepts_a_valid_multi_op_bundle() {
    let logger = Arc::new(LoggingDispatcher::new());
    let rt = MeshOsRuntime::start(fast_config(), logger);
    let op_a = OperatorIdentity::generate();
    let op_b = OperatorIdentity::generate();

    let mut registry = OperatorRegistry::new();
    for operator in [&op_a, &op_b] {
        registry.register(operator.keypair());
    }

    let config = DeckClientConfig {
        snapshot_poll_interval: Duration::from_millis(100),
        ice_signature_threshold: 2,
    };
    let client = DeckClient::new(
        rt.handle_clone(),
        rt.snapshot_reader().clone(),
        op_a.clone(),
        config,
    )
    .with_operator_registry(registry);

    let simulated = client
        .ice()
        .freeze_cluster(Duration::from_secs(15))
        .simulate()
        .await
        .expect("simulate");

    // One signature per operator, in registration order (A then B).
    let bundle: Vec<_> = [&op_a, &op_b]
        .iter()
        .map(|operator| {
            operator.sign_proposal(
                simulated.action(),
                simulated.issued_at_ms(),
                &simulated.blast_hash(),
            )
        })
        .collect();

    let committed = simulated
        .commit(&bundle[..])
        .await
        .expect("valid multi-op bundle should commit");
    assert_eq!(committed.event_kind(), "freeze_cluster");
    let _ = rt.shutdown().await;
}
}