1use std::pin::Pin;
75use std::sync::atomic::{AtomicU64, Ordering};
76use std::sync::Arc;
77use std::task::{Context, Poll};
78use std::time::{Duration, SystemTime};
79
80use futures::Stream;
81use tokio::time::{interval, Interval};
82
83use super::meshos::{
84 ice_proposal_signing_payload, simulate_ice_proposal, AdminEvent, BlastRadius, ChainId,
85 IceActionProposal, MeshOsEvent, MeshOsHandle, MeshOsHandleError, MeshOsRuntime, MeshOsSnapshot,
86 MeshOsSnapshotReader, NodeId,
87};
88use crate::adapter::net::behavior::aggregator::{AggregatorDaemon, SummaryAnnouncement};
89use crate::adapter::net::identity::EntityKeypair;
90use crate::adapter::net::subnet::SubnetId;
91use crate::adapter::net::MeshNode;
92use crate::adapter::net::{ChannelHash, Visibility};
93
/// An operator's signing identity: an [`EntityKeypair`] plus the numeric
/// operator id derived from it.
///
/// The keypair is held behind an `Arc`, so cloning an `OperatorIdentity`
/// shares the same key material instead of copying it.
#[derive(Clone, Debug)]
pub struct OperatorIdentity {
    /// Key material used by `sign_proposal` / `sign_admin_event`.
    keypair: Arc<EntityKeypair>,
    /// Cached `keypair.origin_hash()`, computed once in `from_keypair`.
    operator_id: u64,
}
105
106impl OperatorIdentity {
107 pub fn from_keypair(keypair: EntityKeypair) -> Self {
110 let operator_id = keypair.origin_hash();
111 Self {
112 keypair: Arc::new(keypair),
113 operator_id,
114 }
115 }
116
117 pub fn generate() -> Self {
120 Self::from_keypair(EntityKeypair::generate())
121 }
122
123 pub fn operator_id(&self) -> u64 {
126 self.operator_id
127 }
128
129 pub fn keypair(&self) -> &EntityKeypair {
140 &self.keypair
141 }
142}
143
/// Error type shared by this module's fallible calls (admin commands, ICE
/// commits, `watch_timeout`, and the snapshot-backed streams).
///
/// The `Display` form is `<<deck-sdk-kind:{kind}>>{message}`, which puts the
/// machine-readable `kind` tag in front of the human-readable message.
#[derive(Clone, Debug, thiserror::Error)]
#[error("<<deck-sdk-kind:{kind}>>{message}")]
pub struct DeckError {
    /// Machine-readable error category, e.g. `"queue_full"`,
    /// `"watch_timeout"`, or `"insufficient_signatures"`.
    pub kind: &'static str,
    /// Human-readable detail.
    pub message: String,
}
157
158impl DeckError {
159 fn new(kind: &'static str, message: impl Into<String>) -> Self {
160 Self {
161 kind,
162 message: message.into(),
163 }
164 }
165}
166
167impl From<MeshOsHandleError> for DeckError {
168 fn from(err: MeshOsHandleError) -> Self {
169 match err {
170 MeshOsHandleError::LoopClosed => Self::new("loop_closed", "MeshOS loop has exited"),
171 MeshOsHandleError::QueueFull => Self::new(
172 "queue_full",
173 "MeshOS source channel at capacity — back off + retry",
174 ),
175 }
176 }
177}
178
/// Error returned by [`AdminCommands`]; an alias of [`DeckError`].
pub type AdminError = DeckError;
183
/// Error returned by the ICE proposal/commit flow; an alias of [`DeckError`].
pub type IceError = DeckError;
188
/// Client-side receipt for a command accepted by the MeshOS event handle.
///
/// Built by `DeckClient::publish_admin` / `publish_signed_ice` after
/// `handle.publish(..)` returns `Ok`. NOTE(review): nothing here waits for
/// the event to be applied, so this records acceptance by the handle, not
/// application. Confirm against `MeshOsHandle::publish`.
#[derive(Clone, Debug)]
pub struct ChainCommit {
    /// Sequence number from `DeckClient::next_commit_id` (first value is 1);
    /// the counter is shared by a client and its clones.
    commit_id: u64,
    /// Operator id of the issuing [`OperatorIdentity`].
    operator_id: u64,
    /// Short tag naming the command, e.g. `"drain"` or an ICE action's kind.
    event_kind: &'static str,
    /// Wall-clock time captured when the receipt was built.
    committed_at: SystemTime,
}
204
205impl ChainCommit {
206 pub fn commit_id(&self) -> u64 {
209 self.commit_id
210 }
211
212 pub fn operator_id(&self) -> u64 {
214 self.operator_id
215 }
216
217 pub fn event_kind(&self) -> &'static str {
220 self.event_kind
221 }
222
223 pub fn committed_at(&self) -> SystemTime {
227 self.committed_at
228 }
229}
230
/// Tunables for a [`DeckClient`].
#[derive(Clone, Debug)]
pub struct DeckClientConfig {
    /// Polling period for the snapshot-backed streams, and the fallback
    /// re-check interval for `DeckClient::watch`. The `watch` ceiling and the
    /// audit/log streams raise it to at least 1 ms.
    pub snapshot_poll_interval: Duration,
    /// Minimum number of signatures an ICE commit must carry. When an
    /// operator registry is installed, it is also the minimum number of
    /// *distinct* verified operator ids.
    pub ice_signature_threshold: usize,
}
249
250impl Default for DeckClientConfig {
251 fn default() -> Self {
252 Self {
253 snapshot_poll_interval: Duration::from_millis(100),
254 ice_signature_threshold: 1,
255 }
256 }
257}
258
/// Compact, counts-only view of a [`MeshOsSnapshot`], produced by
/// `build_status_summary`.
#[derive(Clone, Debug, Default, Eq, PartialEq)]
pub struct StatusSummary {
    /// Peers bucketed by health.
    pub peers: PeerCounts,
    /// Daemons bucketed by lifecycle and by restart state.
    pub daemons: DaemonCounts,
    /// Number of entries in the snapshot's `replicas` map.
    pub replica_chains: usize,
    /// Number of entries in the snapshot's `avoid_list`.
    pub avoid_list_entries: usize,
    /// Number of entries in the snapshot's `recently_emitted`.
    pub recently_emitted_count: usize,
    /// Number of entries in the snapshot's `recent_failures`.
    pub recent_failure_count: usize,
    /// Number of records currently held in the snapshot's `admin_audit`.
    pub admin_audit_ring_depth: usize,
    /// Copied from the snapshot's `freeze_remaining_ms`.
    pub freeze_remaining_ms: Option<u64>,
    /// Whether the snapshot's `local_maintenance` state counts as active.
    pub local_maintenance_active: bool,
}
296
/// Peer counts by health. Every peer lands in exactly one bucket.
#[derive(Clone, Copy, Debug, Default, Eq, PartialEq)]
pub struct PeerCounts {
    /// Peers whose health is `Healthy`.
    pub healthy: usize,
    /// Peers whose health is `Degraded`.
    pub degraded: usize,
    /// Peers whose health is `Unreachable`.
    pub unreachable: usize,
    /// Peers with no health reading (`health == None`).
    pub unknown: usize,
}
309
/// Counters read from the mesh node's gateway by
/// `DeckClient::gateway_stats`.
#[derive(Clone, Debug, Eq, PartialEq)]
pub struct GatewayStats {
    /// Subnet the gateway reports as local.
    pub local_subnet: SubnetId,
    /// The gateway's `forwarded_count()`.
    pub forwarded: u64,
    /// The gateway's `dropped_count()`.
    pub dropped: u64,
    /// Subnets returned by the gateway's `peer_subnets()`.
    pub peer_subnets: Vec<SubnetId>,
    /// Number of entries in the gateway's `exports()` table.
    pub export_rules: u64,
}
335
/// One subnet and the node ids known to belong to it, as produced by
/// `DeckClient::subnets_with_members`.
#[derive(Clone, Debug, Eq, PartialEq)]
pub struct SubnetRollup {
    /// The subnet this row describes.
    pub subnet: SubnetId,
    /// Member node ids, ascending and de-duplicated.
    pub members: Vec<u64>,
    /// `true` when `subnet` equals the mesh node's local subnet.
    pub is_local: bool,
}
352
/// Point-in-time view of the locally attached aggregator daemon.
#[derive(Clone, Debug)]
pub struct AggregatorSnapshot {
    /// `source_subnet` from the aggregator's config.
    pub source_subnet: SubnetId,
    /// `fold_kinds` from the aggregator's config (cloned).
    pub fold_kinds: Vec<u16>,
    /// Aggregator generation at snapshot time.
    pub generation: u64,
    /// `summary_interval` from the aggregator's config.
    pub summary_interval: std::time::Duration,
    /// Latest summaries, shared via `Arc` rather than copied.
    pub summaries: Arc<Vec<SummaryAnnouncement>>,
}
371
/// One replica row of an aggregator registry group.
#[derive(Clone, Debug)]
pub struct AggregatorReplicaRow {
    /// Generation reported by the replica.
    pub generation: u64,
    /// Health flag. `DeckClient::aggregator_registry_snapshot` reports `true`
    /// when the group has no health entry at this replica's position.
    pub healthy: bool,
    /// Optional health diagnostic text.
    pub diagnostic: Option<String>,
    /// Node id of the placement at this replica's position, if any.
    pub placement_node_id: Option<u64>,
}
388
/// One named group from the mesh node's aggregator registry.
#[derive(Clone, Debug)]
pub struct AggregatorRegistryGroupSnapshot {
    /// The registry entry's `name`.
    pub name: String,
    /// The registry entry's 32-byte `group_seed`.
    pub group_seed: [u8; 32],
    /// One row per replica, in the order the entry's snapshot lists them.
    pub replicas: Vec<AggregatorReplicaRow>,
}
401
/// Every group in the mesh node's aggregator registry, in the order returned
/// by the registry's `entries()`.
#[derive(Clone, Debug, Default)]
pub struct AggregatorRegistrySnapshot {
    /// One snapshot per registry group.
    pub groups: Vec<AggregatorRegistryGroupSnapshot>,
}
409
/// Daemon counts from a snapshot.
///
/// `running` / `starting` / `stopping` / `stopped` partition daemons by
/// lifecycle state. `backing_off` / `crash_looping` count restart state
/// independently, so one daemon can be counted in both groups.
#[derive(Clone, Copy, Debug, Default, Eq, PartialEq)]
pub struct DaemonCounts {
    /// Lifecycle `Running`.
    pub running: usize,
    /// Lifecycle `Starting`.
    pub starting: usize,
    /// Lifecycle `Stopping`.
    pub stopping: usize,
    /// Lifecycle `Stopped`.
    pub stopped: usize,
    /// Restart state `BackingOff`.
    pub backing_off: usize,
    /// Restart state `CrashLooping`.
    pub crash_looping: usize,
}
433
434fn build_status_summary(snap: &MeshOsSnapshot) -> StatusSummary {
438 let mut peers = PeerCounts::default();
439 for (_, peer) in snap.peers.iter() {
440 match peer.health {
441 Some(super::meshos::PeerHealthSnapshot::Healthy) => peers.healthy += 1,
442 Some(super::meshos::PeerHealthSnapshot::Degraded) => peers.degraded += 1,
443 Some(super::meshos::PeerHealthSnapshot::Unreachable) => peers.unreachable += 1,
444 None => peers.unknown += 1,
445 }
446 }
447 let mut daemons = DaemonCounts::default();
448 for (_, d) in snap.daemons.iter() {
449 match d.lifecycle {
450 super::meshos::DaemonLifecycleSnapshot::Running => daemons.running += 1,
451 super::meshos::DaemonLifecycleSnapshot::Starting => daemons.starting += 1,
452 super::meshos::DaemonLifecycleSnapshot::Stopping => daemons.stopping += 1,
453 super::meshos::DaemonLifecycleSnapshot::Stopped => daemons.stopped += 1,
454 }
455 match d.restart_state {
456 super::meshos::RestartStateSnapshot::Idle => {}
457 super::meshos::RestartStateSnapshot::BackingOff { .. } => daemons.backing_off += 1,
458 super::meshos::RestartStateSnapshot::CrashLooping { .. } => daemons.crash_looping += 1,
459 }
460 }
461 let maintenance_active = !matches!(
462 snap.local_maintenance,
463 super::meshos::MaintenanceStateSnapshot::Active
464 );
465 StatusSummary {
466 peers,
467 daemons,
468 replica_chains: snap.replicas.len(),
469 avoid_list_entries: snap.avoid_list.len(),
470 recently_emitted_count: snap.recently_emitted.len(),
471 recent_failure_count: snap.recent_failures.len(),
472 admin_audit_ring_depth: snap.admin_audit.len(),
473 freeze_remaining_ms: snap.freeze_remaining_ms,
474 local_maintenance_active: maintenance_active,
475 }
476}
477
/// Operator-facing client for a MeshOS runtime.
///
/// Commands (admin and ICE) are published through a [`MeshOsHandle`]. All
/// read-only views come from a [`MeshOsSnapshotReader`]. Mesh, aggregator and
/// operator-registry integrations are optional and attached with the
/// `with_*` builders. Clones share the commit counter and the optional
/// integrations through `Arc`.
#[derive(Clone)]
pub struct DeckClient {
    /// Publish side of the MeshOS event queue.
    handle: MeshOsHandle,
    /// Read side used by every status, stream and query method.
    snapshot_reader: MeshOsSnapshotReader,
    /// Identity used to sign admin events and stamped on every `ChainCommit`.
    identity: OperatorIdentity,
    config: DeckClientConfig,
    /// Source of `ChainCommit::commit_id`; shared by clones of this client.
    commit_seq: Arc<AtomicU64>,
    /// When set, admin events are published signed and ICE commits verify
    /// their signatures against this registry.
    operator_registry: Option<Arc<OperatorRegistry>>,
    /// Mesh node backing the subnet, gateway, channel and aggregator-registry
    /// views.
    mesh: Option<Arc<MeshNode>>,
    /// Locally attached aggregator daemon, if any.
    aggregator: Option<Arc<AggregatorDaemon>>,
}
522
523impl DeckClient {
524 pub fn new(
529 handle: MeshOsHandle,
530 snapshot_reader: MeshOsSnapshotReader,
531 identity: OperatorIdentity,
532 config: DeckClientConfig,
533 ) -> Self {
534 Self {
535 handle,
536 snapshot_reader,
537 identity,
538 config,
539 commit_seq: Arc::new(AtomicU64::new(0)),
540 operator_registry: None,
541 mesh: None,
542 aggregator: None,
543 }
544 }
545
546 pub fn with_mesh(mut self, mesh: Arc<MeshNode>) -> Self {
554 self.mesh = Some(mesh);
555 self
556 }
557
558 pub fn with_aggregator(mut self, aggregator: Arc<AggregatorDaemon>) -> Self {
561 self.aggregator = Some(aggregator);
562 self
563 }
564
565 pub fn from_runtime(runtime: &MeshOsRuntime, identity: OperatorIdentity) -> Self {
570 Self::new(
571 runtime.handle_clone(),
572 runtime.snapshot_reader().clone(),
573 identity,
574 DeckClientConfig::default(),
575 )
576 }
577
578 pub fn with_config(mut self, config: DeckClientConfig) -> Self {
581 self.config = config;
582 self
583 }
584
585 pub fn with_operator_registry(mut self, registry: OperatorRegistry) -> Self {
591 self.operator_registry = Some(Arc::new(registry));
592 self
593 }
594
595 pub fn operator_registry(&self) -> Option<&OperatorRegistry> {
597 self.operator_registry.as_deref()
598 }
599
600 pub fn identity(&self) -> &OperatorIdentity {
602 &self.identity
603 }
604
605 pub fn admin(&self) -> AdminCommands<'_> {
609 AdminCommands { client: self }
610 }
611
612 pub fn ice(&self) -> IceCommands<'_> {
617 IceCommands { client: self }
618 }
619
620 pub fn audit(&self) -> AuditQuery<'_> {
629 AuditQuery::new(self)
630 }
631
632 pub fn subscribe_failures(&self, since_seq: u64) -> FailureStream {
641 FailureStream::new(
642 self.snapshot_reader.clone(),
643 self.config.snapshot_poll_interval,
644 since_seq,
645 )
646 }
647
648 pub fn subscribe_logs(&self, filter: LogFilter) -> LogStream {
658 LogStream::new(
659 self.snapshot_reader.clone(),
660 self.config.snapshot_poll_interval,
661 filter,
662 )
663 }
664
665 pub fn snapshots(&self) -> SnapshotStream {
671 SnapshotStream::new(
672 self.snapshot_reader.clone(),
673 self.config.snapshot_poll_interval,
674 )
675 }
676
677 pub fn status(&self) -> MeshOsSnapshot {
683 self.snapshot_reader.read()
684 }
685
686 pub fn status_summary_stream(&self) -> StatusSummaryStream {
694 StatusSummaryStream::new(
695 self.snapshot_reader.clone(),
696 self.config.snapshot_poll_interval,
697 )
698 }
699
700 pub fn status_summary(&self) -> StatusSummary {
706 build_status_summary(&self.snapshot_reader.load())
707 }
708
709 pub fn peers(&self) -> std::collections::BTreeMap<NodeId, super::meshos::PeerSnapshot> {
712 self.snapshot_reader.load().peers.clone()
713 }
714
715 pub fn local_subnet(&self) -> Option<SubnetId> {
719 self.mesh.as_ref().map(|m| m.local_subnet())
720 }
721
722 pub fn known_subnets(&self) -> Vec<(u64, SubnetId)> {
728 self.mesh
729 .as_ref()
730 .map(|m| m.known_subnets())
731 .unwrap_or_default()
732 }
733
734 pub fn subnets_with_members(&self, local_node_id: Option<u64>) -> Vec<SubnetRollup> {
744 let local = self.local_subnet();
745 let mut buckets: std::collections::BTreeMap<u32, std::collections::BTreeSet<u64>> =
746 std::collections::BTreeMap::new();
747 for (node_id, subnet) in self.known_subnets() {
748 buckets.entry(subnet.raw()).or_default().insert(node_id);
749 }
750 if let Some(local_subnet) = local {
751 let entry = buckets.entry(local_subnet.raw()).or_default();
752 if let Some(id) = local_node_id {
753 entry.insert(id);
754 }
755 }
756 buckets
757 .into_iter()
758 .map(|(raw, members)| {
759 let subnet = SubnetId::from_raw(raw);
760 SubnetRollup {
761 subnet,
762 members: members.into_iter().collect(),
763 is_local: local == Some(subnet),
764 }
765 })
766 .collect()
767 }
768
769 pub fn gateway_stats(&self) -> Option<GatewayStats> {
774 let gw = self.mesh.as_ref().and_then(|m| m.gateway())?;
775 Some(GatewayStats {
776 local_subnet: gw.local_subnet(),
777 forwarded: gw.forwarded_count(),
778 dropped: gw.dropped_count(),
779 peer_subnets: gw.peer_subnets(),
780 export_rules: gw.exports().len() as u64,
781 })
782 }
783
784 pub fn gateway_exports(&self) -> Vec<(u16, Vec<SubnetId>)> {
788 self.mesh
789 .as_ref()
790 .and_then(|m| m.gateway())
791 .map(|gw| gw.exports())
792 .unwrap_or_default()
793 }
794
795 pub fn channel_visibility(&self, channel_name: &str) -> Option<Visibility> {
801 let mesh = self.mesh.as_ref()?;
802 let registry = mesh.channel_configs()?;
803 let cfg = registry.get_by_name(channel_name)?;
804 Some(cfg.visibility)
805 }
806
807 pub fn channels(&self) -> Vec<(String, Visibility)> {
811 let Some(mesh) = self.mesh.as_ref() else {
812 return Vec::new();
813 };
814 let Some(registry) = mesh.channel_configs() else {
815 return Vec::new();
816 };
817 registry
818 .snapshot()
819 .into_iter()
820 .map(|(name, cfg)| (name, cfg.visibility))
821 .collect()
822 }
823
824 pub fn channel_wire_hash(&self, channel_name: &str) -> Option<u16> {
831 let mesh = self.mesh.as_ref()?;
832 let registry = mesh.channel_configs()?;
833 let cfg = registry.get_by_name(channel_name)?;
834 Some(cfg.channel_id.wire_hash())
835 }
836
837 pub fn channel_canonical_hash(&self, channel_name: &str) -> Option<ChannelHash> {
841 let mesh = self.mesh.as_ref()?;
842 let registry = mesh.channel_configs()?;
843 let cfg = registry.get_by_name(channel_name)?;
844 Some(cfg.channel_id.hash())
845 }
846
847 pub fn aggregator_installed(&self) -> bool {
852 self.aggregator.is_some()
853 }
854
855 pub fn aggregator_summaries(&self) -> Vec<SummaryAnnouncement> {
858 self.aggregator
859 .as_ref()
860 .map(|a| a.latest_summaries())
861 .unwrap_or_default()
862 }
863
864 pub fn aggregator_summaries_arc(&self) -> Arc<Vec<SummaryAnnouncement>> {
868 self.aggregator
869 .as_ref()
870 .map(|a| a.latest_summaries_arc())
871 .unwrap_or_else(|| Arc::new(Vec::new()))
872 }
873
874 pub fn aggregator_snapshot(&self) -> Option<AggregatorSnapshot> {
884 let agg = self.aggregator.as_ref()?;
885 let config = agg.config();
886 Some(AggregatorSnapshot {
887 source_subnet: config.source_subnet,
888 fold_kinds: config.fold_kinds.clone(),
889 generation: agg.generation(),
890 summary_interval: config.summary_interval,
891 summaries: agg.latest_summaries_arc(),
892 })
893 }
894
895 pub async fn aggregator_registry_snapshot(&self) -> Option<AggregatorRegistrySnapshot> {
906 let mesh = self.mesh.as_ref()?;
907 let registry = mesh.aggregator_registry()?;
908 let entries = registry.entries();
909 let mut groups = Vec::with_capacity(entries.len());
910 for entry in entries {
911 let snap = entry.snapshot().await;
919 let rows = snap
920 .replicas
921 .iter()
922 .enumerate()
923 .map(|(idx, replica)| {
924 let health = snap.healths.get(idx).cloned().unwrap_or(
925 crate::adapter::net::behavior::lifecycle::ReplicaHealth {
926 healthy: true,
927 diagnostic: None,
928 },
929 );
930 let placement_node_id = snap.placements.get(idx).map(|p| p.node_id);
931 AggregatorReplicaRow {
932 generation: replica.generation(),
933 healthy: health.healthy,
934 diagnostic: health.diagnostic,
935 placement_node_id,
936 }
937 })
938 .collect();
939 groups.push(AggregatorRegistryGroupSnapshot {
940 name: entry.name.clone(),
941 group_seed: entry.group_seed,
942 replicas: rows,
943 });
944 }
945 Some(AggregatorRegistrySnapshot { groups })
946 }
947
948 pub fn aggregator_generation(&self) -> u64 {
951 self.aggregator
952 .as_ref()
953 .map(|a| a.generation())
954 .unwrap_or(0)
955 }
956
957 pub fn aggregator_source_subnet(&self) -> Option<SubnetId> {
960 self.aggregator.as_ref().map(|a| a.config().source_subnet)
961 }
962
963 pub fn aggregator_fold_kinds(&self) -> Vec<u16> {
966 self.aggregator
967 .as_ref()
968 .map(|a| a.config().fold_kinds.clone())
969 .unwrap_or_default()
970 }
971
972 pub fn aggregator_summary_interval(&self) -> std::time::Duration {
974 self.aggregator
975 .as_ref()
976 .map(|a| a.config().summary_interval)
977 .unwrap_or_default()
978 }
979
980 pub fn daemons(&self) -> std::collections::BTreeMap<u64, super::meshos::DaemonSnapshot> {
982 self.snapshot_reader.load().daemons.clone()
983 }
984
985 pub fn replicas(&self) -> std::collections::BTreeMap<ChainId, super::meshos::ReplicaSnapshot> {
987 self.snapshot_reader.load().replicas.clone()
988 }
989
990 pub fn local_maintenance(&self) -> super::meshos::MaintenanceStateSnapshot {
992 self.snapshot_reader.load().local_maintenance.clone()
993 }
994
995 pub fn freeze_remaining_ms(&self) -> Option<u64> {
998 self.snapshot_reader.load().freeze_remaining_ms
999 }
1000
1001 pub fn recent_failures(&self) -> Vec<super::meshos::FailureRecord> {
1007 self.snapshot_reader
1008 .load()
1009 .recent_failures
1010 .iter()
1011 .cloned()
1012 .collect()
1013 }
1014
1015 pub fn runtime_epoch_id(&self) -> u64 {
1023 self.snapshot_reader.load().runtime_epoch_id
1024 }
1025
1026 pub fn audit_head_seq(&self) -> u64 {
1033 self.snapshot_reader
1034 .load()
1035 .admin_audit
1036 .last()
1037 .map(|r| r.seq)
1038 .unwrap_or(0)
1039 }
1040
1041 pub fn log_head_seq(&self) -> u64 {
1045 self.snapshot_reader
1046 .load()
1047 .log_ring
1048 .last()
1049 .map(|r| r.seq)
1050 .unwrap_or(0)
1051 }
1052
1053 pub fn failure_head_seq(&self) -> u64 {
1056 self.snapshot_reader
1057 .load()
1058 .recent_failures
1059 .iter()
1060 .next_back()
1061 .map(|r| r.seq)
1062 .unwrap_or(0)
1063 }
1064
1065 pub fn recent_failures_since(&self, since_ms: u64) -> Vec<super::meshos::FailureRecord> {
1077 self.snapshot_reader
1078 .load()
1079 .recent_failures
1080 .iter()
1081 .filter(|r| r.recorded_at_ms > since_ms)
1082 .cloned()
1083 .collect()
1084 }
1085
1086 pub async fn watch<F>(&self, mut predicate: F) -> MeshOsSnapshot
1102 where
1103 F: FnMut(&MeshOsSnapshot) -> bool,
1104 {
1105 let snap = self.snapshot_reader.read();
1108 if predicate(&snap) {
1109 return snap;
1110 }
1111 let ceiling = self
1112 .config
1113 .snapshot_poll_interval
1114 .max(Duration::from_millis(1));
1115 let mut change_rx = self.snapshot_reader.subscribe_changes();
1122 loop {
1123 tokio::select! {
1124 biased;
1125 _ = change_rx.changed() => {}
1126 _ = tokio::time::sleep(ceiling) => {}
1127 }
1128 let snap = self.snapshot_reader.read();
1129 if predicate(&snap) {
1130 return snap;
1131 }
1132 }
1133 }
1134
1135 pub async fn watch_timeout<F>(
1140 &self,
1141 predicate: F,
1142 timeout: Duration,
1143 ) -> Result<MeshOsSnapshot, DeckError>
1144 where
1145 F: FnMut(&MeshOsSnapshot) -> bool,
1146 {
1147 tokio::time::timeout(timeout, self.watch(predicate))
1148 .await
1149 .map_err(|_| {
1150 DeckError::new(
1151 "watch_timeout",
1152 format!(
1153 "no snapshot matched the predicate within {} ms",
1154 timeout.as_millis()
1155 ),
1156 )
1157 })
1158 }
1159
1160 fn next_commit_id(&self) -> u64 {
1161 self.commit_seq.fetch_add(1, Ordering::Relaxed) + 1
1164 }
1165
1166 async fn publish_admin(
1167 &self,
1168 event: AdminEvent,
1169 kind: &'static str,
1170 ) -> Result<ChainCommit, AdminError> {
1171 let wire_event = if self.operator_registry.is_some() {
1180 let issued_at_ms = super::meshos::now_ms_since_unix_epoch();
1181 let signature = self.identity.sign_admin_event(&event, issued_at_ms);
1182 MeshOsEvent::SignedAdminCommit {
1183 event,
1184 signature,
1185 issued_at_ms,
1186 }
1187 } else {
1188 MeshOsEvent::AdminEvent(event)
1189 };
1190 self.handle
1191 .publish(wire_event)
1192 .await
1193 .map_err(AdminError::from)?;
1194 Ok(ChainCommit {
1195 commit_id: self.next_commit_id(),
1196 operator_id: self.identity.operator_id,
1197 event_kind: kind,
1198 committed_at: SystemTime::now(),
1199 })
1200 }
1201
1202 async fn publish_signed_ice(
1203 &self,
1204 proposal: IceActionProposal,
1205 signatures: Vec<OperatorSignature>,
1206 issued_at_ms: u64,
1207 blast_hash: super::meshos::BlastRadiusHash,
1208 kind: &'static str,
1209 ) -> Result<ChainCommit, IceError> {
1210 self.handle
1211 .publish(MeshOsEvent::SignedIceCommit {
1212 proposal,
1213 signatures,
1214 issued_at_ms,
1215 blast_hash,
1216 })
1217 .await
1218 .map_err(IceError::from)?;
1219 Ok(ChainCommit {
1220 commit_id: self.next_commit_id(),
1221 operator_id: self.identity.operator_id,
1222 event_kind: kind,
1223 committed_at: SystemTime::now(),
1224 })
1225 }
1226}
1227
/// Admin command builder returned by [`DeckClient::admin`].
///
/// Each method publishes one `AdminEvent` through the client and returns its
/// [`ChainCommit`] receipt.
pub struct AdminCommands<'a> {
    client: &'a DeckClient,
}
1240
1241impl AdminCommands<'_> {
1242 pub async fn drain(
1251 &self,
1252 node: NodeId,
1253 drain_for: Duration,
1254 ) -> Result<ChainCommit, AdminError> {
1255 self.client
1256 .publish_admin(AdminEvent::Drain { node, drain_for }, "drain")
1257 .await
1258 }
1259
1260 pub async fn enter_maintenance(
1265 &self,
1266 node: NodeId,
1267 drain_for: Option<Duration>,
1268 ) -> Result<ChainCommit, AdminError> {
1269 self.client
1270 .publish_admin(
1271 AdminEvent::EnterMaintenance { node, drain_for },
1272 "enter_maintenance",
1273 )
1274 .await
1275 }
1276
1277 pub async fn exit_maintenance(&self, node: NodeId) -> Result<ChainCommit, AdminError> {
1279 self.client
1280 .publish_admin(AdminEvent::ExitMaintenance { node }, "exit_maintenance")
1281 .await
1282 }
1283
1284 pub async fn cordon(&self, node: NodeId) -> Result<ChainCommit, AdminError> {
1287 self.client
1288 .publish_admin(AdminEvent::Cordon { node }, "cordon")
1289 .await
1290 }
1291
1292 pub async fn uncordon(&self, node: NodeId) -> Result<ChainCommit, AdminError> {
1294 self.client
1295 .publish_admin(AdminEvent::Uncordon { node }, "uncordon")
1296 .await
1297 }
1298
1299 pub async fn drop_replicas(
1301 &self,
1302 node: NodeId,
1303 chains: Vec<ChainId>,
1304 ) -> Result<ChainCommit, AdminError> {
1305 self.client
1306 .publish_admin(AdminEvent::DropReplicas { node, chains }, "drop_replicas")
1307 .await
1308 }
1309
1310 pub async fn invalidate_placement(&self, node: NodeId) -> Result<ChainCommit, AdminError> {
1312 self.client
1313 .publish_admin(
1314 AdminEvent::InvalidatePlacement { node },
1315 "invalidate_placement",
1316 )
1317 .await
1318 }
1319
1320 pub async fn restart_all_daemons(&self, node: NodeId) -> Result<ChainCommit, AdminError> {
1322 self.client
1323 .publish_admin(
1324 AdminEvent::RestartAllDaemons { node },
1325 "restart_all_daemons",
1326 )
1327 .await
1328 }
1329
1330 pub async fn clear_avoid_list(&self, node: NodeId) -> Result<ChainCommit, AdminError> {
1332 self.client
1333 .publish_admin(AdminEvent::ClearAvoidList { node }, "clear_avoid_list")
1334 .await
1335 }
1336}
1337
1338pub use super::meshos::{OperatorRegistry, OperatorSignature, VerifyError};
1343
1344impl OperatorIdentity {
1345 pub fn sign_proposal(
1361 &self,
1362 proposal: &IceActionProposal,
1363 issued_at_ms: u64,
1364 blast_hash: &super::meshos::BlastRadiusHash,
1365 ) -> OperatorSignature {
1366 OperatorSignature::sign(self.keypair(), proposal, issued_at_ms, blast_hash)
1367 }
1368
1369 pub fn sign_admin_event(&self, event: &AdminEvent, issued_at_ms: u64) -> OperatorSignature {
1376 OperatorSignature::sign_admin(self.keypair(), event, issued_at_ms)
1377 }
1378}
1379
1380fn verify_error_to_ice(err: VerifyError) -> IceError {
1384 let kind = err.kind();
1385 IceError::new(kind, err.to_string())
1386}
1387
/// ICE proposal builder returned by [`DeckClient::ice`].
///
/// Each method only constructs an [`IceProposal`]; nothing is published
/// until that proposal is simulated and committed.
pub struct IceCommands<'a> {
    client: &'a DeckClient,
}
1396
1397impl<'a> IceCommands<'a> {
1398 pub fn freeze_cluster(&self, ttl: Duration) -> IceProposal<'a> {
1401 IceProposal::new(self.client, IceActionProposal::FreezeCluster { ttl })
1402 }
1403
1404 pub fn flush_avoid_lists(&self, scope: super::meshos::AvoidScope) -> IceProposal<'a> {
1408 IceProposal::new(self.client, IceActionProposal::FlushAvoidLists { scope })
1409 }
1410
1411 pub fn force_evict_replica(&self, chain: ChainId, victim: NodeId) -> IceProposal<'a> {
1416 IceProposal::new(
1417 self.client,
1418 IceActionProposal::ForceEvictReplica { chain, victim },
1419 )
1420 }
1421
1422 pub fn force_restart_daemon(&self, daemon: super::meshos::DaemonRef) -> IceProposal<'a> {
1427 IceProposal::new(
1428 self.client,
1429 IceActionProposal::ForceRestartDaemon { daemon },
1430 )
1431 }
1432
1433 pub fn force_cutover(&self, chain: ChainId, target: NodeId) -> IceProposal<'a> {
1439 IceProposal::new(
1440 self.client,
1441 IceActionProposal::ForceCutover { chain, target },
1442 )
1443 }
1444
1445 pub fn kill_migration(&self, migration: super::meshos::MigrationId) -> IceProposal<'a> {
1453 IceProposal::new(self.client, IceActionProposal::KillMigration { migration })
1454 }
1455
1456 pub fn thaw_cluster(&self) -> IceProposal<'a> {
1458 IceProposal::new(self.client, IceActionProposal::ThawCluster)
1459 }
1460}
1461
/// An ICE action that has been chosen but not yet simulated.
///
/// `issued_at_ms` is captured when the proposal is created, not when it is
/// committed. It is carried through simulation into the signing payload.
pub struct IceProposal<'a> {
    client: &'a DeckClient,
    /// The proposed action.
    action: IceActionProposal,
    /// Creation timestamp from `now_ms_since_unix_epoch()`.
    issued_at_ms: u64,
}
1482
1483impl<'a> IceProposal<'a> {
1484 fn new(client: &'a DeckClient, action: IceActionProposal) -> Self {
1485 Self {
1486 client,
1487 action,
1488 issued_at_ms: super::meshos::now_ms_since_unix_epoch(),
1489 }
1490 }
1491
1492 pub fn action(&self) -> &IceActionProposal {
1494 &self.action
1495 }
1496
1497 pub fn issued_at_ms(&self) -> u64 {
1503 self.issued_at_ms
1504 }
1505
1506 pub async fn simulate(self) -> Result<SimulatedIceProposal<'a>, IceError> {
1514 let snap = self.client.snapshot_reader.read();
1515 let blast = simulate_ice_proposal(&snap, &self.action);
1516 Ok(SimulatedIceProposal {
1517 client: self.client,
1518 action: self.action,
1519 issued_at_ms: self.issued_at_ms,
1520 blast,
1521 })
1522 }
1523}
1524
/// An [`IceProposal`] after simulation: the action plus the blast radius
/// computed against the snapshot that was current when `simulate()` ran.
///
/// Operators sign over the action, `issued_at_ms()` and `blast_hash()`;
/// the signatures are then passed to `commit`.
pub struct SimulatedIceProposal<'a> {
    client: &'a DeckClient,
    action: IceActionProposal,
    /// Issue timestamp inherited from the `IceProposal`.
    issued_at_ms: u64,
    /// Result of `simulate_ice_proposal`.
    blast: BlastRadius,
}
1543
1544impl<'a> SimulatedIceProposal<'a> {
1545 pub fn blast_radius(&self) -> &BlastRadius {
1547 &self.blast
1548 }
1549
1550 pub fn action(&self) -> &IceActionProposal {
1552 &self.action
1553 }
1554
1555 pub fn issued_at_ms(&self) -> u64 {
1559 self.issued_at_ms
1560 }
1561
1562 pub fn blast_hash(&self) -> super::meshos::BlastRadiusHash {
1567 super::meshos::blast_radius_hash(&self.blast)
1568 }
1569
1570 pub async fn commit(self, signatures: &[OperatorSignature]) -> Result<ChainCommit, IceError> {
1580 let blast_hash = self.blast_hash();
1581 let threshold = self.client.config.ice_signature_threshold;
1582 if signatures.len() < threshold {
1583 return Err(IceError::new(
1584 "insufficient_signatures",
1585 format!(
1586 "ICE commit requires {} operator signatures; got {}",
1587 threshold,
1588 signatures.len()
1589 ),
1590 ));
1591 }
1592 if let Some(registry) = self.client.operator_registry.as_ref() {
1593 let payload =
1600 ice_proposal_signing_payload(&self.action, self.issued_at_ms, &blast_hash);
1601 let mut unique_operators: std::collections::BTreeSet<u64> =
1602 std::collections::BTreeSet::new();
1603 for sig in signatures {
1604 registry
1605 .verify(sig, &payload)
1606 .map_err(verify_error_to_ice)?;
1607 unique_operators.insert(sig.operator_id);
1608 }
1609 if unique_operators.len() < threshold {
1613 return Err(IceError::new(
1614 "insufficient_signatures",
1615 format!(
1616 "ICE commit requires {} distinct operator signatures; got {} distinct",
1617 threshold,
1618 unique_operators.len()
1619 ),
1620 ));
1621 }
1622 let kind = self.action.kind();
1630 self.client
1631 .publish_signed_ice(
1632 self.action,
1633 signatures.to_vec(),
1634 self.issued_at_ms,
1635 blast_hash,
1636 kind,
1637 )
1638 .await
1639 } else {
1640 let kind = self.action.kind();
1649 let event = self.action.to_admin_event();
1650 self.client.publish_admin(event, kind).await
1651 }
1652 }
1653}
1654
/// Builder for querying the admin audit ring, returned by
/// [`DeckClient::audit`]. Every filter is optional; set filters combine with
/// AND.
pub struct AuditQuery<'a> {
    client: &'a DeckClient,
    /// Maximum number of records returned by `collect` (not used by `stream`).
    limit: Option<usize>,
    /// Keep only records whose `operator_ids` contain this id.
    operator_filter: Option<u64>,
    /// Inclusive `(start_ms, end_ms)` bound on `committed_at_ms`.
    time_range: Option<(u64, u64)>,
    /// Keep only records whose event `is_ice()`.
    force_only: bool,
    /// Keep only records with `seq` strictly greater than this value.
    since_seq: Option<u64>,
}
1694
1695impl<'a> AuditQuery<'a> {
1696 fn new(client: &'a DeckClient) -> Self {
1697 Self {
1698 client,
1699 limit: None,
1700 operator_filter: None,
1701 time_range: None,
1702 force_only: false,
1703 since_seq: None,
1704 }
1705 }
1706
1707 pub fn recent(mut self, limit: usize) -> Self {
1717 self.limit = Some(limit);
1718 self
1719 }
1720
1721 pub fn by_operator(mut self, op_id: u64) -> Self {
1725 self.operator_filter = Some(op_id);
1726 self
1727 }
1728
1729 pub fn between(mut self, start_ms: u64, end_ms: u64) -> Self {
1733 self.time_range = Some((start_ms, end_ms));
1734 self
1735 }
1736
1737 pub fn force_only(mut self) -> Self {
1742 self.force_only = true;
1743 self
1744 }
1745
1746 pub fn since(mut self, since_seq: u64) -> Self {
1762 self.since_seq = Some(since_seq);
1763 self
1764 }
1765
1766 pub fn collect(self) -> Vec<super::meshos::AdminAuditRecord> {
1771 let snap = self.client.snapshot_reader.read();
1772 let mut matched: Vec<super::meshos::AdminAuditRecord> = snap
1773 .admin_audit
1774 .iter()
1775 .filter(|r| {
1776 if let Some(since) = self.since_seq {
1777 if r.seq <= since {
1778 return false;
1779 }
1780 }
1781 if let Some(op_id) = self.operator_filter {
1782 if !r.operator_ids.contains(&op_id) {
1783 return false;
1784 }
1785 }
1786 if let Some((start, end)) = self.time_range {
1787 if r.committed_at_ms < start || r.committed_at_ms > end {
1788 return false;
1789 }
1790 }
1791 if self.force_only && !r.event.is_ice() {
1792 return false;
1793 }
1794 true
1795 })
1796 .cloned()
1797 .collect();
1798 matched.reverse();
1801 if let Some(limit) = self.limit {
1802 matched.truncate(limit);
1803 }
1804 matched
1805 }
1806
1807 pub fn stream(self) -> AuditStream {
1818 AuditStream::new(
1819 self.client.snapshot_reader.clone(),
1820 self.client.config.snapshot_poll_interval,
1821 AuditFilter {
1822 operator: self.operator_filter,
1823 time_range: self.time_range,
1824 force_only: self.force_only,
1825 },
1826 self.since_seq.unwrap_or(0),
1827 )
1828 }
1829}
1830
/// Per-record audit predicate shared by the audit stream. Unset criteria
/// match everything.
#[derive(Clone, Debug)]
struct AuditFilter {
    /// Required member of the record's `operator_ids`.
    operator: Option<u64>,
    /// Inclusive `(start_ms, end_ms)` bound on `committed_at_ms`.
    time_range: Option<(u64, u64)>,
    /// When set, only ICE events (`event.is_ice()`) match.
    force_only: bool,
}
1840
1841impl AuditFilter {
1842 fn matches(&self, record: &super::meshos::AdminAuditRecord) -> bool {
1843 if let Some(op_id) = self.operator {
1844 if !record.operator_ids.contains(&op_id) {
1845 return false;
1846 }
1847 }
1848 if let Some((start, end)) = self.time_range {
1849 if record.committed_at_ms < start || record.committed_at_ms > end {
1850 return false;
1851 }
1852 }
1853 if self.force_only && !record.event.is_ice() {
1854 return false;
1855 }
1856 true
1857 }
1858}
1859
/// Snapshot-polling stream of admin audit records, created by
/// `AuditQuery::stream`.
pub struct AuditStream {
    reader: super::meshos::MeshOsSnapshotReader,
    /// Poll timer; each tick triggers one scan of the snapshot's audit ring.
    interval: Interval,
    /// Predicate applied to every newly scanned record.
    filter: AuditFilter,
    /// Highest `seq` scanned so far. It advances past non-matching records
    /// too, so they are never rescanned.
    last_seq: u64,
    /// Matching records scanned but not yet yielded.
    queued: std::collections::VecDeque<super::meshos::AdminAuditRecord>,
}
1879
1880impl AuditStream {
1881 fn new(
1882 reader: super::meshos::MeshOsSnapshotReader,
1883 poll_interval: Duration,
1884 filter: AuditFilter,
1885 initial_seq_watermark: u64,
1886 ) -> Self {
1887 let poll_interval = poll_interval.max(Duration::from_millis(1));
1888 Self {
1889 reader,
1890 interval: interval(poll_interval),
1891 filter,
1892 last_seq: initial_seq_watermark,
1893 queued: std::collections::VecDeque::new(),
1894 }
1895 }
1896}
1897
1898impl Stream for AuditStream {
1899 type Item = Result<super::meshos::AdminAuditRecord, DeckError>;
1900
1901 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
1902 if let Some(record) = self.queued.pop_front() {
1905 return Poll::Ready(Some(Ok(record)));
1906 }
1907 match self.interval.poll_tick(cx) {
1909 Poll::Ready(_) => {
1910 let snap = self.reader.read();
1911 let last_seq = self.last_seq;
1912 let mut max_seq = last_seq;
1914 for record in snap.admin_audit.iter().cloned() {
1915 if record.seq <= last_seq {
1916 continue;
1917 }
1918 if record.seq > max_seq {
1919 max_seq = record.seq;
1920 }
1921 if self.filter.matches(&record) {
1922 self.queued.push_back(record);
1923 }
1924 }
1925 self.last_seq = max_seq;
1926 if let Some(record) = self.queued.pop_front() {
1927 Poll::Ready(Some(Ok(record)))
1928 } else {
1929 Poll::Pending
1935 }
1936 }
1937 Poll::Pending => Poll::Pending,
1938 }
1939 }
1940}
1941
/// Filter for [`DeckClient::subscribe_logs`]. Unset fields match everything.
#[derive(Clone, Debug, Default)]
pub struct LogFilter {
    /// Drop records whose `level` compares below this level.
    pub min_level: Option<super::meshos::LogLevel>,
    /// Keep only records whose `daemon_id` is `Some(this id)`.
    pub daemon_id: Option<u64>,
    /// Keep only records whose `node_id` is `Some(this node)`.
    pub node_id: Option<NodeId>,
    /// Starting watermark: only records with `seq` strictly greater than
    /// this are streamed. Defaults to 0.
    pub since_seq: Option<u64>,
}
1963
1964impl LogFilter {
1965 pub fn new() -> Self {
1967 Self::default()
1968 }
1969
1970 pub fn min_level(mut self, level: super::meshos::LogLevel) -> Self {
1972 self.min_level = Some(level);
1973 self
1974 }
1975
1976 pub fn with_daemon(mut self, daemon_id: u64) -> Self {
1978 self.daemon_id = Some(daemon_id);
1979 self
1980 }
1981
1982 pub fn with_node(mut self, node_id: NodeId) -> Self {
1984 self.node_id = Some(node_id);
1985 self
1986 }
1987
1988 pub fn since(mut self, since_seq: u64) -> Self {
1996 self.since_seq = Some(since_seq);
1997 self
1998 }
1999
2000 fn matches(&self, record: &super::meshos::LogRecord) -> bool {
2001 if let Some(min) = self.min_level {
2002 if record.level < min {
2003 return false;
2004 }
2005 }
2006 if let Some(id) = self.daemon_id {
2007 if record.daemon_id != Some(id) {
2008 return false;
2009 }
2010 }
2011 if let Some(node) = self.node_id {
2012 if record.node_id != Some(node) {
2013 return false;
2014 }
2015 }
2016 true
2017 }
2018}
2019
2020pub struct LogStream {
2025 reader: super::meshos::MeshOsSnapshotReader,
2026 interval: Interval,
2027 filter: LogFilter,
2028 last_seq: u64,
2029 queued: std::collections::VecDeque<super::meshos::LogRecord>,
2030}
2031
2032impl LogStream {
2033 fn new(
2034 reader: super::meshos::MeshOsSnapshotReader,
2035 poll_interval: Duration,
2036 filter: LogFilter,
2037 ) -> Self {
2038 let poll_interval = poll_interval.max(Duration::from_millis(1));
2039 let last_seq = filter.since_seq.unwrap_or(0);
2040 Self {
2041 reader,
2042 interval: interval(poll_interval),
2043 filter,
2044 last_seq,
2045 queued: std::collections::VecDeque::new(),
2046 }
2047 }
2048}
2049
2050impl Stream for LogStream {
2051 type Item = Result<super::meshos::LogRecord, DeckError>;
2052
2053 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
2054 if let Some(record) = self.queued.pop_front() {
2055 return Poll::Ready(Some(Ok(record)));
2056 }
2057 match self.interval.poll_tick(cx) {
2058 Poll::Ready(_) => {
2059 let snap = self.reader.read();
2060 let last_seq = self.last_seq;
2061 let mut max_seq = last_seq;
2062 for record in snap.log_ring.iter().cloned() {
2063 if record.seq <= last_seq {
2064 continue;
2065 }
2066 if record.seq > max_seq {
2067 max_seq = record.seq;
2068 }
2069 if self.filter.matches(&record) {
2070 self.queued.push_back(record);
2071 }
2072 }
2073 self.last_seq = max_seq;
2074 if let Some(record) = self.queued.pop_front() {
2075 Poll::Ready(Some(Ok(record)))
2076 } else {
2077 cx.waker().wake_by_ref();
2078 Poll::Pending
2079 }
2080 }
2081 Poll::Pending => Poll::Pending,
2082 }
2083 }
2084}
2085
2086pub struct FailureStream {
2098 reader: super::meshos::MeshOsSnapshotReader,
2099 interval: Interval,
2100 last_seq: u64,
2101 queued: std::collections::VecDeque<super::meshos::FailureRecord>,
2102}
2103
2104impl FailureStream {
2105 fn new(
2106 reader: super::meshos::MeshOsSnapshotReader,
2107 poll_interval: Duration,
2108 initial_seq_watermark: u64,
2109 ) -> Self {
2110 let poll_interval = poll_interval.max(Duration::from_millis(1));
2111 Self {
2112 reader,
2113 interval: interval(poll_interval),
2114 last_seq: initial_seq_watermark,
2115 queued: std::collections::VecDeque::new(),
2116 }
2117 }
2118}
2119
2120impl Stream for FailureStream {
2121 type Item = Result<super::meshos::FailureRecord, DeckError>;
2122
2123 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
2124 if let Some(record) = self.queued.pop_front() {
2125 return Poll::Ready(Some(Ok(record)));
2126 }
2127 match self.interval.poll_tick(cx) {
2128 Poll::Ready(_) => {
2129 let snap = self.reader.read();
2130 let last_seq = self.last_seq;
2131 let mut max_seq = last_seq;
2132 for record in snap.recent_failures.iter().cloned() {
2133 if record.seq <= last_seq {
2134 continue;
2135 }
2136 if record.seq > max_seq {
2137 max_seq = record.seq;
2138 }
2139 self.queued.push_back(record);
2140 }
2141 self.last_seq = max_seq;
2142 if let Some(record) = self.queued.pop_front() {
2143 Poll::Ready(Some(Ok(record)))
2144 } else {
2145 cx.waker().wake_by_ref();
2146 Poll::Pending
2147 }
2148 }
2149 Poll::Pending => Poll::Pending,
2150 }
2151 }
2152}
2153
2154type SharedSnapshotChangeRx = Arc<tokio::sync::Mutex<tokio::sync::watch::Receiver<u64>>>;
2161
2162fn next_snapshot_change(
2167 rx: SharedSnapshotChangeRx,
2168) -> Pin<Box<dyn std::future::Future<Output = ()> + Send>> {
2169 Box::pin(async move {
2170 let mut guard = rx.lock().await;
2171 let _ = guard.changed().await;
2174 })
2175}
2176
/// Stream of full [`MeshOsSnapshot`]s.
///
/// A snapshot is emitted whenever the reader's change signal fires or the
/// `ceiling` interval ticks. The interval guarantees an emission at least once
/// per tick even when no change is signalled.
pub struct SnapshotStream {
    /// Source of snapshots; read on every emission.
    reader: MeshOsSnapshotReader,
    /// Periodic fallback tick; the period is clamped to at least 1 ms in `new`.
    ceiling: Interval,
    /// Change-generation receiver, shared with the in-flight `pending` future.
    change_rx: SharedSnapshotChangeRx,
    /// In-flight wait for the next change signal; replaced each time it completes.
    /// NOTE(review): the `parking_lot::Mutex` wrapper appears to exist so the
    /// stream can be `Sync` (the boxed future is only `Send`). Confirm.
    pending: parking_lot::Mutex<Pin<Box<dyn std::future::Future<Output = ()> + Send>>>,
}

impl SnapshotStream {
    /// Subscribes to `reader`'s change signal and arms the first change wait.
    /// `poll_interval` is clamped to at least 1 ms.
    fn new(reader: MeshOsSnapshotReader, poll_interval: Duration) -> Self {
        let poll_interval = poll_interval.max(Duration::from_millis(1));
        let change_rx: SharedSnapshotChangeRx =
            Arc::new(tokio::sync::Mutex::new(reader.subscribe_changes()));
        let pending = parking_lot::Mutex::new(next_snapshot_change(change_rx.clone()));
        Self {
            reader,
            ceiling: interval(poll_interval),
            change_rx,
            pending,
        }
    }
}

impl Stream for SnapshotStream {
    type Item = Result<MeshOsSnapshot, DeckError>;

    /// Emits the current snapshot if the change signal fired or the ceiling
    /// ticked; otherwise returns `Pending`. Never returns `Ready(None)`.
    ///
    /// Both sources are polled on every call (no short-circuit). Each therefore
    /// registers the waker, and a tick that coincides with a change is consumed
    /// instead of causing a second back-to-back emission. tokio's `interval`
    /// completes its first tick immediately, so the first poll yields a snapshot.
    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        let this = self.get_mut();
        let changed = {
            let mut pending = this.pending.lock();
            let ready = pending.as_mut().poll(cx).is_ready();
            if ready {
                // Re-arm: the completed future must not be polled again.
                *pending = next_snapshot_change(this.change_rx.clone());
            }
            ready
        };
        let ticked = this.ceiling.poll_tick(cx).is_ready();
        if changed || ticked {
            Poll::Ready(Some(Ok(this.reader.read())))
        } else {
            Poll::Pending
        }
    }
}
2245
/// Stream of [`StatusSummary`] values, de-duplicated.
///
/// The wake sources are the same as for a snapshot stream: the change signal
/// or a `ceiling` tick. On each wake-up the summary is rebuilt from the current
/// snapshot, and it is emitted only when it differs from the last one emitted.
/// The first summary is always emitted.
pub struct StatusSummaryStream {
    /// Source of snapshots the summary is built from.
    reader: super::meshos::MeshOsSnapshotReader,
    /// Periodic fallback tick; the period is clamped to at least 1 ms in `new`.
    ceiling: Interval,
    /// Change-generation receiver, shared with the in-flight `pending` future.
    change_rx: SharedSnapshotChangeRx,
    /// In-flight wait for the next change signal; replaced each time it completes.
    pending: parking_lot::Mutex<Pin<Box<dyn std::future::Future<Output = ()> + Send>>>,
    /// Last summary handed to the consumer; `None` until the first emission.
    last_emitted: Option<StatusSummary>,
}

impl StatusSummaryStream {
    /// Subscribes to `reader`'s change signal and arms the first change wait.
    /// `poll_interval` is clamped to at least 1 ms.
    fn new(reader: super::meshos::MeshOsSnapshotReader, poll_interval: Duration) -> Self {
        let poll_interval = poll_interval.max(Duration::from_millis(1));
        let change_rx: SharedSnapshotChangeRx =
            Arc::new(tokio::sync::Mutex::new(reader.subscribe_changes()));
        let pending = parking_lot::Mutex::new(next_snapshot_change(change_rx.clone()));
        Self {
            reader,
            ceiling: interval(poll_interval),
            change_rx,
            pending,
            last_emitted: None,
        }
    }
}

impl Stream for StatusSummaryStream {
    type Item = Result<StatusSummary, DeckError>;

    /// Returns the next summary that differs from the previous emission.
    ///
    /// Loops while a wake source is ready but the rebuilt summary is unchanged,
    /// so unchanged wake-ups are swallowed instead of surfacing as `Pending`
    /// with no registered waker. It returns `Pending` only once both sources
    /// have been polled and neither is ready, at which point both have
    /// registered the waker.
    ///
    /// NOTE(review): if `pending` resolved immediately on every re-arm, this
    /// loop would never return. Confirm the change signal cannot do that (e.g.
    /// after its sender is dropped).
    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        let this = self.get_mut();
        loop {
            let changed = {
                let mut pending = this.pending.lock();
                let ready = pending.as_mut().poll(cx).is_ready();
                if ready {
                    // Re-arm: the completed future must not be polled again.
                    *pending = next_snapshot_change(this.change_rx.clone());
                }
                ready
            };
            let ticked = this.ceiling.poll_tick(cx).is_ready();
            if !changed && !ticked {
                return Poll::Pending;
            }
            let summary = build_status_summary(&this.reader.read());
            let should_emit = match &this.last_emitted {
                None => true,
                Some(prev) => prev != &summary,
            };
            if should_emit {
                this.last_emitted = Some(summary.clone());
                return Poll::Ready(Some(Ok(summary)));
            }
        }
    }
}
2322
2323#[cfg(test)]
2324mod tests {
2325 use super::*;
2326 use crate::adapter::net::behavior::meshos::{
2327 LoggingDispatcher, MaintenanceTransition, MeshOsAction, MeshOsConfig,
2328 };
2329
2330 fn fast_config() -> MeshOsConfig {
2331 MeshOsConfig::default()
2332 .with_this_node(42)
2333 .with_tick_interval(Duration::from_millis(10))
2334 .with_event_queue_capacity(64)
2335 .with_action_queue_capacity(64)
2336 }
2337
2338 #[tokio::test]
2339 async fn operator_identity_id_matches_keypair_origin_hash() {
2340 let kp = EntityKeypair::generate();
2341 let origin = kp.origin_hash();
2342 let identity = OperatorIdentity::from_keypair(kp);
2343 assert_eq!(identity.operator_id(), origin);
2344 }
2345
    /// Without `with_mesh`, every subnet, gateway, and channel accessor reports
    /// nothing: `None` or an empty collection.
    #[tokio::test]
    async fn deck_subnet_and_gateway_accessors_default_to_empty_without_mesh() {
        let dispatcher = Arc::new(LoggingDispatcher::new());
        let runtime = MeshOsRuntime::start(fast_config(), dispatcher);
        let deck = DeckClient::from_runtime(&runtime, OperatorIdentity::generate());
        assert_eq!(deck.local_subnet(), None);
        assert!(deck.known_subnets().is_empty());
        assert!(deck.gateway_stats().is_none());
        assert!(deck.gateway_exports().is_empty());
        assert_eq!(deck.channel_visibility("any/name"), None);
        assert!(deck.channels().is_empty());
        assert_eq!(deck.channel_wire_hash("any/name"), None);
        let _ = runtime.shutdown().await;
    }
2365
    /// With a `MeshNode` attached via `with_mesh`, the deck reports the node's
    /// subnet, its gateway stats, and the channels in its config registry.
    #[tokio::test]
    async fn deck_with_mesh_surfaces_local_subnet_and_gateway_stats() {
        use crate::adapter::net::{
            ChannelConfig, ChannelConfigRegistry, ChannelId, MeshNodeConfig, SubnetId, Visibility,
        };
        use std::net::SocketAddr;

        let dispatcher = Arc::new(LoggingDispatcher::new());
        let runtime = MeshOsRuntime::start(fast_config(), dispatcher);

        // Port 0: presumably an OS-assigned port for the test node.
        let addr: SocketAddr = "127.0.0.1:0".parse().unwrap();
        let mut mesh_cfg = MeshNodeConfig::new(addr, [0x17u8; 32]);
        mesh_cfg = mesh_cfg.with_subnet(SubnetId::new(&[3, 7]));
        let mut mesh = crate::adapter::net::MeshNode::new(EntityKeypair::generate(), mesh_cfg)
            .await
            .expect("MeshNode::new");
        // Register one subnet-local channel so the channel accessors have data.
        let registry = Arc::new(ChannelConfigRegistry::new());
        let metrics_id = ChannelId::parse("internal/metrics").expect("channel id");
        registry.insert(
            ChannelConfig::new(metrics_id.clone()).with_visibility(Visibility::SubnetLocal),
        );
        mesh.set_channel_configs(registry);
        let mesh = Arc::new(mesh);

        let deck = DeckClient::from_runtime(&runtime, OperatorIdentity::generate())
            .with_mesh(mesh.clone());

        // Fresh node: the gateway exists, all counters are zero, no peer subnets.
        assert_eq!(deck.local_subnet(), Some(SubnetId::new(&[3, 7])));
        let stats = deck.gateway_stats().expect("gateway installed");
        assert_eq!(stats.local_subnet, SubnetId::new(&[3, 7]));
        assert_eq!(stats.forwarded, 0);
        assert_eq!(stats.dropped, 0);
        assert_eq!(stats.export_rules, 0);
        assert!(stats.peer_subnets.is_empty());

        // The registered channel is visible by name with its configured visibility.
        assert_eq!(
            deck.channel_visibility("internal/metrics"),
            Some(Visibility::SubnetLocal),
        );
        let channels = deck.channels();
        assert_eq!(channels.len(), 1);
        assert_eq!(channels[0].0, "internal/metrics");
        assert_eq!(channels[0].1, Visibility::SubnetLocal);
        // Both hash forms match what `ChannelId` computes directly.
        assert_eq!(
            deck.channel_wire_hash("internal/metrics"),
            Some(metrics_id.wire_hash()),
        );
        assert_eq!(
            deck.channel_canonical_hash("internal/metrics"),
            Some(metrics_id.hash()),
        );

        let _ = runtime.shutdown().await;
    }
2429
2430 #[tokio::test]
2431 async fn deck_error_display_carries_kind_discriminator() {
2432 let err = DeckError::new("unknown_node", "node 99 is not in the cluster");
2433 let rendered = err.to_string();
2434 assert!(
2435 rendered.contains("<<deck-sdk-kind:unknown_node>>"),
2436 "expected discriminator envelope, got {rendered:?}",
2437 );
2438 }
2439
    /// `enter_maintenance` returns a commit attributed to the calling operator.
    /// After a short wait, the local maintenance state in the snapshot is no
    /// longer `MaintenanceStateSnapshot::Active`.
    #[tokio::test]
    async fn admin_enter_maintenance_publishes_admin_event_and_returns_commit() {
        let dispatcher = Arc::new(LoggingDispatcher::new());
        let runtime = MeshOsRuntime::start(fast_config(), dispatcher);
        let identity = OperatorIdentity::generate();
        let deck = DeckClient::from_runtime(&runtime, identity.clone());
        let commit = deck
            .admin()
            .enter_maintenance(42, None)
            .await
            .expect("commit");
        assert_eq!(commit.operator_id(), identity.operator_id());
        assert_eq!(commit.event_kind(), "enter_maintenance");
        assert!(commit.commit_id() >= 1);

        // Give the 10 ms loop several ticks to apply the event.
        tokio::time::sleep(Duration::from_millis(80)).await;
        let snap = runtime.snapshot();
        assert!(
            !matches!(
                snap.local_maintenance,
                crate::adapter::net::behavior::meshos::MaintenanceStateSnapshot::Active
            ),
            "local maintenance should have transitioned out of Active, got {:?}",
            snap.local_maintenance,
        );

        let _ = runtime.shutdown().await;
    }
2477
    /// `drop_replicas` with explicit chain ids returns a `drop_replicas` commit.
    #[tokio::test]
    async fn admin_drop_replicas_publishes_with_supplied_chain_ids() {
        let dispatcher = Arc::new(LoggingDispatcher::new());
        let runtime = MeshOsRuntime::start(fast_config(), dispatcher);
        let deck = DeckClient::from_runtime(&runtime, OperatorIdentity::generate());
        let commit = deck
            .admin()
            .drop_replicas(42, vec![1, 2, 3])
            .await
            .expect("commit");
        assert_eq!(commit.event_kind(), "drop_replicas");
        let _ = runtime.shutdown().await;
    }
2491
    /// Successive admin commits from one `DeckClient` get strictly increasing ids.
    #[tokio::test]
    async fn commit_ids_increment_monotonically_per_client() {
        let dispatcher = Arc::new(LoggingDispatcher::new());
        let runtime = MeshOsRuntime::start(fast_config(), dispatcher);
        let deck = DeckClient::from_runtime(&runtime, OperatorIdentity::generate());
        let a = deck.admin().cordon(42).await.unwrap();
        let b = deck.admin().uncordon(42).await.unwrap();
        assert!(b.commit_id() > a.commit_id());
        let _ = runtime.shutdown().await;
    }
2502
    /// On an idle cluster the snapshot stream keeps producing (ceiling-driven)
    /// snapshots, and two consecutive ones agree on local maintenance state.
    #[tokio::test]
    async fn snapshot_stream_yields_a_snapshot_per_poll_interval() {
        use futures::StreamExt;
        let dispatcher = Arc::new(LoggingDispatcher::new());
        let runtime = MeshOsRuntime::start(fast_config(), dispatcher);
        let deck = DeckClient::from_runtime(&runtime, OperatorIdentity::generate()).with_config(
            DeckClientConfig {
                snapshot_poll_interval: Duration::from_millis(20),
                ..DeckClientConfig::default()
            },
        );

        let mut stream = deck.snapshots();
        let first = stream.next().await.expect("first").expect("ok");
        let second = stream.next().await.expect("second").expect("ok");
        assert_eq!(first.local_maintenance, second.local_maintenance);
        let _ = runtime.shutdown().await;
    }
2525
    /// After `enter_maintenance`, a snapshot with non-`Active` local maintenance
    /// appears within the first 20 items of the snapshot stream.
    #[tokio::test]
    async fn snapshot_stream_observes_admin_command_aftermath() {
        use futures::StreamExt;
        let dispatcher = Arc::new(LoggingDispatcher::new());
        let runtime = MeshOsRuntime::start(fast_config(), dispatcher);
        let deck = DeckClient::from_runtime(&runtime, OperatorIdentity::generate()).with_config(
            DeckClientConfig {
                snapshot_poll_interval: Duration::from_millis(15),
                ..DeckClientConfig::default()
            },
        );

        let _ = deck.admin().enter_maintenance(42, None).await.unwrap();

        // The stream is opened after the command, so early items may still show
        // `Active`; scan a bounded number of items for the transition.
        let mut stream = deck.snapshots();
        let mut saw_transition = false;
        for _ in 0..20 {
            let snap = stream.next().await.expect("next").expect("ok");
            if !matches!(
                snap.local_maintenance,
                crate::adapter::net::behavior::meshos::MaintenanceStateSnapshot::Active
            ) {
                saw_transition = true;
                break;
            }
        }
        assert!(
            saw_transition,
            "stream should have surfaced a non-Active local_maintenance after enter_maintenance",
        );
        let _ = runtime.shutdown().await;
    }
2564
    /// Checks the snapshot change signal (watch generation):
    /// - it does not fire on idle loop ticks;
    /// - it does fire when a committed freeze changes the snapshot;
    /// - it stays quiet afterwards while only the freeze countdown advances.
    #[tokio::test]
    async fn change_signal_stays_quiet_on_idle_ticks_and_fires_on_structural_change() {
        let dispatcher = Arc::new(LoggingDispatcher::new());
        let runtime = MeshOsRuntime::start(fast_config(), dispatcher);
        let deck = DeckClient::from_runtime(&runtime, OperatorIdentity::generate());

        let mut rx = deck.snapshot_reader.subscribe_changes();
        // Let startup settle, then mark the current generation as seen.
        tokio::time::sleep(Duration::from_millis(120)).await;
        rx.borrow_and_update();

        // ~20 idle 10 ms ticks must not bump the generation.
        tokio::time::sleep(Duration::from_millis(200)).await;
        assert!(
            !rx.has_changed().unwrap(),
            "change signal fired on idle ticks — per-tick time progression \
             must NOT count as a structural change",
        );

        // A committed freeze is a structural change.
        let p = deck
            .ice()
            .freeze_cluster(Duration::from_secs(15))
            .simulate()
            .await
            .expect("simulate");
        let sig = deck
            .identity()
            .sign_proposal(p.action(), p.issued_at_ms(), &p.blast_hash());
        p.commit(&[sig]).await.expect("commit");

        tokio::time::timeout(Duration::from_secs(2), rx.changed())
            .await
            .expect("a structural change must fire the signal well inside the timeout")
            .expect("change sender alive");

        // While frozen, only the remaining time changes; that must stay quiet.
        rx.borrow_and_update();
        tokio::time::sleep(Duration::from_millis(200)).await;
        assert!(
            !rx.has_changed().unwrap(),
            "freeze countdown advancing must not bump the change generation",
        );

        let _ = runtime.shutdown().await;
    }
2622
    /// Publishing an admin command after the runtime has shut down fails with
    /// kind `loop_closed`.
    #[tokio::test]
    async fn admin_commit_after_runtime_shutdown_returns_loop_closed_error() {
        let dispatcher = Arc::new(LoggingDispatcher::new());
        let runtime = MeshOsRuntime::start(fast_config(), dispatcher);
        let deck = DeckClient::from_runtime(&runtime, OperatorIdentity::generate());
        let _ = runtime.shutdown().await;
        let err = deck
            .admin()
            .cordon(42)
            .await
            .expect_err("publish after shutdown should fail");
        assert_eq!(err.kind, "loop_closed");
    }
2640
    // Never called. It only references `MaintenanceTransition` and
    // `MeshOsAction`, so the test-module import of those types is used. The
    // `allow` covers the unused function itself (the leading underscore should
    // already silence that lint).
    #[allow(dead_code)]
    fn _ensure_action_types_are_in_scope() -> (MaintenanceTransition, MeshOsAction) {
        (
            MaintenanceTransition::EnteringMaintenance,
            MeshOsAction::CommitMaintenanceTransition {
                node: 0,
                target: MaintenanceTransition::EnteringMaintenance,
            },
        )
    }
2653
    /// With `ice_signature_threshold: 2`, committing with a single signature
    /// fails with kind `insufficient_signatures`.
    #[tokio::test]
    async fn ice_proposal_commit_with_insufficient_signatures_fails() {
        let dispatcher = Arc::new(LoggingDispatcher::new());
        let runtime = MeshOsRuntime::start(fast_config(), dispatcher);
        let deck = DeckClient::from_runtime(&runtime, OperatorIdentity::generate()).with_config(
            DeckClientConfig {
                snapshot_poll_interval: Duration::from_millis(100),
                ice_signature_threshold: 2,
            },
        );
        let proposal = deck.ice().freeze_cluster(Duration::from_secs(10));
        let simulated = proposal.simulate().await.expect("simulate");
        let sig = deck.identity().sign_proposal(
            simulated.action(),
            simulated.issued_at_ms(),
            &simulated.blast_hash(),
        );
        let err = simulated
            .commit(&[sig])
            .await
            .expect_err("under-threshold commit should fail");
        assert_eq!(err.kind, "insufficient_signatures");
        let _ = runtime.shutdown().await;
    }
2686
    /// Full freeze flow: simulate, sign, commit.
    ///
    /// The simulation reports the TTL as the estimated drain delay, the commit
    /// is a `freeze_cluster` event, and the snapshot then shows a remaining
    /// freeze time.
    #[tokio::test]
    async fn ice_freeze_proposal_simulate_then_commit_lands_freeze_on_loop() {
        let dispatcher = Arc::new(LoggingDispatcher::new());
        let runtime = MeshOsRuntime::start(fast_config(), dispatcher);
        let deck = DeckClient::from_runtime(&runtime, OperatorIdentity::generate());

        let proposal = deck.ice().freeze_cluster(Duration::from_secs(30));
        let simulated = proposal.simulate().await.expect("simulate");
        assert_eq!(
            simulated.blast_radius().estimated_drain_delay,
            Some(Duration::from_secs(30))
        );
        let sig = deck.identity().sign_proposal(
            simulated.action(),
            simulated.issued_at_ms(),
            &simulated.blast_hash(),
        );
        let commit = simulated.commit(&[sig]).await.expect("commit");
        assert_eq!(commit.event_kind(), "freeze_cluster");

        // Give the loop several ticks to apply the freeze.
        tokio::time::sleep(Duration::from_millis(80)).await;
        let snap = runtime.snapshot();
        assert!(
            snap.freeze_remaining_ms.is_some(),
            "freeze_remaining_ms should be set after committed freeze",
        );
        let _ = runtime.shutdown().await;
    }
2718
    /// Simulating a thaw on an unfrozen cluster carries the
    /// `ThawHasNoFreezeToCancel` warning.
    #[tokio::test]
    async fn ice_thaw_proposal_simulate_warns_no_op_when_unfrozen() {
        let dispatcher = Arc::new(LoggingDispatcher::new());
        let runtime = MeshOsRuntime::start(fast_config(), dispatcher);
        let deck = DeckClient::from_runtime(&runtime, OperatorIdentity::generate());

        let proposal = deck.ice().thaw_cluster();
        let simulated = proposal.simulate().await.expect("simulate");
        assert!(simulated.blast_radius().warnings.iter().any(|w| matches!(
            w,
            crate::adapter::net::behavior::meshos::BlastWarning::ThawHasNoFreezeToCancel
        )));
        let _ = runtime.shutdown().await;
    }
2734
    /// Fixed placeholder blast-radius hash (every byte `1`), used by the
    /// signing/verification tests that do not go through a real simulation.
    const TEST_BLAST_HASH: super::super::meshos::BlastRadiusHash =
        [1u8; super::super::meshos::BLAST_RADIUS_HASH_LEN];
2741
2742 fn _assert_proposal_send_sync_static_check() {
2750 fn _assert_send<T: Send>() {}
2751 fn _assert_send_sync<T: Send + Sync>() {}
2752 _assert_send_sync::<IceProposal<'static>>();
2753 _assert_send_sync::<SimulatedIceProposal<'static>>();
2754 _assert_send::<SnapshotStream>();
2755 _assert_send::<StatusSummaryStream>();
2756 _assert_send::<AuditStream>();
2757 _assert_send::<LogStream>();
2758 _assert_send::<FailureStream>();
2759 }
2760
2761 #[tokio::test]
2762 async fn operator_signature_carries_issuing_operator_id() {
2763 let identity = OperatorIdentity::generate();
2764 let proposal = IceActionProposal::FreezeCluster {
2765 ttl: Duration::from_secs(60),
2766 };
2767 let sig = identity.sign_proposal(
2768 &proposal,
2769 super::super::meshos::now_ms_since_unix_epoch(),
2770 &TEST_BLAST_HASH,
2771 );
2772 assert_eq!(sig.operator_id, identity.operator_id());
2773 assert_eq!(sig.signature.len(), 64);
2775 }
2776
2777 #[tokio::test]
2778 async fn operator_registry_verifies_a_well_formed_signature() {
2779 let identity = OperatorIdentity::generate();
2780 let mut registry = OperatorRegistry::new();
2781 registry.register(identity.keypair());
2782
2783 let proposal = IceActionProposal::FreezeCluster {
2784 ttl: Duration::from_secs(60),
2785 };
2786 let ts = super::super::meshos::now_ms_since_unix_epoch();
2787 let sig = identity.sign_proposal(&proposal, ts, &TEST_BLAST_HASH);
2788 let payload = ice_proposal_signing_payload(&proposal, ts, &TEST_BLAST_HASH);
2789 registry.verify(&sig, &payload).expect("valid signature");
2790 }
2791
2792 #[tokio::test]
2793 async fn operator_registry_rejects_unknown_operator() {
2794 let registry = OperatorRegistry::new();
2795 let identity = OperatorIdentity::generate();
2796 let proposal = IceActionProposal::ThawCluster;
2797 let ts = super::super::meshos::now_ms_since_unix_epoch();
2798 let sig = identity.sign_proposal(&proposal, ts, &TEST_BLAST_HASH);
2799 let payload = ice_proposal_signing_payload(&proposal, ts, &TEST_BLAST_HASH);
2800 let err = registry
2801 .verify(&sig, &payload)
2802 .expect_err("unregistered operator should not verify");
2803 assert_eq!(err.kind(), "not_authorized");
2804 }
2805
2806 #[tokio::test]
2807 async fn operator_registry_rejects_tampered_signature_bytes() {
2808 let identity = OperatorIdentity::generate();
2809 let mut registry = OperatorRegistry::new();
2810 registry.register(identity.keypair());
2811
2812 let proposal = IceActionProposal::FreezeCluster {
2813 ttl: Duration::from_secs(10),
2814 };
2815 let ts = super::super::meshos::now_ms_since_unix_epoch();
2816 let mut sig = identity.sign_proposal(&proposal, ts, &TEST_BLAST_HASH);
2817 sig.signature[0] ^= 0x01;
2819 let payload = ice_proposal_signing_payload(&proposal, ts, &TEST_BLAST_HASH);
2820 let err = registry
2821 .verify(&sig, &payload)
2822 .expect_err("tampered signature should not verify");
2823 assert_eq!(err.kind(), "signature_invalid");
2824 }
2825
2826 #[tokio::test]
2827 async fn operator_registry_rejects_signature_for_wrong_payload() {
2828 let identity = OperatorIdentity::generate();
2833 let mut registry = OperatorRegistry::new();
2834 registry.register(identity.keypair());
2835
2836 let signed_proposal = IceActionProposal::FreezeCluster {
2837 ttl: Duration::from_secs(10),
2838 };
2839 let other_proposal = IceActionProposal::FreezeCluster {
2840 ttl: Duration::from_secs(60),
2841 };
2842 let ts = super::super::meshos::now_ms_since_unix_epoch();
2843 let sig = identity.sign_proposal(&signed_proposal, ts, &TEST_BLAST_HASH);
2844 let payload = ice_proposal_signing_payload(&other_proposal, ts, &TEST_BLAST_HASH);
2845 let err = registry
2846 .verify(&sig, &payload)
2847 .expect_err("cross-proposal signature should not verify");
2848 assert_eq!(err.kind(), "signature_invalid");
2849 }
2850
2851 #[tokio::test]
2852 async fn operator_registry_rejects_wrong_length_signature() {
2853 let identity = OperatorIdentity::generate();
2854 let mut registry = OperatorRegistry::new();
2855 registry.register(identity.keypair());
2856
2857 let proposal = IceActionProposal::ThawCluster;
2858 let sig = OperatorSignature {
2859 operator_id: identity.operator_id(),
2860 signature: vec![0; 32], };
2862 let payload = ice_proposal_signing_payload(
2863 &proposal,
2864 super::super::meshos::now_ms_since_unix_epoch(),
2865 &TEST_BLAST_HASH,
2866 );
2867 let err = registry
2868 .verify(&sig, &payload)
2869 .expect_err("wrong-length signature should not verify");
2870 assert_eq!(err.kind(), "signature_invalid");
2871 }
2872
    /// With an operator registry installed and a threshold of 2, a commit that
    /// carries one valid and one tampered signature fails with kind
    /// `signature_invalid`.
    #[tokio::test]
    async fn ice_commit_with_registry_rejects_an_unverified_signature() {
        let dispatcher = Arc::new(LoggingDispatcher::new());
        let runtime = MeshOsRuntime::start(fast_config(), dispatcher);
        let op_a = OperatorIdentity::generate();
        let op_b = OperatorIdentity::generate();
        let mut registry = OperatorRegistry::new();
        registry.register(op_a.keypair());
        registry.register(op_b.keypair());
        let deck = DeckClient::new(
            runtime.handle_clone(),
            runtime.snapshot_reader().clone(),
            op_a.clone(),
            DeckClientConfig {
                snapshot_poll_interval: Duration::from_millis(100),
                ice_signature_threshold: 2,
            },
        )
        .with_operator_registry(registry);

        let proposal = deck.ice().freeze_cluster(Duration::from_secs(15));
        let simulated = proposal.simulate().await.expect("simulate");
        let sig_a = op_a.sign_proposal(
            simulated.action(),
            simulated.issued_at_ms(),
            &simulated.blast_hash(),
        );
        let mut sig_b = op_b.sign_proposal(
            simulated.action(),
            simulated.issued_at_ms(),
            &simulated.blast_hash(),
        );
        // Corrupt one byte of op_b's signature so the pair meets the count
        // threshold but not verification.
        sig_b.signature[3] ^= 0xFF;
        let err = simulated
            .commit(&[sig_a, sig_b])
            .await
            .expect_err("commit with tampered sig should fail");
        assert_eq!(err.kind, "signature_invalid");
        let _ = runtime.shutdown().await;
    }
2917
    /// Flushing avoid lists on peer 5:
    /// - the simulation warns `AvoidFlushRecoversPeer { peer: 5 }`;
    /// - the commit is a `flush_avoid_lists` event.
    #[tokio::test]
    async fn ice_flush_avoid_lists_proposal_simulate_and_commit_round_trips() {
        use super::super::meshos::AvoidScope;
        let dispatcher = Arc::new(LoggingDispatcher::new());
        let runtime = MeshOsRuntime::start(fast_config(), dispatcher);
        let deck = DeckClient::from_runtime(&runtime, OperatorIdentity::generate());
        let proposal = deck.ice().flush_avoid_lists(AvoidScope::OnPeer { peer: 5 });
        let simulated = proposal.simulate().await.expect("simulate");
        assert!(simulated.blast_radius().warnings.iter().any(|w| matches!(
            w,
            crate::adapter::net::behavior::meshos::BlastWarning::AvoidFlushRecoversPeer { peer: 5 }
        )));
        let sig = deck.identity().sign_proposal(
            simulated.action(),
            simulated.issued_at_ms(),
            &simulated.blast_hash(),
        );
        let commit = simulated.commit(&[sig]).await.expect("commit");
        assert_eq!(commit.event_kind(), "flush_avoid_lists");
        let _ = runtime.shutdown().await;
    }
2943
    /// The status-summary stream yields an initial summary without waiting for
    /// a change. On an idle cluster that summary shows no freeze and no local
    /// maintenance.
    #[tokio::test]
    async fn status_summary_stream_emits_initial_summary_immediately() {
        use futures::StreamExt;
        let dispatcher = Arc::new(LoggingDispatcher::new());
        let runtime = MeshOsRuntime::start(fast_config(), dispatcher);
        let deck = DeckClient::from_runtime(&runtime, OperatorIdentity::generate()).with_config(
            DeckClientConfig {
                snapshot_poll_interval: Duration::from_millis(10),
                ..DeckClientConfig::default()
            },
        );
        let mut stream = deck.status_summary_stream();
        let first = tokio::time::timeout(Duration::from_secs(2), stream.next())
            .await
            .expect("first timed out")
            .expect("first closed")
            .expect("first ok");
        assert!(first.freeze_remaining_ms.is_none());
        assert!(!first.local_maintenance_active);
        let _ = runtime.shutdown().await;
    }
2966
    /// After the initial summary, an idle cluster produces no second item
    /// within 80 ms, even though the 10 ms ceiling ticks several times.
    #[tokio::test]
    async fn status_summary_stream_dedups_unchanged_summaries() {
        use futures::StreamExt;
        let dispatcher = Arc::new(LoggingDispatcher::new());
        let runtime = MeshOsRuntime::start(fast_config(), dispatcher);
        let deck = DeckClient::from_runtime(&runtime, OperatorIdentity::generate()).with_config(
            DeckClientConfig {
                snapshot_poll_interval: Duration::from_millis(10),
                ..DeckClientConfig::default()
            },
        );
        let mut stream = deck.status_summary_stream();
        let _ = tokio::time::timeout(Duration::from_secs(2), stream.next())
            .await
            .expect("first")
            .expect("closed")
            .expect("ok");
        // `Err` here means the timeout elapsed, i.e. nothing was emitted.
        let second = tokio::time::timeout(Duration::from_millis(80), stream.next()).await;
        assert!(
            second.is_err(),
            "stream should not re-emit unchanged summary"
        );
        let _ = runtime.shutdown().await;
    }
2993
    /// Committing a freeze makes the status-summary stream emit a new summary.
    /// That summary shows the freeze and at least one admin-audit entry.
    #[tokio::test]
    async fn status_summary_stream_re_emits_on_freeze_state_change() {
        use futures::StreamExt;
        let dispatcher = Arc::new(LoggingDispatcher::new());
        let runtime = MeshOsRuntime::start(fast_config(), dispatcher);
        let deck = DeckClient::from_runtime(&runtime, OperatorIdentity::generate()).with_config(
            DeckClientConfig {
                snapshot_poll_interval: Duration::from_millis(10),
                ..DeckClientConfig::default()
            },
        );
        let mut stream = deck.status_summary_stream();
        let first = tokio::time::timeout(Duration::from_secs(2), stream.next())
            .await
            .expect("first")
            .expect("closed")
            .expect("ok");
        assert!(first.freeze_remaining_ms.is_none());

        let p = deck
            .ice()
            .freeze_cluster(Duration::from_secs(30))
            .simulate()
            .await
            .expect("simulate");
        let sig = deck
            .identity()
            .sign_proposal(p.action(), p.issued_at_ms(), &p.blast_hash());
        p.commit(&[sig]).await.expect("freeze");
        let after_freeze = tokio::time::timeout(Duration::from_secs(2), stream.next())
            .await
            .expect("after_freeze timed out")
            .expect("after_freeze closed")
            .expect("after_freeze ok");
        assert!(after_freeze.freeze_remaining_ms.is_some());
        assert!(after_freeze.admin_audit_ring_depth >= 1);
        let _ = runtime.shutdown().await;
    }
3035
    /// A freshly started cluster summarizes to:
    /// - zero peer, daemon, and chain counts;
    /// - no recent emissions;
    /// - no freeze and no local maintenance.
    #[tokio::test]
    async fn status_summary_reflects_steady_state_idle_cluster() {
        let dispatcher = Arc::new(LoggingDispatcher::new());
        let runtime = MeshOsRuntime::start(fast_config(), dispatcher);
        let deck = DeckClient::from_runtime(&runtime, OperatorIdentity::generate());
        let summary = deck.status_summary();
        assert_eq!(summary.peers, PeerCounts::default());
        assert_eq!(summary.daemons, DaemonCounts::default());
        assert_eq!(summary.replica_chains, 0);
        assert_eq!(summary.recently_emitted_count, 0);
        assert!(summary.freeze_remaining_ms.is_none());
        assert!(!summary.local_maintenance_active);
        let _ = runtime.shutdown().await;
    }
3050
    /// A point-in-time `status_summary()` taken shortly after a committed freeze
    /// shows the freeze and at least one admin-audit entry.
    #[tokio::test]
    async fn status_summary_flags_freeze_after_freeze_cluster_commit() {
        let dispatcher = Arc::new(LoggingDispatcher::new());
        let runtime = MeshOsRuntime::start(fast_config(), dispatcher);
        let deck = DeckClient::from_runtime(&runtime, OperatorIdentity::generate());
        let p = deck
            .ice()
            .freeze_cluster(Duration::from_secs(30))
            .simulate()
            .await
            .expect("simulate");
        let sig = deck
            .identity()
            .sign_proposal(p.action(), p.issued_at_ms(), &p.blast_hash());
        p.commit(&[sig]).await.expect("freeze");
        // Give the 10 ms loop a few ticks to apply the freeze.
        tokio::time::sleep(Duration::from_millis(60)).await;
        let summary = deck.status_summary();
        assert!(summary.freeze_remaining_ms.is_some());
        assert!(summary.admin_audit_ring_depth >= 1);
        let _ = runtime.shutdown().await;
    }
3073
    /// Shortly after `enter_maintenance` on this node (id 42),
    /// `local_maintenance_active` is true.
    #[tokio::test]
    async fn status_summary_flags_local_maintenance_after_enter_maintenance() {
        let dispatcher = Arc::new(LoggingDispatcher::new());
        let runtime = MeshOsRuntime::start(fast_config(), dispatcher);
        let deck = DeckClient::from_runtime(&runtime, OperatorIdentity::generate());
        deck.admin()
            .enter_maintenance(42, None)
            .await
            .expect("commit");
        // Give the 10 ms loop a few ticks to apply the transition.
        tokio::time::sleep(Duration::from_millis(60)).await;
        let summary = deck.status_summary();
        assert!(
            summary.local_maintenance_active,
            "local_maintenance_active should flip on after enter_maintenance",
        );
        let _ = runtime.shutdown().await;
    }
3092
    /// A dispatcher failure seeded with `fail_next` shows up on the
    /// `subscribe_failures(0)` stream once an admin command triggers a dispatch.
    /// The record has a positive `seq` and a reason mentioning the seeded
    /// message.
    #[tokio::test]
    async fn subscribe_failures_yields_seeded_dispatcher_rejection() {
        use crate::adapter::net::behavior::meshos::DispatchError;
        use futures::StreamExt;
        let dispatcher = Arc::new(LoggingDispatcher::new());
        // Presumably makes the next dispatched action fail with this error.
        dispatcher.fail_next(DispatchError::drop("first"));
        let runtime = MeshOsRuntime::start(fast_config(), Arc::clone(&dispatcher));
        let deck = DeckClient::from_runtime(&runtime, OperatorIdentity::generate()).with_config(
            DeckClientConfig {
                snapshot_poll_interval: Duration::from_millis(15),
                ..DeckClientConfig::default()
            },
        );
        // Subscribe before triggering, starting from watermark 0.
        let mut stream = deck.subscribe_failures(0);

        deck.admin().enter_maintenance(42, None).await.unwrap();

        let record = tokio::time::timeout(Duration::from_secs(2), stream.next())
            .await
            .expect("timed out")
            .expect("closed")
            .expect("ok");
        assert!(record.seq > 0);
        assert!(record.reason.contains("first"));
        let _ = runtime.shutdown().await;
    }
3121
3122 #[tokio::test]
3123 async fn subscribe_failures_since_seq_drops_already_seen() {
3124 use crate::adapter::net::behavior::meshos::DispatchError;
3125 use futures::StreamExt;
3126 let dispatcher = Arc::new(LoggingDispatcher::new());
3127 dispatcher.fail_next(DispatchError::drop("first"));
3128 let runtime = MeshOsRuntime::start(fast_config(), Arc::clone(&dispatcher));
3129 let deck = DeckClient::from_runtime(&runtime, OperatorIdentity::generate()).with_config(
3130 DeckClientConfig {
3131 snapshot_poll_interval: Duration::from_millis(15),
3132 ..DeckClientConfig::default()
3133 },
3134 );
3135
3136 deck.admin().enter_maintenance(42, None).await.unwrap();
3137 let deadline = std::time::Instant::now() + Duration::from_secs(2);
3139 let mut seq_seen = 0u64;
3140 while std::time::Instant::now() < deadline {
3141 let all = deck.recent_failures();
3142 if let Some(r) = all.last() {
3143 seq_seen = r.seq;
3144 break;
3145 }
3146 tokio::time::sleep(Duration::from_millis(20)).await;
3147 }
3148 assert!(seq_seen > 0);
3149
3150 let mut stream = deck.subscribe_failures(seq_seen);
3153 let parked = tokio::time::timeout(Duration::from_millis(60), stream.next()).await;
3154 assert!(parked.is_err(), "no new failures means parked stream");
3155 let _ = runtime.shutdown().await;
3156 }
3157
3158 #[tokio::test]
3159 async fn recent_failures_surfaces_dispatcher_rejections() {
3160 use crate::adapter::net::behavior::meshos::DispatchError;
3161 let dispatcher = Arc::new(LoggingDispatcher::new());
3162 dispatcher.fail_next(DispatchError::drop("synthetic rejection"));
3163 let runtime = MeshOsRuntime::start(fast_config(), Arc::clone(&dispatcher));
3164 let deck = DeckClient::from_runtime(&runtime, OperatorIdentity::generate());
3165
3166 deck.admin()
3170 .enter_maintenance(42, None)
3171 .await
3172 .expect("commit");
3173
3174 let deadline = std::time::Instant::now() + Duration::from_secs(2);
3176 let mut got: Vec<crate::adapter::net::behavior::meshos::FailureRecord> = Vec::new();
3177 while std::time::Instant::now() < deadline {
3178 got = deck.recent_failures();
3179 if !got.is_empty() {
3180 break;
3181 }
3182 tokio::time::sleep(Duration::from_millis(20)).await;
3183 }
3184 assert!(
3185 !got.is_empty(),
3186 "recent_failures should reflect the seeded dispatcher rejection",
3187 );
3188 assert!(got[0].reason.contains("synthetic rejection"));
3189 let _ = runtime.shutdown().await;
3190 }
3191
3192 #[tokio::test]
3193 async fn recent_failures_since_drops_records_at_or_below_cutoff() {
3194 use crate::adapter::net::behavior::meshos::DispatchError;
3195 let dispatcher = Arc::new(LoggingDispatcher::new());
3196 dispatcher.fail_next(DispatchError::drop("first failure"));
3197 let runtime = MeshOsRuntime::start(fast_config(), Arc::clone(&dispatcher));
3198 let deck = DeckClient::from_runtime(&runtime, OperatorIdentity::generate());
3199
3200 deck.admin()
3201 .enter_maintenance(42, None)
3202 .await
3203 .expect("commit");
3204 let deadline = std::time::Instant::now() + Duration::from_secs(2);
3205 let mut all: Vec<crate::adapter::net::behavior::meshos::FailureRecord> = Vec::new();
3206 while std::time::Instant::now() < deadline {
3207 all = deck.recent_failures();
3208 if !all.is_empty() {
3209 break;
3210 }
3211 tokio::time::sleep(Duration::from_millis(20)).await;
3212 }
3213 assert!(!all.is_empty(), "seed failure should land");
3214
3215 let cutoff = all[0].recorded_at_ms;
3218 let after = deck.recent_failures_since(cutoff);
3219 assert!(
3220 after.iter().all(|r| r.recorded_at_ms > cutoff),
3221 "since filter should drop records at the cutoff",
3222 );
3223 assert!(after.iter().all(|r| r.reason != "first failure"));
3226 let _ = runtime.shutdown().await;
3227 }
3228
3229 #[tokio::test]
3230 async fn per_field_accessors_match_full_snapshot_contents() {
3231 let dispatcher = Arc::new(LoggingDispatcher::new());
3232 let runtime = MeshOsRuntime::start(fast_config(), dispatcher);
3233 let deck = DeckClient::from_runtime(&runtime, OperatorIdentity::generate());
3234
3235 let snap = deck.status();
3236 assert_eq!(deck.peers(), snap.peers);
3237 assert_eq!(deck.daemons(), snap.daemons);
3238 assert_eq!(deck.replicas(), snap.replicas);
3239 assert_eq!(deck.local_maintenance(), snap.local_maintenance);
3240 assert_eq!(deck.freeze_remaining_ms(), snap.freeze_remaining_ms);
3241 let _ = runtime.shutdown().await;
3242 }
3243
3244 #[tokio::test]
3245 async fn status_returns_freshest_snapshot_synchronously() {
3246 let dispatcher = Arc::new(LoggingDispatcher::new());
3247 let runtime = MeshOsRuntime::start(fast_config(), dispatcher);
3248 let deck = DeckClient::from_runtime(&runtime, OperatorIdentity::generate());
3249
3250 let s = deck.status();
3252 assert!(matches!(
3253 s.local_maintenance,
3254 crate::adapter::net::behavior::meshos::MaintenanceStateSnapshot::Active
3255 ));
3256
3257 let p = deck
3260 .ice()
3261 .freeze_cluster(Duration::from_secs(20))
3262 .simulate()
3263 .await
3264 .expect("simulate");
3265 let sig = deck
3266 .identity()
3267 .sign_proposal(p.action(), p.issued_at_ms(), &p.blast_hash());
3268 p.commit(&[sig]).await.expect("commit");
3269 tokio::time::sleep(Duration::from_millis(60)).await;
3270 let s = deck.status();
3271 assert!(s.freeze_remaining_ms.is_some());
3272 let _ = runtime.shutdown().await;
3273 }
3274
3275 #[tokio::test]
3276 async fn watch_resolves_immediately_when_predicate_already_true() {
3277 let dispatcher = Arc::new(LoggingDispatcher::new());
3278 let runtime = MeshOsRuntime::start(fast_config(), dispatcher);
3279 let deck = DeckClient::from_runtime(&runtime, OperatorIdentity::generate());
3280 let snap = tokio::time::timeout(
3283 Duration::from_millis(50),
3284 deck.watch(|s| s.freeze_remaining_ms.is_none()),
3285 )
3286 .await
3287 .expect("watch should not block when predicate already holds");
3288 assert!(snap.freeze_remaining_ms.is_none());
3289 let _ = runtime.shutdown().await;
3290 }
3291
3292 #[tokio::test]
3293 async fn watch_resolves_when_predicate_becomes_true_after_admin_commit() {
3294 let dispatcher = Arc::new(LoggingDispatcher::new());
3295 let runtime = MeshOsRuntime::start(fast_config(), dispatcher);
3296 let deck = DeckClient::from_runtime(&runtime, OperatorIdentity::generate()).with_config(
3297 DeckClientConfig {
3298 snapshot_poll_interval: Duration::from_millis(10),
3299 ..DeckClientConfig::default()
3300 },
3301 );
3302
3303 let deck_handle = deck.snapshot_reader.clone();
3305 let watcher = {
3306 let identity = deck.identity().clone();
3307 let config = deck.config.clone();
3308 let handle = deck.handle.clone();
3309 let client = DeckClient::new(handle, deck_handle.clone(), identity, config);
3315 tokio::spawn(async move { client.watch(|s| s.freeze_remaining_ms.is_some()).await })
3316 };
3317
3318 tokio::time::sleep(Duration::from_millis(40)).await;
3320 let p = deck
3321 .ice()
3322 .freeze_cluster(Duration::from_secs(15))
3323 .simulate()
3324 .await
3325 .expect("simulate");
3326 let sig = deck
3327 .identity()
3328 .sign_proposal(p.action(), p.issued_at_ms(), &p.blast_hash());
3329 p.commit(&[sig]).await.expect("commit");
3330
3331 let snap = tokio::time::timeout(Duration::from_secs(2), watcher)
3332 .await
3333 .expect("watcher should resolve")
3334 .expect("join");
3335 assert!(snap.freeze_remaining_ms.is_some());
3336 let _ = runtime.shutdown().await;
3337 }
3338
3339 #[tokio::test]
3340 async fn watch_is_event_driven_resolving_far_under_the_poll_ceiling() {
3341 let dispatcher = Arc::new(LoggingDispatcher::new());
3347 let runtime = MeshOsRuntime::start(fast_config(), dispatcher);
3348 let deck = DeckClient::from_runtime(&runtime, OperatorIdentity::generate()).with_config(
3349 DeckClientConfig {
3350 snapshot_poll_interval: Duration::from_secs(30),
3351 ..DeckClientConfig::default()
3352 },
3353 );
3354
3355 let watcher = {
3356 let client = DeckClient::new(
3357 deck.handle.clone(),
3358 deck.snapshot_reader.clone(),
3359 deck.identity().clone(),
3360 deck.config.clone(),
3361 );
3362 tokio::spawn(async move { client.watch(|s| s.freeze_remaining_ms.is_some()).await })
3363 };
3364
3365 tokio::time::sleep(Duration::from_millis(40)).await;
3366 let started = std::time::Instant::now();
3367 let p = deck
3368 .ice()
3369 .freeze_cluster(Duration::from_secs(15))
3370 .simulate()
3371 .await
3372 .expect("simulate");
3373 let sig = deck
3374 .identity()
3375 .sign_proposal(p.action(), p.issued_at_ms(), &p.blast_hash());
3376 p.commit(&[sig]).await.expect("commit");
3377
3378 let snap = tokio::time::timeout(Duration::from_secs(2), watcher)
3379 .await
3380 .expect("watch must resolve far inside the 30s ceiling")
3381 .expect("join");
3382 assert!(snap.freeze_remaining_ms.is_some());
3383 assert!(
3384 started.elapsed() < Duration::from_secs(5),
3385 "watch took {:?}, expected ≪ 30s ceiling — not event-driven",
3386 started.elapsed(),
3387 );
3388 let _ = runtime.shutdown().await;
3389 }
3390
3391 #[tokio::test]
3392 async fn watch_timeout_returns_watch_timeout_error_when_predicate_never_holds() {
3393 let dispatcher = Arc::new(LoggingDispatcher::new());
3394 let runtime = MeshOsRuntime::start(fast_config(), dispatcher);
3395 let deck = DeckClient::from_runtime(&runtime, OperatorIdentity::generate()).with_config(
3396 DeckClientConfig {
3397 snapshot_poll_interval: Duration::from_millis(10),
3398 ..DeckClientConfig::default()
3399 },
3400 );
3401
3402 let err = deck
3403 .watch_timeout(
3404 |s| s.freeze_remaining_ms.is_some(),
3405 Duration::from_millis(80),
3406 )
3407 .await
3408 .expect_err("predicate never holds, should time out");
3409 assert_eq!(err.kind, "watch_timeout");
3410 let _ = runtime.shutdown().await;
3411 }
3412
3413 #[tokio::test]
3414 async fn audit_since_filter_drops_records_at_or_below_watermark() {
3415 let dispatcher = Arc::new(LoggingDispatcher::new());
3416 let runtime = MeshOsRuntime::start(fast_config(), dispatcher);
3417 let deck = DeckClient::from_runtime(&runtime, OperatorIdentity::generate());
3418
3419 deck.admin().cordon(42).await.unwrap();
3420 deck.admin().uncordon(42).await.unwrap();
3421 deck.admin().invalidate_placement(42).await.unwrap();
3422 tokio::time::sleep(Duration::from_millis(80)).await;
3423
3424 let all = deck.audit().collect();
3425 assert_eq!(all.len(), 3);
3426 let middle_seq = all[1].seq;
3430 let after_middle = deck.audit().since(middle_seq).collect();
3431 assert_eq!(after_middle.len(), 1, "since should keep only seq > middle");
3432 assert!(after_middle[0].seq > middle_seq);
3433 let _ = runtime.shutdown().await;
3434 }
3435
3436 #[tokio::test]
3437 async fn audit_stream_since_seeds_initial_watermark() {
3438 use futures::StreamExt;
3439 let dispatcher = Arc::new(LoggingDispatcher::new());
3440 let runtime = MeshOsRuntime::start(fast_config(), dispatcher);
3441 let deck = DeckClient::from_runtime(&runtime, OperatorIdentity::generate()).with_config(
3442 DeckClientConfig {
3443 snapshot_poll_interval: Duration::from_millis(15),
3444 ..DeckClientConfig::default()
3445 },
3446 );
3447
3448 deck.admin().cordon(42).await.unwrap();
3450 deck.admin().uncordon(42).await.unwrap();
3451 deck.admin().invalidate_placement(42).await.unwrap();
3452 tokio::time::sleep(Duration::from_millis(80)).await;
3453
3454 let all = deck.audit().collect();
3455 assert_eq!(all.len(), 3);
3456 let middle_seq = all[1].seq;
3460 let mut stream = deck.audit().since(middle_seq).stream();
3461 let next = tokio::time::timeout(Duration::from_secs(2), stream.next())
3462 .await
3463 .expect("timed out")
3464 .expect("closed")
3465 .expect("ok");
3466 assert!(next.seq > middle_seq);
3467 let parked = tokio::time::timeout(Duration::from_millis(40), stream.next()).await;
3469 assert!(
3470 parked.is_err(),
3471 "stream should park after watermark catches up"
3472 );
3473 let _ = runtime.shutdown().await;
3474 }
3475
3476 #[tokio::test]
3477 async fn log_filter_since_seeds_stream_watermark() {
3478 use crate::adapter::net::behavior::meshos::{LogLine, MeshOsEvent};
3479 use futures::StreamExt;
3480 let dispatcher = Arc::new(LoggingDispatcher::new());
3481 let runtime = MeshOsRuntime::start(fast_config(), dispatcher);
3482 let deck = DeckClient::from_runtime(&runtime, OperatorIdentity::generate()).with_config(
3483 DeckClientConfig {
3484 snapshot_poll_interval: Duration::from_millis(15),
3485 ..DeckClientConfig::default()
3486 },
3487 );
3488
3489 for i in 0..3 {
3491 runtime
3492 .handle()
3493 .publish(MeshOsEvent::LogLine(LogLine::info(
3494 None,
3495 format!("msg {i}"),
3496 )))
3497 .await
3498 .unwrap();
3499 }
3500 tokio::time::sleep(Duration::from_millis(80)).await;
3501
3502 let snap = runtime.snapshot();
3503 assert_eq!(snap.log_ring.len(), 3);
3504 let middle_seq = snap.log_ring[1].seq;
3505
3506 let mut stream = deck.subscribe_logs(LogFilter::new().since(middle_seq));
3509 let next = tokio::time::timeout(Duration::from_secs(2), stream.next())
3510 .await
3511 .expect("timed out")
3512 .expect("closed")
3513 .expect("ok");
3514 assert!(next.seq > middle_seq);
3515 assert_eq!(next.message, "msg 2");
3516 let _ = runtime.shutdown().await;
3517 }
3518
3519 #[tokio::test]
3520 async fn subscribe_logs_yields_published_log_lines_in_seq_order() {
3521 use crate::adapter::net::behavior::meshos::{LogLevel, LogLine, MeshOsEvent};
3522 use futures::StreamExt;
3523 let dispatcher = Arc::new(LoggingDispatcher::new());
3524 let runtime = MeshOsRuntime::start(fast_config(), dispatcher);
3525 let deck = DeckClient::from_runtime(&runtime, OperatorIdentity::generate()).with_config(
3526 DeckClientConfig {
3527 snapshot_poll_interval: Duration::from_millis(15),
3528 ..DeckClientConfig::default()
3529 },
3530 );
3531
3532 let mut stream = deck.subscribe_logs(LogFilter::new());
3533 for (i, level) in [LogLevel::Info, LogLevel::Warn, LogLevel::Error]
3534 .into_iter()
3535 .enumerate()
3536 {
3537 runtime
3538 .handle()
3539 .publish(MeshOsEvent::LogLine(LogLine {
3540 level,
3541 daemon_id: Some(7),
3542 message: format!("msg {}", i),
3543 }))
3544 .await
3545 .unwrap();
3546 }
3547
3548 let r1 = tokio::time::timeout(Duration::from_secs(2), stream.next())
3549 .await
3550 .expect("r1 timed out")
3551 .expect("r1 closed")
3552 .expect("r1 ok");
3553 let r2 = tokio::time::timeout(Duration::from_secs(2), stream.next())
3554 .await
3555 .expect("r2 timed out")
3556 .expect("r2 closed")
3557 .expect("r2 ok");
3558 let r3 = tokio::time::timeout(Duration::from_secs(2), stream.next())
3559 .await
3560 .expect("r3 timed out")
3561 .expect("r3 closed")
3562 .expect("r3 ok");
3563 assert!(r1.seq < r2.seq);
3564 assert!(r2.seq < r3.seq);
3565 assert_eq!(r1.level, LogLevel::Info);
3566 assert_eq!(r3.level, LogLevel::Error);
3567 assert_eq!(r1.node_id, Some(42));
3570 let _ = runtime.shutdown().await;
3571 }
3572
3573 #[tokio::test]
3574 async fn subscribe_logs_min_level_filter_drops_below_threshold() {
3575 use crate::adapter::net::behavior::meshos::{LogLevel, LogLine, MeshOsEvent};
3576 use futures::StreamExt;
3577 let dispatcher = Arc::new(LoggingDispatcher::new());
3578 let runtime = MeshOsRuntime::start(fast_config(), dispatcher);
3579 let deck = DeckClient::from_runtime(&runtime, OperatorIdentity::generate()).with_config(
3580 DeckClientConfig {
3581 snapshot_poll_interval: Duration::from_millis(15),
3582 ..DeckClientConfig::default()
3583 },
3584 );
3585
3586 let mut stream = deck.subscribe_logs(LogFilter::new().min_level(LogLevel::Warn));
3587 runtime
3588 .handle()
3589 .publish(MeshOsEvent::LogLine(LogLine::info(None, "info dropped")))
3590 .await
3591 .unwrap();
3592 runtime
3593 .handle()
3594 .publish(MeshOsEvent::LogLine(LogLine::warn(None, "warn kept")))
3595 .await
3596 .unwrap();
3597
3598 let next = tokio::time::timeout(Duration::from_secs(2), stream.next())
3599 .await
3600 .expect("next timed out")
3601 .expect("next closed")
3602 .expect("next ok");
3603 assert_eq!(next.level, LogLevel::Warn);
3604 assert_eq!(next.message, "warn kept");
3605 let _ = runtime.shutdown().await;
3606 }
3607
3608 #[tokio::test]
3609 async fn subscribe_logs_with_daemon_filter_keeps_only_matching_daemon() {
3610 use crate::adapter::net::behavior::meshos::{LogLine, MeshOsEvent};
3611 use futures::StreamExt;
3612 let dispatcher = Arc::new(LoggingDispatcher::new());
3613 let runtime = MeshOsRuntime::start(fast_config(), dispatcher);
3614 let deck = DeckClient::from_runtime(&runtime, OperatorIdentity::generate()).with_config(
3615 DeckClientConfig {
3616 snapshot_poll_interval: Duration::from_millis(15),
3617 ..DeckClientConfig::default()
3618 },
3619 );
3620
3621 let mut stream = deck.subscribe_logs(LogFilter::new().with_daemon(7));
3622 runtime
3623 .handle()
3624 .publish(MeshOsEvent::LogLine(LogLine::info(
3625 Some(99),
3626 "other daemon",
3627 )))
3628 .await
3629 .unwrap();
3630 runtime
3631 .handle()
3632 .publish(MeshOsEvent::LogLine(LogLine::info(
3633 Some(7),
3634 "target daemon",
3635 )))
3636 .await
3637 .unwrap();
3638
3639 let next = tokio::time::timeout(Duration::from_secs(2), stream.next())
3640 .await
3641 .expect("next timed out")
3642 .expect("next closed")
3643 .expect("next ok");
3644 assert_eq!(next.daemon_id, Some(7));
3645 assert_eq!(next.message, "target daemon");
3646 let _ = runtime.shutdown().await;
3647 }
3648
3649 #[tokio::test]
3650 async fn audit_stream_emits_one_record_per_signed_commit_in_order() {
3651 use futures::StreamExt;
3652 let dispatcher = Arc::new(LoggingDispatcher::new());
3653 let runtime = MeshOsRuntime::start(fast_config(), dispatcher);
3654 let deck = DeckClient::from_runtime(&runtime, OperatorIdentity::generate()).with_config(
3655 DeckClientConfig {
3656 snapshot_poll_interval: Duration::from_millis(15),
3657 ..DeckClientConfig::default()
3658 },
3659 );
3660
3661 let mut stream = deck.audit().stream();
3662 let first_attempt = tokio::time::timeout(Duration::from_millis(40), stream.next()).await;
3665 assert!(first_attempt.is_err(), "stream should park when no records");
3666
3667 deck.admin().cordon(42).await.unwrap();
3670 deck.admin().uncordon(42).await.unwrap();
3671 deck.admin().invalidate_placement(42).await.unwrap();
3672
3673 let r1 = tokio::time::timeout(Duration::from_secs(2), stream.next())
3674 .await
3675 .expect("r1 timed out")
3676 .expect("r1 closed")
3677 .expect("r1 ok");
3678 let r2 = tokio::time::timeout(Duration::from_secs(2), stream.next())
3679 .await
3680 .expect("r2 timed out")
3681 .expect("r2 closed")
3682 .expect("r2 ok");
3683 let r3 = tokio::time::timeout(Duration::from_secs(2), stream.next())
3684 .await
3685 .expect("r3 timed out")
3686 .expect("r3 closed")
3687 .expect("r3 ok");
3688
3689 assert!(r1.seq < r2.seq);
3692 assert!(r2.seq < r3.seq);
3693 let _ = runtime.shutdown().await;
3694 }
3695
3696 #[tokio::test]
3697 async fn audit_stream_dedups_already_seen_records_across_polls() {
3698 use futures::StreamExt;
3703 let dispatcher = Arc::new(LoggingDispatcher::new());
3704 let runtime = MeshOsRuntime::start(fast_config(), dispatcher);
3705 let deck = DeckClient::from_runtime(&runtime, OperatorIdentity::generate()).with_config(
3706 DeckClientConfig {
3707 snapshot_poll_interval: Duration::from_millis(10),
3708 ..DeckClientConfig::default()
3709 },
3710 );
3711
3712 deck.admin().cordon(42).await.unwrap();
3713 let mut stream = deck.audit().stream();
3714 let first = tokio::time::timeout(Duration::from_secs(2), stream.next())
3715 .await
3716 .expect("first timed out")
3717 .expect("first closed")
3718 .expect("first ok");
3719
3720 let second_attempt = tokio::time::timeout(Duration::from_millis(50), stream.next()).await;
3723 assert!(
3724 second_attempt.is_err(),
3725 "stream should not re-emit seen record"
3726 );
3727
3728 deck.admin().uncordon(42).await.unwrap();
3730 let second = tokio::time::timeout(Duration::from_secs(2), stream.next())
3731 .await
3732 .expect("second timed out")
3733 .expect("second closed")
3734 .expect("second ok");
3735 assert!(second.seq > first.seq);
3736 let _ = runtime.shutdown().await;
3737 }
3738
3739 #[tokio::test]
3740 async fn audit_stream_applies_force_only_filter_in_tail_mode() {
3741 use futures::StreamExt;
3744 let dispatcher = Arc::new(LoggingDispatcher::new());
3745 let runtime = MeshOsRuntime::start(fast_config(), dispatcher);
3746 let deck = DeckClient::from_runtime(&runtime, OperatorIdentity::generate()).with_config(
3747 DeckClientConfig {
3748 snapshot_poll_interval: Duration::from_millis(10),
3749 ..DeckClientConfig::default()
3750 },
3751 );
3752
3753 let mut stream = deck.audit().force_only().stream();
3754 deck.admin().cordon(42).await.unwrap();
3755 let thaw = deck
3756 .ice()
3757 .thaw_cluster()
3758 .simulate()
3759 .await
3760 .expect("simulate");
3761 let sig =
3762 deck.identity()
3763 .sign_proposal(thaw.action(), thaw.issued_at_ms(), &thaw.blast_hash());
3764 thaw.commit(&[sig]).await.unwrap();
3765
3766 let next = tokio::time::timeout(Duration::from_secs(2), stream.next())
3767 .await
3768 .expect("next timed out")
3769 .expect("next closed")
3770 .expect("next ok");
3771 assert!(next.event.is_ice());
3773 let _ = runtime.shutdown().await;
3774 }
3775
3776 #[tokio::test]
3777 async fn audit_query_returns_empty_when_no_ice_commits_observed() {
3778 let dispatcher = Arc::new(LoggingDispatcher::new());
3779 let runtime = MeshOsRuntime::start(fast_config(), dispatcher);
3780 let deck = DeckClient::from_runtime(&runtime, OperatorIdentity::generate());
3781 let results = deck.audit().recent(10).collect();
3782 assert!(results.is_empty());
3783 let _ = runtime.shutdown().await;
3784 }
3785
3786 #[tokio::test]
3787 async fn audit_query_returns_recent_entries_newest_first() {
3788 use crate::adapter::net::behavior::meshos::{IceActionProposal, MeshOsEvent};
3796 let dispatcher = Arc::new(LoggingDispatcher::new());
3797 let runtime = MeshOsRuntime::start(fast_config(), dispatcher);
3798 let deck = DeckClient::from_runtime(&runtime, OperatorIdentity::generate());
3799
3800 for ttl_secs in [10, 20, 30] {
3801 runtime
3802 .handle()
3803 .publish(MeshOsEvent::SignedIceCommit {
3804 proposal: IceActionProposal::FreezeCluster {
3805 ttl: Duration::from_secs(ttl_secs),
3806 },
3807 signatures: Vec::new(),
3808 issued_at_ms: super::super::meshos::now_ms_since_unix_epoch(),
3809 blast_hash: TEST_BLAST_HASH,
3810 })
3811 .await
3812 .unwrap();
3813 }
3814 tokio::time::sleep(Duration::from_millis(80)).await;
3815 let all = deck.audit().collect();
3816 assert_eq!(all.len(), 3, "ring should hold all three entries");
3817 assert!(matches!(
3820 all[0].event,
3821 AdminEvent::FreezeCluster { ttl } if ttl == Duration::from_secs(30)
3822 ));
3823 assert!(matches!(
3824 all[2].event,
3825 AdminEvent::FreezeCluster { ttl } if ttl == Duration::from_secs(10)
3826 ));
3827
3828 let recent_one = deck.audit().recent(1).collect();
3829 assert_eq!(recent_one.len(), 1);
3830 assert!(matches!(
3831 recent_one[0].event,
3832 AdminEvent::FreezeCluster { ttl } if ttl == Duration::from_secs(30)
3833 ));
3834 let _ = runtime.shutdown().await;
3835 }
3836
3837 #[tokio::test]
3838 async fn audit_query_filters_by_operator_id() {
3839 use crate::adapter::net::behavior::meshos::{IceActionProposal, MeshOsEvent};
3840 let dispatcher = Arc::new(LoggingDispatcher::new());
3841 let runtime = MeshOsRuntime::start(fast_config(), dispatcher);
3842 let op_a = OperatorIdentity::generate();
3843 let op_b = OperatorIdentity::generate();
3844 let deck = DeckClient::from_runtime(&runtime, op_a.clone());
3845
3846 let proposal_a = IceActionProposal::FreezeCluster {
3848 ttl: Duration::from_secs(10),
3849 };
3850 let ts_a = super::super::meshos::now_ms_since_unix_epoch();
3851 let sig_a = OperatorSignature::sign(op_a.keypair(), &proposal_a, ts_a, &TEST_BLAST_HASH);
3852 runtime
3853 .handle()
3854 .publish(MeshOsEvent::SignedIceCommit {
3855 proposal: proposal_a,
3856 signatures: vec![sig_a],
3857 issued_at_ms: ts_a,
3858 blast_hash: TEST_BLAST_HASH,
3859 })
3860 .await
3861 .unwrap();
3862 let proposal_b = IceActionProposal::ThawCluster;
3864 let ts_b = super::super::meshos::now_ms_since_unix_epoch();
3865 let sig_b = OperatorSignature::sign(op_b.keypair(), &proposal_b, ts_b, &TEST_BLAST_HASH);
3866 runtime
3867 .handle()
3868 .publish(MeshOsEvent::SignedIceCommit {
3869 proposal: proposal_b,
3870 signatures: vec![sig_b],
3871 issued_at_ms: ts_b,
3872 blast_hash: TEST_BLAST_HASH,
3873 })
3874 .await
3875 .unwrap();
3876 tokio::time::sleep(Duration::from_millis(80)).await;
3877
3878 let filtered = deck.audit().by_operator(op_a.operator_id()).collect();
3879 assert_eq!(filtered.len(), 1);
3880 assert!(matches!(
3881 filtered[0].event,
3882 AdminEvent::FreezeCluster { .. }
3883 ));
3884 let _ = runtime.shutdown().await;
3885 }
3886
3887 #[tokio::test]
3888 async fn audit_query_force_only_drops_ordinary_admin_keeps_ice() {
3889 let dispatcher = Arc::new(LoggingDispatcher::new());
3892 let runtime = MeshOsRuntime::start(fast_config(), dispatcher);
3893 let deck = DeckClient::from_runtime(&runtime, OperatorIdentity::generate());
3894
3895 deck.admin().cordon(42).await.expect("cordon");
3896 let thaw = deck
3897 .ice()
3898 .thaw_cluster()
3899 .simulate()
3900 .await
3901 .expect("simulate");
3902 let sig =
3903 deck.identity()
3904 .sign_proposal(thaw.action(), thaw.issued_at_ms(), &thaw.blast_hash());
3905 thaw.commit(&[sig]).await.expect("thaw");
3906 tokio::time::sleep(Duration::from_millis(80)).await;
3907
3908 let baseline = deck.audit().collect();
3909 assert_eq!(
3910 baseline.len(),
3911 2,
3912 "ring should hold both ordinary and ICE commits"
3913 );
3914 let force_only = deck.audit().force_only().collect();
3915 assert_eq!(force_only.len(), 1, "force_only should drop Cordon");
3916 assert!(force_only[0].event.is_ice());
3917 let _ = runtime.shutdown().await;
3918 }
3919
3920 #[tokio::test]
3921 async fn admin_commit_routes_through_signed_path_when_registry_installed() {
3922 use std::sync::Arc as SArc;
3927 let dispatcher = Arc::new(LoggingDispatcher::new());
3928 let identity = OperatorIdentity::generate();
3929 let mut registry = OperatorRegistry::new();
3930 registry.register(identity.keypair());
3931 let verifier = SArc::new(crate::adapter::net::behavior::meshos::AdminVerifier::new(
3932 SArc::new(registry.clone()),
3933 1,
3934 ));
3935 let runtime = MeshOsRuntime::start_with_all(
3936 fast_config(),
3937 dispatcher,
3938 Default::default(),
3939 Default::default(),
3940 SArc::new(crate::adapter::net::compute::DaemonRegistry::new()),
3941 None,
3942 Some(verifier),
3943 );
3944 let deck =
3945 DeckClient::from_runtime(&runtime, identity.clone()).with_operator_registry(registry);
3946
3947 let commit = deck.admin().cordon(42).await.expect("commit");
3948 assert_eq!(commit.event_kind(), "cordon");
3949
3950 tokio::time::sleep(Duration::from_millis(80)).await;
3953 let entries = deck.audit().collect();
3954 assert_eq!(entries.len(), 1);
3955 assert!(matches!(
3956 entries[0].outcome,
3957 crate::adapter::net::behavior::meshos::VerificationOutcome::Accepted
3958 ));
3959 assert_eq!(entries[0].operator_ids, vec![identity.operator_id()]);
3960 let _ = runtime.shutdown().await;
3961 }
3962
3963 #[tokio::test]
3964 async fn admin_commit_falls_back_to_unsigned_when_no_registry_installed() {
3965 let dispatcher = Arc::new(LoggingDispatcher::new());
3969 let runtime = MeshOsRuntime::start(fast_config(), dispatcher);
3970 let deck = DeckClient::from_runtime(&runtime, OperatorIdentity::generate());
3971
3972 deck.admin().cordon(42).await.expect("commit");
3973 tokio::time::sleep(Duration::from_millis(80)).await;
3974
3975 let entries = deck.audit().collect();
3976 assert_eq!(entries.len(), 1);
3977 assert!(matches!(
3978 entries[0].outcome,
3979 crate::adapter::net::behavior::meshos::VerificationOutcome::Unverified
3980 ));
3981 assert!(entries[0].operator_ids.is_empty());
3982 let _ = runtime.shutdown().await;
3983 }
3984
3985 #[tokio::test]
3986 async fn audit_ring_records_unsigned_admin_with_unverified_outcome() {
3987 let dispatcher = Arc::new(LoggingDispatcher::new());
3991 let runtime = MeshOsRuntime::start(fast_config(), dispatcher);
3992 let deck = DeckClient::from_runtime(&runtime, OperatorIdentity::generate());
3993
3994 deck.admin().cordon(42).await.expect("cordon");
3995 deck.admin()
3996 .drop_replicas(42, vec![1, 2])
3997 .await
3998 .expect("drop_replicas");
3999 tokio::time::sleep(Duration::from_millis(80)).await;
4000
4001 let entries = deck.audit().collect();
4002 assert_eq!(entries.len(), 2);
4003 for entry in &entries {
4004 assert!(matches!(
4005 entry.outcome,
4006 crate::adapter::net::behavior::meshos::VerificationOutcome::Unverified
4007 ));
4008 assert!(entry.operator_ids.is_empty());
4009 }
4010 let _ = runtime.shutdown().await;
4011 }
4012
4013 #[tokio::test]
4014 async fn audit_query_between_filters_outside_window() {
4015 use crate::adapter::net::behavior::meshos::{IceActionProposal, MeshOsEvent};
4016 let dispatcher = Arc::new(LoggingDispatcher::new());
4017 let runtime = MeshOsRuntime::start(fast_config(), dispatcher);
4018 let deck = DeckClient::from_runtime(&runtime, OperatorIdentity::generate());
4019
4020 runtime
4021 .handle()
4022 .publish(MeshOsEvent::SignedIceCommit {
4023 proposal: IceActionProposal::ThawCluster,
4024 signatures: Vec::new(),
4025 issued_at_ms: super::super::meshos::now_ms_since_unix_epoch(),
4026 blast_hash: TEST_BLAST_HASH,
4027 })
4028 .await
4029 .unwrap();
4030 tokio::time::sleep(Duration::from_millis(80)).await;
4031
4032 let past_only = deck.audit().between(0, 1).collect();
4035 assert!(past_only.is_empty());
4036
4037 let now_ms = std::time::SystemTime::now()
4039 .duration_since(std::time::UNIX_EPOCH)
4040 .unwrap()
4041 .as_millis() as u64;
4042 let around_now = deck
4043 .audit()
4044 .between(now_ms - 10_000, now_ms + 10_000)
4045 .collect();
4046 assert_eq!(around_now.len(), 1);
4047 let _ = runtime.shutdown().await;
4048 }
4049
4050 #[tokio::test]
4051 async fn ice_force_restart_daemon_proposal_round_trips() {
4052 let dispatcher = Arc::new(LoggingDispatcher::new());
4053 let runtime = MeshOsRuntime::start(fast_config(), dispatcher);
4054 let deck = DeckClient::from_runtime(&runtime, OperatorIdentity::generate());
4055 let daemon = super::super::meshos::DaemonRef {
4056 id: 7,
4057 name: "telemetry".into(),
4058 };
4059 let proposal = deck.ice().force_restart_daemon(daemon.clone());
4060 let simulated = proposal.simulate().await.expect("simulate");
4061 assert_eq!(simulated.blast_radius().affected_daemons, vec![daemon]);
4062 let sig = deck.identity().sign_proposal(
4063 simulated.action(),
4064 simulated.issued_at_ms(),
4065 &simulated.blast_hash(),
4066 );
4067 let commit = simulated.commit(&[sig]).await.expect("commit");
4068 assert_eq!(commit.event_kind(), "force_restart_daemon");
4069 let _ = runtime.shutdown().await;
4070 }
4071
4072 #[tokio::test]
4073 async fn ice_kill_migration_proposal_round_trips_and_audits() {
4074 let dispatcher = Arc::new(LoggingDispatcher::new());
4075 let runtime = MeshOsRuntime::start(fast_config(), dispatcher);
4076 let deck = DeckClient::from_runtime(&runtime, OperatorIdentity::generate());
4077 let proposal = deck.ice().kill_migration(123);
4078 let simulated = proposal.simulate().await.expect("simulate");
4079 let sig = deck.identity().sign_proposal(
4080 simulated.action(),
4081 simulated.issued_at_ms(),
4082 &simulated.blast_hash(),
4083 );
4084 let commit = simulated.commit(&[sig]).await.expect("commit");
4085 assert_eq!(commit.event_kind(), "kill_migration");
4086
4087 tokio::time::sleep(Duration::from_millis(60)).await;
4090 let entries = deck.audit().force_only().collect();
4091 assert!(entries
4092 .iter()
4093 .any(|r| matches!(r.event, AdminEvent::KillMigration { migration: 123 })));
4094 let _ = runtime.shutdown().await;
4095 }
4096
4097 #[tokio::test]
4098 async fn ice_force_cutover_proposal_round_trips() {
4099 let dispatcher = Arc::new(LoggingDispatcher::new());
4100 let runtime = MeshOsRuntime::start(fast_config(), dispatcher);
4101 let deck = DeckClient::from_runtime(&runtime, OperatorIdentity::generate());
4102 let proposal = deck.ice().force_cutover(100, 42);
4103 let simulated = proposal.simulate().await.expect("simulate");
4104 assert_eq!(simulated.blast_radius().affected_replicas, vec![100]);
4105 assert_eq!(simulated.blast_radius().affected_nodes, vec![42]);
4106 let sig = deck.identity().sign_proposal(
4107 simulated.action(),
4108 simulated.issued_at_ms(),
4109 &simulated.blast_hash(),
4110 );
4111 let commit = simulated.commit(&[sig]).await.expect("commit");
4112 assert_eq!(commit.event_kind(), "force_cutover");
4113 let _ = runtime.shutdown().await;
4114 }
4115
4116 #[tokio::test]
4117 async fn ice_force_evict_replica_proposal_round_trips() {
4118 let dispatcher = Arc::new(LoggingDispatcher::new());
4119 let runtime = MeshOsRuntime::start(fast_config(), dispatcher);
4120 let deck = DeckClient::from_runtime(&runtime, OperatorIdentity::generate());
4121 let proposal = deck.ice().force_evict_replica(100, 7);
4122 let simulated = proposal.simulate().await.expect("simulate");
4123 assert_eq!(simulated.blast_radius().affected_replicas, vec![100]);
4124 assert_eq!(simulated.blast_radius().affected_nodes, vec![7]);
4125 let sig = deck.identity().sign_proposal(
4126 simulated.action(),
4127 simulated.issued_at_ms(),
4128 &simulated.blast_hash(),
4129 );
4130 let commit = simulated.commit(&[sig]).await.expect("commit");
4131 assert_eq!(commit.event_kind(), "force_evict_replica");
4132 let _ = runtime.shutdown().await;
4133 }
4134
4135 #[tokio::test]
4136 async fn ice_commit_with_registry_accepts_a_valid_multi_op_bundle() {
4137 let dispatcher = Arc::new(LoggingDispatcher::new());
4138 let runtime = MeshOsRuntime::start(fast_config(), dispatcher);
4139 let op_a = OperatorIdentity::generate();
4140 let op_b = OperatorIdentity::generate();
4141 let mut registry = OperatorRegistry::new();
4142 registry.register(op_a.keypair());
4143 registry.register(op_b.keypair());
4144 let deck = DeckClient::new(
4145 runtime.handle_clone(),
4146 runtime.snapshot_reader().clone(),
4147 op_a.clone(),
4148 DeckClientConfig {
4149 snapshot_poll_interval: Duration::from_millis(100),
4150 ice_signature_threshold: 2,
4151 },
4152 )
4153 .with_operator_registry(registry);
4154
4155 let proposal = deck.ice().freeze_cluster(Duration::from_secs(15));
4156 let simulated = proposal.simulate().await.expect("simulate");
4157 let sig_a = op_a.sign_proposal(
4158 simulated.action(),
4159 simulated.issued_at_ms(),
4160 &simulated.blast_hash(),
4161 );
4162 let sig_b = op_b.sign_proposal(
4163 simulated.action(),
4164 simulated.issued_at_ms(),
4165 &simulated.blast_hash(),
4166 );
4167 let commit = simulated
4168 .commit(&[sig_a, sig_b])
4169 .await
4170 .expect("valid multi-op bundle should commit");
4171 assert_eq!(commit.event_kind(), "freeze_cluster");
4172 let _ = runtime.shutdown().await;
4173 }
4174}