Skip to main content

net/adapter/net/behavior/
deck.rs

1//! Deck SDK — Phase 1 (Rust) per
2//! [`DECK_SDK_PLAN.md`](../../../../../../docs/plans/DECK_SDK_PLAN.md).
3//!
4//! Operator-side surface, the dual of `behavior::meshos::sdk`
5//! (the daemon-author surface). Daemons author against
6//! `MeshOsDaemonSdk`; operators command against `DeckClient`.
7//!
8//! # Phase 1 scope
9//!
10//! - [`DeckClient`] — composes a [`MeshOsRuntime`] with an
11//!   [`OperatorIdentity`].
12//! - [`OperatorIdentity`] — operator-key newtype around
13//!   [`super::super::EntityKeypair`]. **Phase 1 is non-signing:**
14//!   the substrate's channel-auth surface doesn't yet expose
15//!   operator-key signing, so admin commits ride the local loop
16//!   un-signed and the SDK records the operator id for audit
17//!   correlation. The signing seam wires in when the substrate
18//!   slice that adds it lands (per the plan's "substrate gaps").
19//! - [`AdminCommands`] — typed methods for every
20//!   [`super::meshos::AdminEvent`] variant. Each publishes the
21//!   admin event onto the loop's event stream and returns a
22//!   [`ChainCommit`] correlation handle.
23//! - [`SnapshotStream`] — `Stream` over
24//!   [`super::meshos::MeshOsSnapshotReader`] polled at a
25//!   configurable cadence (defaults to the loop's tick interval).
26//!
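//! # Phase 1 deferrals
//!
//! NOTE: the list below records the original Phase 1 plan. Entry
//! points for some of these surfaces — [`DeckClient::audit`],
//! [`DeckClient::subscribe_logs`] and [`DeckClient::ice`] — are now
//! defined in this module; consult each method's docs for its
//! current scope.
//!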
29//! - **Operator-signed commits.** The substrate's verifier doesn't
30//!   yet check operator signatures; the SDK records the operator
31//!   id on each commit but does not sign the event payload. Slated
32//!   for the substrate slice that adds operator-key channel-auth.
33//! - **Audit queries (`audit()`).** Need a signed admin chain to
34//!   query against; deferred to a slice that lands after the
35//!   substrate's admin-chain commit + signing path.
36//! - **Log stream (`subscribe_logs()`).** Needs per-daemon /
37//!   per-node log-chain binding through RedEX `tail()`; deferred.
38//! - **ICE (`ice()`).** Phase 2 substrate work (`ForceDrain`,
39//!   `ForceEvictReplica`, …, blast-radius simulator); Phase 3 SDK
40//!   surface. Locked decision #4 of the plan: blast-radius
41//!   simulation is mandatory before commit — substrate-side
42//!   contract not yet written.
43//!
44//! # Error model
45//!
46//! [`DeckError`] uses the `<<deck-sdk-kind:KIND>>MSG` discriminator
47//! format every cross-language SDK parses. Kinds shipped in Phase 1:
48//! `unknown_node`, `chain_commit_failed`, `loop_closed`,
49//! `queue_full`, `stream_closed`.
50//!
51//! # Example
52//!
53//! ```ignore
54//! use net::adapter::net::behavior::deck::{DeckClient, OperatorIdentity};
55//! use net::adapter::net::behavior::meshos::{MeshOsConfig, MeshOsRuntime};
56//!
57//! let runtime = MeshOsRuntime::start(MeshOsConfig::default(), dispatcher);
58//! let identity = OperatorIdentity::generate();
59//! let deck = DeckClient::from_runtime(&runtime, identity);
60//!
61//! let commit = deck
62//!     .admin()
63//!     .enter_maintenance(node_id, None)
64//!     .await?;
65//! tracing::info!(commit_id = commit.commit_id(), "drain proposed");
66//!
67//! use futures::StreamExt;
68//! let mut snaps = deck.snapshots();
69//! if let Some(Ok(snap)) = snaps.next().await {
70//!     // …render the latest state…
71//! }
72//! ```
73
74use std::pin::Pin;
75use std::sync::atomic::{AtomicU64, Ordering};
76use std::sync::Arc;
77use std::task::{Context, Poll};
78use std::time::{Duration, SystemTime};
79
80use futures::Stream;
81use tokio::time::{interval, Interval};
82
83use super::meshos::{
84    ice_proposal_signing_payload, simulate_ice_proposal, AdminEvent, BlastRadius, ChainId,
85    IceActionProposal, MeshOsEvent, MeshOsHandle, MeshOsHandleError, MeshOsRuntime, MeshOsSnapshot,
86    MeshOsSnapshotReader, NodeId,
87};
88use crate::adapter::net::behavior::aggregator::{AggregatorDaemon, SummaryAnnouncement};
89use crate::adapter::net::identity::EntityKeypair;
90use crate::adapter::net::subnet::SubnetId;
91use crate::adapter::net::MeshNode;
92use crate::adapter::net::{ChannelHash, Visibility};
93
94/// Operator identity. Phase 1 holds the operator key as an
95/// [`EntityKeypair`] (the same ed25519 type daemons use) plus a
96/// derived 64-bit operator id. The signing seam will widen here
97/// when the substrate slice that adds operator-key channel-auth
98/// lands; until then commits ride the local loop and the SDK
99/// records the operator id for audit correlation.
100#[derive(Clone, Debug)]
101pub struct OperatorIdentity {
102    keypair: Arc<EntityKeypair>,
103    operator_id: u64,
104}
105
106impl OperatorIdentity {
107    /// Wrap an existing keypair as an operator identity. The
108    /// operator id derives from the keypair's `origin_hash`.
109    pub fn from_keypair(keypair: EntityKeypair) -> Self {
110        let operator_id = keypair.origin_hash();
111        Self {
112            keypair: Arc::new(keypair),
113            operator_id,
114        }
115    }
116
117    /// Generate a fresh keypair + identity. Convenience for tests
118    /// and the tooling that bootstraps a one-shot operator.
119    pub fn generate() -> Self {
120        Self::from_keypair(EntityKeypair::generate())
121    }
122
123    /// 64-bit operator id derived from the underlying keypair's
124    /// `origin_hash`. Stable across the operator's lifetime.
125    pub fn operator_id(&self) -> u64 {
126        self.operator_id
127    }
128
129    /// Borrow the underlying keypair. **Use sparingly.** The
130    /// SDK's own signing helpers
131    /// ([`Self::sign_proposal`], [`Self::sign_admin_event`])
132    /// cover the canonical signing flows; reach for this only
133    /// when implementing a cross-language signing seam (e.g. a
134    /// FFI binding that needs to call its own ed25519 lib over
135    /// the same `(domain || issued_at || blast_hash || postcard)`
136    /// payload shape). Calls outside that envelope risk
137    /// drift between the SDK's signing bytes and what the
138    /// substrate verifier rebuilds.
139    pub fn keypair(&self) -> &EntityKeypair {
140        &self.keypair
141    }
142}
143
144/// SDK error surface. Carries the operator-readable message + a
145/// stable kind discriminator usable from cross-language consumers
146/// via the `<<deck-sdk-kind:KIND>>MSG` envelope.
147#[derive(Clone, Debug, thiserror::Error)]
148#[error("<<deck-sdk-kind:{kind}>>{message}")]
149pub struct DeckError {
150    /// Stable kind discriminator. Lowercase + underscore-only;
151    /// cross-language SDKs parse the surrounding
152    /// `<<deck-sdk-kind:…>>` envelope to extract this verbatim.
153    pub kind: &'static str,
154    /// Operator-readable message.
155    pub message: String,
156}
157
158impl DeckError {
159    fn new(kind: &'static str, message: impl Into<String>) -> Self {
160        Self {
161            kind,
162            message: message.into(),
163        }
164    }
165}
166
167impl From<MeshOsHandleError> for DeckError {
168    fn from(err: MeshOsHandleError) -> Self {
169        match err {
170            MeshOsHandleError::LoopClosed => Self::new("loop_closed", "MeshOS loop has exited"),
171            MeshOsHandleError::QueueFull => Self::new(
172                "queue_full",
173                "MeshOS source channel at capacity — back off + retry",
174            ),
175        }
176    }
177}
178
/// Type alias for the admin-command error surface. It is the
/// same type as [`DeckError`] (not a distinct newtype), so it
/// shares the `<<deck-sdk-kind:KIND>>MSG` envelope; admin commits
/// surface the `loop_closed` / `queue_full` kinds produced by the
/// `From<MeshOsHandleError>` conversion.
pub type AdminError = DeckError;
183
/// Type alias for the ICE-surface error type. It is the same
/// type as [`DeckError`] (not a distinct newtype), so it shares
/// the `<<deck-sdk-kind:KIND>>MSG` envelope; ICE commits add the
/// `simulation_required` / `insufficient_signatures` kinds.
pub type IceError = DeckError;
188
/// Correlation handle handed back by every admin commit. In
/// Phase 1 it means "the admin event was accepted by the loop's
/// event queue"; the substrate slice that adds a signed admin
/// chain will widen it to carry the chain sequence + commit hash.
///
/// The issuing operator id is always recorded so that audit
/// downstream (when wired) can correlate a commit with the
/// operator that issued it.
#[derive(Debug, Clone)]
pub struct ChainCommit {
    commit_id: u64,
    operator_id: u64,
    event_kind: &'static str,
    committed_at: SystemTime,
}

impl ChainCommit {
    /// Process-local correlation id, monotonically increasing
    /// across every commit a single [`DeckClient`] produces.
    pub fn commit_id(&self) -> u64 {
        let Self { commit_id, .. } = self;
        *commit_id
    }

    /// Id of the operator that issued the commit.
    pub fn operator_id(&self) -> u64 {
        let Self { operator_id, .. } = self;
        *operator_id
    }

    /// Discriminator naming the admin event the commit carried
    /// (e.g. `"enter_maintenance"`, `"drop_replicas"`).
    pub fn event_kind(&self) -> &'static str {
        let Self { event_kind, .. } = self;
        *event_kind
    }

    /// Wall-clock time at which the SDK accepted the commit.
    /// Distinct from any per-chain commit sequence the substrate
    /// will eventually expose.
    pub fn committed_at(&self) -> SystemTime {
        let Self { committed_at, .. } = self;
        *committed_at
    }
}
230
/// Knobs that tune a [`DeckClient`].
#[derive(Debug, Clone)]
pub struct DeckClientConfig {
    /// How often [`SnapshotStream`] polls the runtime's snapshot
    /// reader. Defaults to 100ms — same order of magnitude as the
    /// default loop tick so the stream surfaces each
    /// post-reconcile snapshot once.
    pub snapshot_poll_interval: Duration,
    /// Minimum number of operator signatures required to commit
    /// an ICE proposal (see [`SimulatedIceProposal::commit`]).
    /// The plan locks this in at 2-of-N by default,
    /// substrate-verified; this slice ships single-signature
    /// (`1`) as the SDK-side default because substrate-side
    /// multi-operator verification hasn't shipped yet. Operators
    /// who want client-enforced multi-op gating ahead of the
    /// substrate slice can bump this knob.
    pub ice_signature_threshold: usize,
}

impl Default for DeckClientConfig {
    /// 100ms snapshot polling and a single-signature ICE threshold.
    fn default() -> Self {
        let snapshot_poll_interval = Duration::from_millis(100);
        let ice_signature_threshold = 1;
        DeckClientConfig {
            snapshot_poll_interval,
            ice_signature_threshold,
        }
    }
}
258
259/// Compact at-a-glance rollup of the runtime's latest snapshot.
260/// Built by [`DeckClient::status_summary`]; designed for the
261/// operator UI's "is everything OK?" header — one pass over
262/// the snapshot to count each cohort, plus the two cluster-
263/// wide flags ([`Self::freeze_remaining_ms`] and
264/// [`Self::local_maintenance_active`]) operators care most
265/// about at first glance.
266#[derive(Clone, Debug, Default, Eq, PartialEq)]
267pub struct StatusSummary {
268    /// Per-health-class peer counts.
269    pub peers: PeerCounts,
270    /// Per-lifecycle-and-restart-state daemon counts.
271    pub daemons: DaemonCounts,
272    /// Number of replica chains the snapshot tracks.
273    pub replica_chains: usize,
274    /// Number of avoid-list entries on this node.
275    pub avoid_list_entries: usize,
276    /// Depth of the ring of recently-emitted actions (count
277    /// of entries in `MeshOsSnapshot::recently_emitted`).
278    /// This is "what reconcile recently asked for," NOT "what
279    /// is currently in flight" — the executor doesn't signal
280    /// completion back to the loop, so the ring caps at
281    /// `action_queue_capacity` and never drains on its own.
282    pub recently_emitted_count: usize,
283    /// Executor failure ring depth.
284    pub recent_failure_count: usize,
285    /// Admin audit ring depth (signed ICE bundles + unsigned
286    /// admin events).
287    pub admin_audit_ring_depth: usize,
288    /// Milliseconds remaining on the cluster-wide ICE freeze.
289    /// `None` if no freeze is in effect.
290    pub freeze_remaining_ms: Option<u64>,
291    /// `true` iff this node's local maintenance state is
292    /// anything other than `Active`. Operators read this for
293    /// "is this node in maintenance right now?".
294    pub local_maintenance_active: bool,
295}
296
/// Per-health-class peer tallies carried inside a [`StatusSummary`].
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
pub struct PeerCounts {
    /// Peers answering within the heartbeat window.
    pub healthy: usize,
    /// Peers answering, but slowly.
    pub degraded: usize,
    /// Peers that cannot be reached.
    pub unreachable: usize,
    /// Peers for which no health sample exists yet.
    pub unknown: usize,
}
309
310/// Aggregate `SubnetGateway` counters surfaced by
311/// [`DeckClient::gateway_stats`]. Plain value type so operator
312/// tooling can render / serialize / diff snapshots without
313/// reaching into the substrate's atomic counters.
314#[derive(Clone, Debug, Eq, PartialEq)]
315pub struct GatewayStats {
316    /// The mesh node's local subnet — same as
317    /// [`DeckClient::local_subnet`], echoed here so a
318    /// `gateway stats` rendering doesn't need a second
319    /// accessor call.
320    pub local_subnet: SubnetId,
321    /// Total cross-subnet visibility decisions that resolved to
322    /// "forward" (publish-fanout admitted; subscribe-gate
323    /// admitted). Monotonic-increasing for the lifetime of the
324    /// gateway.
325    pub forwarded: u64,
326    /// Total decisions that resolved to "drop." Monotonic.
327    pub dropped: u64,
328    /// Snapshot of every peer subnet the gateway is bridging to,
329    /// sorted by raw bits. Sourced from `SubnetGateway::peer_subnets`.
330    pub peer_subnets: Vec<SubnetId>,
331    /// Number of explicit `(channel, target-subnets)` rules in
332    /// the export table — what `gateway exports` enumerates.
333    pub export_rules: u64,
334}
335
336/// One row in [`DeckClient::subnets_with_members`]'s rollup.
337/// Carries the subnet, the sorted member-`node_id` set, and a
338/// flag marking the local subnet so renderers don't need a
339/// second pass.
340#[derive(Clone, Debug, Eq, PartialEq)]
341pub struct SubnetRollup {
342    /// Subnet this row represents.
343    pub subnet: SubnetId,
344    /// Sorted set of `node_id`s known to belong to this subnet.
345    /// Empty when the local subnet has no peers (the local node
346    /// is only included if the caller supplied its node id).
347    pub members: Vec<u64>,
348    /// `true` when [`Self::subnet`] matches the local mesh
349    /// node's subnet.
350    pub is_local: bool,
351}
352
353/// One-shot snapshot returned by [`DeckClient::aggregator_snapshot`].
354/// Bundles every field a renderer needs in a single struct so
355/// callers don't pay for five per-field lock acquisitions per
356/// frame. `summaries` is an `Arc` so the snapshot itself is
357/// cheap to clone.
358#[derive(Clone, Debug)]
359pub struct AggregatorSnapshot {
360    /// Subnet the aggregator is summarizing.
361    pub source_subnet: SubnetId,
362    /// `FoldKind::KIND_ID`s the aggregator is configured for.
363    pub fold_kinds: Vec<u16>,
364    /// Aggregator's monotonic tick counter.
365    pub generation: u64,
366    /// Aggregator's tick cadence.
367    pub summary_interval: std::time::Duration,
368    /// Buffered summaries — `Arc::clone`-cheap.
369    pub summaries: Arc<Vec<SummaryAnnouncement>>,
370}
371
/// One replica's row inside an [`AggregatorRegistryGroupSnapshot`].
/// Rows appear one per replica, in declaration order.
#[derive(Debug, Clone)]
pub struct AggregatorReplicaRow {
    /// The replica's monotonic tick counter.
    pub generation: u64,
    /// `true` when the replica's last tick fell within
    /// `3 × summary_interval`.
    pub healthy: bool,
    /// Operator-facing diagnostic for the `healthy == false` case.
    pub diagnostic: Option<String>,
    /// Placement decision recorded at spawn time (only present
    /// when the group was spawned via
    /// `LifecycleGroup::spawn_with_placement`).
    pub placement_node_id: Option<u64>,
}
388
389/// Snapshot of one registered aggregator group. Built by
390/// [`DeckClient::aggregator_registry_snapshot`] and consumed by
391/// `net aggregator ls` + the future Deck panel.
392#[derive(Clone, Debug)]
393pub struct AggregatorRegistryGroupSnapshot {
394    /// Operator-chosen group name.
395    pub name: String,
396    /// 32-byte group seed for deterministic identity.
397    pub group_seed: [u8; 32],
398    /// Per-replica rows in declaration order.
399    pub replicas: Vec<AggregatorReplicaRow>,
400}
401
402/// Snapshot of every aggregator group registered on the node.
403#[derive(Clone, Debug, Default)]
404pub struct AggregatorRegistrySnapshot {
405    /// Groups in lexicographic order by name (matches the
406    /// registry's `entries()` ordering).
407    pub groups: Vec<AggregatorRegistryGroupSnapshot>,
408}
409
/// Daemon tallies carried inside a [`StatusSummary`]. The
/// lifecycle counts partition the registered set into disjoint
/// groups; `crash_looping` / `backing_off` are orthogonal
/// restart-state markers that overlap with lifecycle (a
/// `Stopped` daemon can also be `BackingOff`).
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
pub struct DaemonCounts {
    /// Running right now.
    pub running: usize,
    /// Start requested; confirmation pending.
    pub starting: usize,
    /// Stop requested; confirmation pending.
    pub stopping: usize,
    /// Not running right now.
    pub stopped: usize,
    /// Inside the supervisor's BackingOff window. Orthogonal to
    /// lifecycle — a daemon in this state is typically also
    /// `Stopped`.
    pub backing_off: usize,
    /// Inside the supervisor's CrashLooping window — has crossed
    /// the crash-loop threshold and is parked for a longer
    /// cooldown.
    pub crash_looping: usize,
}
433
434/// Build a [`StatusSummary`] from a snapshot. One pass over
435/// the per-peer and per-daemon maps; the rest of the rollup
436/// is direct field reads.
437fn build_status_summary(snap: &MeshOsSnapshot) -> StatusSummary {
438    let mut peers = PeerCounts::default();
439    for (_, peer) in snap.peers.iter() {
440        match peer.health {
441            Some(super::meshos::PeerHealthSnapshot::Healthy) => peers.healthy += 1,
442            Some(super::meshos::PeerHealthSnapshot::Degraded) => peers.degraded += 1,
443            Some(super::meshos::PeerHealthSnapshot::Unreachable) => peers.unreachable += 1,
444            None => peers.unknown += 1,
445        }
446    }
447    let mut daemons = DaemonCounts::default();
448    for (_, d) in snap.daemons.iter() {
449        match d.lifecycle {
450            super::meshos::DaemonLifecycleSnapshot::Running => daemons.running += 1,
451            super::meshos::DaemonLifecycleSnapshot::Starting => daemons.starting += 1,
452            super::meshos::DaemonLifecycleSnapshot::Stopping => daemons.stopping += 1,
453            super::meshos::DaemonLifecycleSnapshot::Stopped => daemons.stopped += 1,
454        }
455        match d.restart_state {
456            super::meshos::RestartStateSnapshot::Idle => {}
457            super::meshos::RestartStateSnapshot::BackingOff { .. } => daemons.backing_off += 1,
458            super::meshos::RestartStateSnapshot::CrashLooping { .. } => daemons.crash_looping += 1,
459        }
460    }
461    let maintenance_active = !matches!(
462        snap.local_maintenance,
463        super::meshos::MaintenanceStateSnapshot::Active
464    );
465    StatusSummary {
466        peers,
467        daemons,
468        replica_chains: snap.replicas.len(),
469        avoid_list_entries: snap.avoid_list.len(),
470        recently_emitted_count: snap.recently_emitted.len(),
471        recent_failure_count: snap.recent_failures.len(),
472        admin_audit_ring_depth: snap.admin_audit.len(),
473        freeze_remaining_ms: snap.freeze_remaining_ms,
474        local_maintenance_active: maintenance_active,
475    }
476}
477
/// Operator-facing client. Composes a [`MeshOsHandle`] +
/// [`MeshOsSnapshotReader`] + [`OperatorIdentity`] into the
/// surface Deck-the-binary (and other operator tools) bind
/// against.
///
/// Constructed via [`Self::from_runtime`] (when the caller holds
/// the live runtime) or [`Self::new`] (when the caller already
/// has handle + reader and wants to compose explicitly). The
/// optional collaborators (`operator_registry`, `mesh`,
/// `aggregator`) all start as `None` and are installed through
/// the builder-style `with_*` methods.
#[derive(Clone)]
pub struct DeckClient {
    /// Handle onto the MeshOS runtime, obtained from
    /// `MeshOsRuntime::handle_clone` in [`Self::from_runtime`].
    handle: MeshOsHandle,
    /// Snapshot source behind every snapshot-derived accessor
    /// ([`Self::status`], [`Self::status_summary`],
    /// [`Self::peers`]); a clone is handed to each stream the
    /// client opens.
    snapshot_reader: MeshOsSnapshotReader,
    /// Identity of the operator driving this client; exposed via
    /// [`Self::identity`].
    identity: OperatorIdentity,
    /// Tunables. `snapshot_poll_interval` is passed to every
    /// stream constructor the client calls.
    config: DeckClientConfig,
    /// Wrapped in `Arc` so clones share the same counter —
    /// `commit_id`s stay monotonic across every fan-out use of
    /// a single `DeckClient`. Without this, a clone would
    /// produce its own counter and two siblings could emit
    /// colliding `commit_id`s for distinct commits.
    commit_seq: Arc<AtomicU64>,
    /// Optional operator-key registry. When present, every ICE
    /// commit verifies each [`OperatorSignature`] against the
    /// registered public key before publishing. When absent,
    /// signatures pass through unchecked — useful for local /
    /// in-process tests but unsafe for any deployment that
    /// hasn't yet wired up the substrate verifier.
    operator_registry: Option<Arc<OperatorRegistry>>,
    /// Optional reference to the running `MeshNode`. Wired in via
    /// [`Self::with_mesh`] when the operator surface needs to
    /// read subnet / gateway / channel state that doesn't live
    /// in the [`MeshOsSnapshot`]. `None` when the deck is running
    /// against a degenerate / mesh-less runtime (test harnesses,
    /// pre-attach CLI invocations); the subnet/gateway accessors
    /// gracefully report "no mesh installed" in that case.
    mesh: Option<Arc<MeshNode>>,
    /// Optional reference to a running [`AggregatorDaemon`].
    /// Wired in via [`Self::with_aggregator`] when an aggregator
    /// is running in-process alongside the deck. Powers the
    /// `AGGREGATORS` Deck panel + the
    /// `DeckClient::aggregator_*` accessors. `None` for decks
    /// that don't host an aggregator (most operator binaries —
    /// they're queriers, not hosts).
    aggregator: Option<Arc<AggregatorDaemon>>,
}
522
523impl DeckClient {
524    /// Explicit constructor. Use when the caller already holds
525    /// a [`MeshOsHandle`] + [`MeshOsSnapshotReader`] (e.g. when
526    /// composing with other subsystems that share the same
527    /// runtime).
528    pub fn new(
529        handle: MeshOsHandle,
530        snapshot_reader: MeshOsSnapshotReader,
531        identity: OperatorIdentity,
532        config: DeckClientConfig,
533    ) -> Self {
534        Self {
535            handle,
536            snapshot_reader,
537            identity,
538            config,
539            commit_seq: Arc::new(AtomicU64::new(0)),
540            operator_registry: None,
541            mesh: None,
542            aggregator: None,
543        }
544    }
545
546    /// Install a live `MeshNode` reference so the subnet and
547    /// gateway accessors ([`Self::local_subnet`],
548    /// [`Self::known_subnets`], [`Self::gateway_stats`],
549    /// [`Self::gateway_exports`], [`Self::channel_visibility`])
550    /// can read substrate state that doesn't flow through the
551    /// [`MeshOsSnapshot`] stream. Without this the accessors
552    /// gracefully report "no mesh installed."
553    pub fn with_mesh(mut self, mesh: Arc<MeshNode>) -> Self {
554        self.mesh = Some(mesh);
555        self
556    }
557
558    /// Install a live [`AggregatorDaemon`] reference. Powers the
559    /// `aggregator_*` accessors + the Deck `AGGREGATORS` panel.
560    pub fn with_aggregator(mut self, aggregator: Arc<AggregatorDaemon>) -> Self {
561        self.aggregator = Some(aggregator);
562        self
563    }
564
565    /// Convenience constructor that pulls the handle + snapshot
566    /// reader off a live runtime. Borrows the runtime; the
567    /// returned client outlives the borrow because both
568    /// `MeshOsHandle` and `MeshOsSnapshotReader` are clone-shared.
569    pub fn from_runtime(runtime: &MeshOsRuntime, identity: OperatorIdentity) -> Self {
570        Self::new(
571            runtime.handle_clone(),
572            runtime.snapshot_reader().clone(),
573            identity,
574            DeckClientConfig::default(),
575        )
576    }
577
578    /// Override the default [`DeckClientConfig`] on an existing
579    /// client. Builder-style.
580    pub fn with_config(mut self, config: DeckClientConfig) -> Self {
581        self.config = config;
582        self
583    }
584
585    /// Install an [`OperatorRegistry`] that gates every ICE
586    /// commit on operator-signature verification. Without a
587    /// registry installed, commits accept any signature bundle
588    /// that meets the threshold — useful for local tests but
589    /// not for deployment.
590    pub fn with_operator_registry(mut self, registry: OperatorRegistry) -> Self {
591        self.operator_registry = Some(Arc::new(registry));
592        self
593    }
594
595    /// Borrow the installed operator registry, if any.
596    pub fn operator_registry(&self) -> Option<&OperatorRegistry> {
597        self.operator_registry.as_deref()
598    }
599
600    /// Borrow the operator identity.
601    pub fn identity(&self) -> &OperatorIdentity {
602        &self.identity
603    }
604
605    /// Build an [`AdminCommands`] surface bound to this client.
606    /// Each method publishes the corresponding admin event and
607    /// returns a [`ChainCommit`].
608    pub fn admin(&self) -> AdminCommands<'_> {
609        AdminCommands { client: self }
610    }
611
612    /// Build an [`IceCommands`] surface — the break-glass
613    /// operator path. Each method returns an [`IceProposal`]
614    /// that must be `simulate()`d before `commit()` per the
615    /// plan's locked decision #4.
616    pub fn ice(&self) -> IceCommands<'_> {
617        IceCommands { client: self }
618    }
619
620    /// Build an [`AuditQuery`] reading the substrate's ICE
621    /// audit ring (every `SignedIceCommit` the loop's verifier
622    /// observed — accepted or rejected). Fluent builder: chain
623    /// `by_operator`, `between`, `force_only`, `recent` before
624    /// `collect()`. See [`AuditQuery`] for the per-method
625    /// semantics + the current Phase 1 scope (ICE only — the
626    /// non-force admin chain query path lands when the
627    /// substrate's signed admin chain ships).
628    pub fn audit(&self) -> AuditQuery<'_> {
629        AuditQuery::new(self)
630    }
631
632    /// Subscribe to the substrate's executor-failure ring.
633    /// Returns a [`FailureStream`] that tails each
634    /// [`super::meshos::FailureRecord`] as the executor records
635    /// it. Same seq-watermark dedup pattern as
636    /// [`AuditStream`] / [`LogStream`].
637    ///
638    /// `since_seq` seeds the initial watermark; passing `0`
639    /// (the default) tails new failures from "now" onwards.
640    pub fn subscribe_failures(&self, since_seq: u64) -> FailureStream {
641        FailureStream::new(
642            self.snapshot_reader.clone(),
643            self.config.snapshot_poll_interval,
644            since_seq,
645        )
646    }
647
648    /// Subscribe to the substrate's per-node log ring with
649    /// the given filter. Returns a [`LogStream`] that tails
650    /// matching log lines as they arrive — same dedup pattern
651    /// as the audit-tail stream (monotonic per-runtime `seq`
652    /// watermark).
653    ///
654    /// `filter` defaults to "everything"; chain
655    /// [`LogFilter::min_level`] / [`LogFilter::with_daemon`]
656    /// / [`LogFilter::with_node`] to narrow the result.
657    pub fn subscribe_logs(&self, filter: LogFilter) -> LogStream {
658        LogStream::new(
659            self.snapshot_reader.clone(),
660            self.config.snapshot_poll_interval,
661            filter,
662        )
663    }
664
665    /// Open a [`SnapshotStream`] over the runtime's snapshot
666    /// reader. The stream polls at
667    /// [`DeckClientConfig::snapshot_poll_interval`] and emits a
668    /// `Result<MeshOsSnapshot, DeckError>` on every poll.
669    /// Closing the stream is a `drop`.
670    pub fn snapshots(&self) -> SnapshotStream {
671        SnapshotStream::new(
672            self.snapshot_reader.clone(),
673            self.config.snapshot_poll_interval,
674        )
675    }
676
677    /// One-shot read of the runtime's latest snapshot.
678    /// Synchronous — one atomic load on the snapshot pointer
679    /// plus a clone of the underlying data. Use for one-off
680    /// reads ("what's the freeze state right now?"); prefer
681    /// [`Self::snapshots`] when iterating over many ticks.
682    pub fn status(&self) -> MeshOsSnapshot {
683        self.snapshot_reader.read()
684    }
685
686    /// Live-updating [`StatusSummary`] stream. Polls the
687    /// snapshot reader at
688    /// [`DeckClientConfig::snapshot_poll_interval`] and emits
689    /// a new summary whenever the rollup actually changes
690    /// (PartialEq dedup) — operator dashboards bind here for
691    /// "render only when something is different." The first
692    /// summary always emits.
693    pub fn status_summary_stream(&self) -> StatusSummaryStream {
694        StatusSummaryStream::new(
695            self.snapshot_reader.clone(),
696            self.config.snapshot_poll_interval,
697        )
698    }
699
700    /// Roll the snapshot up into a compact at-a-glance status
701    /// summary — daemon-health counts, peer-health counts,
702    /// freeze / maintenance flags, queue depths. Useful for
703    /// the operator UI's "is everything OK?" header. One
704    /// snapshot load + a single iterator pass; no full clone.
705    pub fn status_summary(&self) -> StatusSummary {
706        build_status_summary(&self.snapshot_reader.load())
707    }
708
709    /// Borrow the latest peer summary. One snapshot load + a
710    /// clone of just the peers map.
711    pub fn peers(&self) -> std::collections::BTreeMap<NodeId, super::meshos::PeerSnapshot> {
712        self.snapshot_reader.load().peers.clone()
713    }
714
715    /// This deck's local mesh node's `SubnetId`, or `None` when
716    /// no `MeshNode` has been wired in via [`Self::with_mesh`].
717    /// Powers `net subnet show`.
718    pub fn local_subnet(&self) -> Option<SubnetId> {
719        self.mesh.as_ref().map(|m| m.local_subnet())
720    }
721
722    /// Snapshot of every `(node_id, subnet_id)` pair the local
723    /// mesh has cached from signature-verified capability
724    /// announcements, sorted by `node_id`. Empty when no
725    /// `MeshNode` is wired in. Powers `net subnet ls` and
726    /// `net subnet tree`.
727    pub fn known_subnets(&self) -> Vec<(u64, SubnetId)> {
728        self.mesh
729            .as_ref()
730            .map(|m| m.known_subnets())
731            .unwrap_or_default()
732    }
733
734    /// Group `known_subnets` into one row per subnet with sorted
735    /// member ids. Pass `Some(node_id)` to include the local node
736    /// under its own subnet's members (the CLI's `subnet ls`
737    /// surface does this); pass `None` to omit the local node
738    /// from members but still flag its row via
739    /// [`SubnetRollup::is_local`] (the deck SUBNETS tab does this).
740    ///
741    /// The local subnet always appears as a row, even when no
742    /// peers are known under it.
743    pub fn subnets_with_members(&self, local_node_id: Option<u64>) -> Vec<SubnetRollup> {
744        let local = self.local_subnet();
745        let mut buckets: std::collections::BTreeMap<u32, std::collections::BTreeSet<u64>> =
746            std::collections::BTreeMap::new();
747        for (node_id, subnet) in self.known_subnets() {
748            buckets.entry(subnet.raw()).or_default().insert(node_id);
749        }
750        if let Some(local_subnet) = local {
751            let entry = buckets.entry(local_subnet.raw()).or_default();
752            if let Some(id) = local_node_id {
753                entry.insert(id);
754            }
755        }
756        buckets
757            .into_iter()
758            .map(|(raw, members)| {
759                let subnet = SubnetId::from_raw(raw);
760                SubnetRollup {
761                    subnet,
762                    members: members.into_iter().collect(),
763                    is_local: local == Some(subnet),
764                }
765            })
766            .collect()
767    }
768
769    /// Aggregate gateway counters for `net gateway stats`.
770    /// Returns `None` when the mesh has no installed
771    /// `ChannelConfigRegistry` — in that case the gateway isn't
772    /// built and there's nothing to report.
773    pub fn gateway_stats(&self) -> Option<GatewayStats> {
774        let gw = self.mesh.as_ref().and_then(|m| m.gateway())?;
775        Some(GatewayStats {
776            local_subnet: gw.local_subnet(),
777            forwarded: gw.forwarded_count(),
778            dropped: gw.dropped_count(),
779            peer_subnets: gw.peer_subnets(),
780            export_rules: gw.exports().len() as u64,
781        })
782    }
783
784    /// Snapshot of the gateway's export table as
785    /// `(channel_hash, target_subnets)` pairs, sorted by
786    /// `channel_hash`. Empty when no gateway is installed.
787    pub fn gateway_exports(&self) -> Vec<(u16, Vec<SubnetId>)> {
788        self.mesh
789            .as_ref()
790            .and_then(|m| m.gateway())
791            .map(|gw| gw.exports())
792            .unwrap_or_default()
793    }
794
795    /// Resolve a channel name to its [`Visibility`] config, or
796    /// `None` when no `ChannelConfigRegistry` has been installed
797    /// or the name isn't registered. Falls back through the
798    /// registry's prefix table via `get_by_name`. Powers
799    /// `net channel visibility <name>`.
800    pub fn channel_visibility(&self, channel_name: &str) -> Option<Visibility> {
801        let mesh = self.mesh.as_ref()?;
802        let registry = mesh.channel_configs()?;
803        let cfg = registry.get_by_name(channel_name)?;
804        Some(cfg.visibility)
805    }
806
807    /// Snapshot every registered channel as `(name, visibility)`
808    /// pairs for `net channel ls`, sorted by name. Empty when no
809    /// `ChannelConfigRegistry` has been installed.
810    pub fn channels(&self) -> Vec<(String, Visibility)> {
811        let Some(mesh) = self.mesh.as_ref() else {
812            return Vec::new();
813        };
814        let Some(registry) = mesh.channel_configs() else {
815            return Vec::new();
816        };
817        registry
818            .snapshot()
819            .into_iter()
820            .map(|(name, cfg)| (name, cfg.visibility))
821            .collect()
822    }
823
824    /// Lookup a channel's wire-`u16` hash for use with
825    /// `gateway_exports`. `None` when no registry is installed
826    /// or the channel isn't registered. Convenience for
827    /// `net gateway export <channel> ...` to translate the
828    /// human-readable channel name into the wire key the
829    /// gateway's export table is keyed on.
830    pub fn channel_wire_hash(&self, channel_name: &str) -> Option<u16> {
831        let mesh = self.mesh.as_ref()?;
832        let registry = mesh.channel_configs()?;
833        let cfg = registry.get_by_name(channel_name)?;
834        Some(cfg.channel_id.wire_hash())
835    }
836
837    /// Lookup a channel's canonical `ChannelHash` (u64). Same
838    /// shape as [`Self::channel_wire_hash`] but returns the full
839    /// 64-bit hash callers use for fold + ACL lookups.
840    pub fn channel_canonical_hash(&self, channel_name: &str) -> Option<ChannelHash> {
841        let mesh = self.mesh.as_ref()?;
842        let registry = mesh.channel_configs()?;
843        let cfg = registry.get_by_name(channel_name)?;
844        Some(cfg.channel_id.hash())
845    }
846
    /// Reports whether this client holds an [`AggregatorDaemon`]
    /// (installed via [`Self::with_aggregator`]).
    ///
    /// The other `aggregator_*` accessors return empty / zero values
    /// both when nothing is installed and when the aggregator has
    /// nothing to report yet; the AGGREGATORS Deck panel calls this to
    /// tell those two cases apart.
    pub fn aggregator_installed(&self) -> bool {
        self.aggregator.is_some()
    }
854
855    /// Snapshot the running aggregator's latest summaries.
856    /// Empty vec when no aggregator is installed.
857    pub fn aggregator_summaries(&self) -> Vec<SummaryAnnouncement> {
858        self.aggregator
859            .as_ref()
860            .map(|a| a.latest_summaries())
861            .unwrap_or_default()
862    }
863
864    /// Cheap shared-snapshot variant of [`Self::aggregator_summaries`]
865    /// — clones only the outer `Arc`. Hot-path callers (TUI render
866    /// loops) should prefer this.
867    pub fn aggregator_summaries_arc(&self) -> Arc<Vec<SummaryAnnouncement>> {
868        self.aggregator
869            .as_ref()
870            .map(|a| a.latest_summaries_arc())
871            .unwrap_or_else(|| Arc::new(Vec::new()))
872    }
873
874    /// One-call accessor that returns every aggregator field a
875    /// renderer needs in one struct. Replaces the per-field hops
876    /// (`aggregator_source_subnet` + `aggregator_fold_kinds` +
877    /// `aggregator_generation` + `aggregator_summary_interval` +
878    /// `aggregator_summaries`) — five lock acquisitions and two
879    /// Vec clones per frame collapse to one struct construction
880    /// and one Arc clone.
881    ///
882    /// Returns `None` when no aggregator is installed.
883    pub fn aggregator_snapshot(&self) -> Option<AggregatorSnapshot> {
884        let agg = self.aggregator.as_ref()?;
885        let config = agg.config();
886        Some(AggregatorSnapshot {
887            source_subnet: config.source_subnet,
888            fold_kinds: config.fold_kinds.clone(),
889            generation: agg.generation(),
890            summary_interval: config.summary_interval,
891            summaries: agg.latest_summaries_arc(),
892        })
893    }
894
895    /// Snapshot every aggregator group registered on the
896    /// installed `MeshNode`'s
897    /// [`AggregatorRegistry`](super::aggregator::registry::AggregatorRegistry).
898    /// Returns `None` when no mesh is wired in or no registry has
899    /// been installed via `MeshNode::set_aggregator_registry`.
900    ///
901    /// Used by `net aggregator ls` + the future
902    /// Deck AGGREGATORS-list panel. Per-replica health is
903    /// surfaced inline so CLI / TUI render one shot of data
904    /// without follow-up calls.
905    pub async fn aggregator_registry_snapshot(&self) -> Option<AggregatorRegistrySnapshot> {
906        let mesh = self.mesh.as_ref()?;
907        let registry = mesh.aggregator_registry()?;
908        let entries = registry.entries();
909        let mut groups = Vec::with_capacity(entries.len());
910        for entry in entries {
911            // One lock + outside-the-guard health-join per group,
912            // via the entry's snapshot helper. Previously this
913            // path took three sequential lock acquisitions
914            // (`replicas` / `placements` / `health`) per group +
915            // a slow `health()` blocked concurrent
916            // `register`/`unregister` writers. See
917            // `AggregatorGroupEntry::snapshot` for the rationale.
918            let snap = entry.snapshot().await;
919            let rows = snap
920                .replicas
921                .iter()
922                .enumerate()
923                .map(|(idx, replica)| {
924                    let health = snap.healths.get(idx).cloned().unwrap_or(
925                        crate::adapter::net::behavior::lifecycle::ReplicaHealth {
926                            healthy: true,
927                            diagnostic: None,
928                        },
929                    );
930                    let placement_node_id = snap.placements.get(idx).map(|p| p.node_id);
931                    AggregatorReplicaRow {
932                        generation: replica.generation(),
933                        healthy: health.healthy,
934                        diagnostic: health.diagnostic,
935                        placement_node_id,
936                    }
937                })
938                .collect();
939            groups.push(AggregatorRegistryGroupSnapshot {
940                name: entry.name.clone(),
941                group_seed: entry.group_seed,
942                replicas: rows,
943            });
944        }
945        Some(AggregatorRegistrySnapshot { groups })
946    }
947
948    /// Aggregator's monotonic tick counter, or `0` when none
949    /// installed.
950    pub fn aggregator_generation(&self) -> u64 {
951        self.aggregator
952            .as_ref()
953            .map(|a| a.generation())
954            .unwrap_or(0)
955    }
956
957    /// Aggregator's source subnet — what the daemon is summarizing.
958    /// `None` when no aggregator is installed.
959    pub fn aggregator_source_subnet(&self) -> Option<SubnetId> {
960        self.aggregator.as_ref().map(|a| a.config().source_subnet)
961    }
962
963    /// List of fold-kind ids the aggregator is configured to
964    /// summarize. Empty when no aggregator is installed.
965    pub fn aggregator_fold_kinds(&self) -> Vec<u16> {
966        self.aggregator
967            .as_ref()
968            .map(|a| a.config().fold_kinds.clone())
969            .unwrap_or_default()
970    }
971
972    /// Aggregator's summary cadence, or zero when none installed.
973    pub fn aggregator_summary_interval(&self) -> std::time::Duration {
974        self.aggregator
975            .as_ref()
976            .map(|a| a.config().summary_interval)
977            .unwrap_or_default()
978    }
979
980    /// Borrow the latest daemon summary keyed by daemon id.
981    pub fn daemons(&self) -> std::collections::BTreeMap<u64, super::meshos::DaemonSnapshot> {
982        self.snapshot_reader.load().daemons.clone()
983    }
984
985    /// Borrow the latest replica summary keyed by chain id.
986    pub fn replicas(&self) -> std::collections::BTreeMap<ChainId, super::meshos::ReplicaSnapshot> {
987        self.snapshot_reader.load().replicas.clone()
988    }
989
990    /// Read this node's local maintenance state.
991    pub fn local_maintenance(&self) -> super::meshos::MaintenanceStateSnapshot {
992        self.snapshot_reader.load().local_maintenance.clone()
993    }
994
995    /// Read the cluster-wide ICE freeze remaining time. `None`
996    /// when no freeze is in effect.
997    pub fn freeze_remaining_ms(&self) -> Option<u64> {
998        self.snapshot_reader.load().freeze_remaining_ms
999    }
1000
1001    /// Borrow the latest executor-side failure ring. Bounded
1002    /// by [`super::meshos::RECENT_FAILURES_CAPACITY`]; ordered
1003    /// oldest-first (FIFO). Operators read this to see what
1004    /// the action dispatcher rejected recently. One snapshot
1005    /// load + a clone of just the failure ring.
1006    pub fn recent_failures(&self) -> Vec<super::meshos::FailureRecord> {
1007        self.snapshot_reader
1008            .load()
1009            .recent_failures
1010            .iter()
1011            .cloned()
1012            .collect()
1013    }
1014
1015    /// Runtime-epoch identifier this MeshOsLoop stamped at
1016    /// startup. Stable for the lifetime of the loop task and
1017    /// changes on every restart. Consumers dedup'ing with
1018    /// `since(seq)` watermarks pair every saved watermark
1019    /// with this value — when it flips, reset the watermark
1020    /// to 0 rather than silently filtering post-restart
1021    /// records as "smaller than my last seq."
1022    pub fn runtime_epoch_id(&self) -> u64 {
1023        self.snapshot_reader.load().runtime_epoch_id
1024    }
1025
1026    /// Highest `seq` currently visible on the admin-audit
1027    /// ring. Returns `0` when the ring is empty. Lets a
1028    /// caller's `since(seq)` pagination distinguish "ahead of
1029    /// the head" (caller's watermark > head_seq) from "no new
1030    /// records yet" (watermark == head_seq) — the audit
1031    /// stream itself swallows both cases silently.
1032    pub fn audit_head_seq(&self) -> u64 {
1033        self.snapshot_reader
1034            .load()
1035            .admin_audit
1036            .last()
1037            .map(|r| r.seq)
1038            .unwrap_or(0)
1039    }
1040
1041    /// Highest `seq` currently visible on the log ring.
1042    /// Returns `0` when the ring is empty. Same purpose as
1043    /// [`Self::audit_head_seq`] for the log-stream surface.
1044    pub fn log_head_seq(&self) -> u64 {
1045        self.snapshot_reader
1046            .load()
1047            .log_ring
1048            .last()
1049            .map(|r| r.seq)
1050            .unwrap_or(0)
1051    }
1052
1053    /// Highest `seq` currently visible on the failure ring.
1054    /// Returns `0` when the ring is empty.
1055    pub fn failure_head_seq(&self) -> u64 {
1056        self.snapshot_reader
1057            .load()
1058            .recent_failures
1059            .iter()
1060            .next_back()
1061            .map(|r| r.seq)
1062            .unwrap_or(0)
1063    }
1064
1065    /// Like [`Self::recent_failures`] but keeps only entries
1066    /// with `recorded_at_ms > since_ms`. Pagination primitive:
1067    /// poll periodically, persist the max `recorded_at_ms`
1068    /// observed, and re-query with that value to surface only
1069    /// new failures.
1070    ///
1071    /// Note: same-ms collisions can land on the boundary —
1072    /// records sharing the cutoff value's exact ms might be
1073    /// missed if they were added between polls. The proper
1074    /// seq-based stream lands once [`super::meshos::FailureRecord`]
1075    /// gains a monotonic seq field (next substrate slice).
1076    pub fn recent_failures_since(&self, since_ms: u64) -> Vec<super::meshos::FailureRecord> {
1077        self.snapshot_reader
1078            .load()
1079            .recent_failures
1080            .iter()
1081            .filter(|r| r.recorded_at_ms > since_ms)
1082            .cloned()
1083            .collect()
1084    }
1085
    /// Await a snapshot matching `predicate`. Event-driven (E-9/E-10):
    /// blocks on the loop's structural change signal (the snapshot
    /// reader's `subscribe_changes`) and re-tests `predicate` on each
    /// change, so a match is observed as soon as the loop publishes a
    /// structurally-changed snapshot rather than on the next poll tick.
    /// If the current snapshot already matches, resolves immediately
    /// without subscribing.
    ///
    /// [`DeckClientConfig::snapshot_poll_interval`] is retained as a
    /// debounce ceiling, not a poll cadence: it bounds the re-test
    /// latency even if a publish edge is missed (the loop also
    /// republishes every Tick, so the ceiling is belt-and-suspenders).
    /// The ceiling is clamped to at least 1 ms.
    ///
    /// `predicate` is `FnMut` and may be invoked many times: once up
    /// front and once after every wake-up.
    ///
    /// **Note on wedge risk:** if no snapshot ever matches the
    /// predicate, this future never resolves. Use
    /// [`Self::watch_timeout`] for bounded waits.
    pub async fn watch<F>(&self, mut predicate: F) -> MeshOsSnapshot
    where
        F: FnMut(&MeshOsSnapshot) -> bool,
    {
        // Check the current snapshot first — many "wait for
        // state X" calls land on a state that's already true.
        let snap = self.snapshot_reader.read();
        if predicate(&snap) {
            return snap;
        }
        // Clamp to >= 1 ms so a zero-valued config cannot turn the
        // fallback timer below into a spin.
        let ceiling = self
            .config
            .snapshot_poll_interval
            .max(Duration::from_millis(1));
        // Subscribe ONCE and reuse the receiver across iterations: a
        // structural-change generation bumped between the predicate
        // re-test and the next `changed()` await is still observed (the
        // receiver tracks its seen generation), so this is
        // missed-wakeup-safe and the ceiling is a true backstop — even
        // when `snapshot_poll_interval` is set long for idle-quiet.
        //
        // NOTE(review): the subscription is created after the initial
        // read above, so a change published in between is presumably
        // only picked up on the next change or when the ceiling fires —
        // confirm against `subscribe_changes`.
        let mut change_rx = self.snapshot_reader.subscribe_changes();
        loop {
            // `biased` polls the change signal before the timer. The
            // sleep is created fresh each iteration, so the ceiling
            // restarts after every wake-up.
            //
            // NOTE(review): the outcome of `changed()` is discarded. If
            // the receiver can resolve immediately with an error once
            // the publishing side is gone (as tokio's watch receiver
            // does), this loop would re-test without waiting — confirm
            // against the receiver type.
            tokio::select! {
                biased;
                _ = change_rx.changed() => {}
                _ = tokio::time::sleep(ceiling) => {}
            }
            let snap = self.snapshot_reader.read();
            if predicate(&snap) {
                return snap;
            }
        }
    }
1134
1135    /// Like [`Self::watch`] but with a bounded wait. Returns
1136    /// `Ok(snapshot)` on first match, `Err(DeckError)` with
1137    /// kind `watch_timeout` when `timeout` elapses without a
1138    /// match.
1139    pub async fn watch_timeout<F>(
1140        &self,
1141        predicate: F,
1142        timeout: Duration,
1143    ) -> Result<MeshOsSnapshot, DeckError>
1144    where
1145        F: FnMut(&MeshOsSnapshot) -> bool,
1146    {
1147        tokio::time::timeout(timeout, self.watch(predicate))
1148            .await
1149            .map_err(|_| {
1150                DeckError::new(
1151                    "watch_timeout",
1152                    format!(
1153                        "no snapshot matched the predicate within {} ms",
1154                        timeout.as_millis()
1155                    ),
1156                )
1157            })
1158    }
1159
1160    fn next_commit_id(&self) -> u64 {
1161        // Start at 1 so a `0` commit id is recognizable as
1162        // "unset" downstream.
1163        self.commit_seq.fetch_add(1, Ordering::Relaxed) + 1
1164    }
1165
1166    async fn publish_admin(
1167        &self,
1168        event: AdminEvent,
1169        kind: &'static str,
1170    ) -> Result<ChainCommit, AdminError> {
1171        // When an operator registry is installed, sign the
1172        // admin event and route through SignedAdminCommit so
1173        // the substrate verifier sees the operator's signature.
1174        // Otherwise (in-process tests, dev mode) fall back to
1175        // the unsigned admin path. Per the plan's locked
1176        // decision #2 every Deck commit is signed in
1177        // production deployments — the substrate verifier is
1178        // the source of truth.
1179        let wire_event = if self.operator_registry.is_some() {
1180            let issued_at_ms = super::meshos::now_ms_since_unix_epoch();
1181            let signature = self.identity.sign_admin_event(&event, issued_at_ms);
1182            MeshOsEvent::SignedAdminCommit {
1183                event,
1184                signature,
1185                issued_at_ms,
1186            }
1187        } else {
1188            MeshOsEvent::AdminEvent(event)
1189        };
1190        self.handle
1191            .publish(wire_event)
1192            .await
1193            .map_err(AdminError::from)?;
1194        Ok(ChainCommit {
1195            commit_id: self.next_commit_id(),
1196            operator_id: self.identity.operator_id,
1197            event_kind: kind,
1198            committed_at: SystemTime::now(),
1199        })
1200    }
1201
1202    async fn publish_signed_ice(
1203        &self,
1204        proposal: IceActionProposal,
1205        signatures: Vec<OperatorSignature>,
1206        issued_at_ms: u64,
1207        blast_hash: super::meshos::BlastRadiusHash,
1208        kind: &'static str,
1209    ) -> Result<ChainCommit, IceError> {
1210        self.handle
1211            .publish(MeshOsEvent::SignedIceCommit {
1212                proposal,
1213                signatures,
1214                issued_at_ms,
1215                blast_hash,
1216            })
1217            .await
1218            .map_err(IceError::from)?;
1219        Ok(ChainCommit {
1220            commit_id: self.next_commit_id(),
1221            operator_id: self.identity.operator_id,
1222            event_kind: kind,
1223            committed_at: SystemTime::now(),
1224        })
1225    }
1226}
1227
/// Typed admin-command surface. Constructed via
/// [`DeckClient::admin`]; every method maps to one
/// [`super::meshos::AdminEvent`] variant.
///
/// Each method builds its event and hands it to the client's
/// `publish_admin`, which publishes onto the loop's event stream.
/// When the client has an operator registry installed, the event is
/// signed with the operator identity before the publish; otherwise it
/// is published unsigned. The per-method signatures are the same in
/// both modes.
pub struct AdminCommands<'a> {
    /// Client the commands are published through.
    client: &'a DeckClient,
}
1240
1241impl AdminCommands<'_> {
1242    /// Drain `node`'s workload over `drain_for`. Replicas
1243    /// migrate; daemons drain via
1244    /// [`crate::adapter::net::compute::DaemonControl::DrainStart`]
1245    /// once the loop sees the resulting `EnteringMaintenance`
1246    /// state. The duration is the wait-window from the loop's
1247    /// next tick; the substrate computes the absolute deadline
1248    /// at fold time so two replays of the same event stream
1249    /// produce identical `since` / `deadline` instants.
1250    pub async fn drain(
1251        &self,
1252        node: NodeId,
1253        drain_for: Duration,
1254    ) -> Result<ChainCommit, AdminError> {
1255        self.client
1256            .publish_admin(AdminEvent::Drain { node, drain_for }, "drain")
1257            .await
1258    }
1259
1260    /// Begin a maintenance window for `node`. `drain_for` is
1261    /// the drain-window duration; `None` defers to the cluster's
1262    /// configured default. The substrate-side fold computes the
1263    /// absolute deadline as `last_tick + drain_for`.
1264    pub async fn enter_maintenance(
1265        &self,
1266        node: NodeId,
1267        drain_for: Option<Duration>,
1268    ) -> Result<ChainCommit, AdminError> {
1269        self.client
1270            .publish_admin(
1271                AdminEvent::EnterMaintenance { node, drain_for },
1272                "enter_maintenance",
1273            )
1274            .await
1275    }
1276
1277    /// End a maintenance window for `node`.
1278    pub async fn exit_maintenance(&self, node: NodeId) -> Result<ChainCommit, AdminError> {
1279        self.client
1280            .publish_admin(AdminEvent::ExitMaintenance { node }, "exit_maintenance")
1281            .await
1282    }
1283
1284    /// Mark `node` ineligible for new placements (existing
1285    /// workload stays).
1286    pub async fn cordon(&self, node: NodeId) -> Result<ChainCommit, AdminError> {
1287        self.client
1288            .publish_admin(AdminEvent::Cordon { node }, "cordon")
1289            .await
1290    }
1291
1292    /// Remove a prior cordon.
1293    pub async fn uncordon(&self, node: NodeId) -> Result<ChainCommit, AdminError> {
1294        self.client
1295            .publish_admin(AdminEvent::Uncordon { node }, "uncordon")
1296            .await
1297    }
1298
1299    /// Drop the listed replicas from `node`.
1300    pub async fn drop_replicas(
1301        &self,
1302        node: NodeId,
1303        chains: Vec<ChainId>,
1304    ) -> Result<ChainCommit, AdminError> {
1305        self.client
1306            .publish_admin(AdminEvent::DropReplicas { node, chains }, "drop_replicas")
1307            .await
1308    }
1309
1310    /// Force a placement recompute for `node`.
1311    pub async fn invalidate_placement(&self, node: NodeId) -> Result<ChainCommit, AdminError> {
1312        self.client
1313            .publish_admin(
1314                AdminEvent::InvalidatePlacement { node },
1315                "invalidate_placement",
1316            )
1317            .await
1318    }
1319
1320    /// Force-restart every daemon on `node`.
1321    pub async fn restart_all_daemons(&self, node: NodeId) -> Result<ChainCommit, AdminError> {
1322        self.client
1323            .publish_admin(
1324                AdminEvent::RestartAllDaemons { node },
1325                "restart_all_daemons",
1326            )
1327            .await
1328    }
1329
1330    /// Clear `node`'s local avoid list.
1331    pub async fn clear_avoid_list(&self, node: NodeId) -> Result<ChainCommit, AdminError> {
1332        self.client
1333            .publish_admin(AdminEvent::ClearAvoidList { node }, "clear_avoid_list")
1334            .await
1335    }
1336}
1337
1338/// Re-export the substrate-side signing types so the SDK
1339/// surface stays under `behavior::deck::*` for backwards
1340/// compatibility with the previous slice. The implementations
1341/// live alongside the rest of ICE in `behavior::meshos::ice`.
1342pub use super::meshos::{OperatorRegistry, OperatorSignature, VerifyError};
1343
1344impl OperatorIdentity {
1345    /// Sign `proposal` with this operator's ed25519 key, stamped
1346    /// at `issued_at_ms` and bound to the simulator's
1347    /// pre-execution preview hash `blast_hash`. Thin
1348    /// SDK-ergonomic wrapper over the substrate's
1349    /// [`OperatorSignature::sign`] constructor.
1350    ///
1351    /// The substrate verifier rebuilds the same domain-tagged
1352    /// payload — including `blast_hash` — and rejects bundles
1353    /// whose hash is the
1354    /// [`super::meshos::SIMULATION_REQUIRED_SENTINEL`]
1355    /// (substrate enforcement of locked decision #4) or whose
1356    /// freshness window has expired. Coordinators collecting
1357    /// signatures from multiple operators must share the same
1358    /// `(issued_at_ms, blast_hash)` pair across the bundle —
1359    /// fetch both from the [`IceProposal`] handle.
1360    pub fn sign_proposal(
1361        &self,
1362        proposal: &IceActionProposal,
1363        issued_at_ms: u64,
1364        blast_hash: &super::meshos::BlastRadiusHash,
1365    ) -> OperatorSignature {
1366        OperatorSignature::sign(self.keypair(), proposal, issued_at_ms, blast_hash)
1367    }
1368
1369    /// Sign an ordinary `AdminEvent` for the single-signature
1370    /// `SignedAdminCommit` path, stamped at `issued_at_ms`. The
1371    /// signature covers the substrate's
1372    /// [`super::meshos::admin_event_signing_payload(event, issued_at_ms)`]
1373    /// so the loop verifier and the SDK agree on the byte
1374    /// sequence.
1375    pub fn sign_admin_event(&self, event: &AdminEvent, issued_at_ms: u64) -> OperatorSignature {
1376        OperatorSignature::sign_admin(self.keypair(), event, issued_at_ms)
1377    }
1378}
1379
1380/// SDK-side translation of substrate
1381/// [`VerifyError`] to the `<<deck-sdk-kind:KIND>>MSG` envelope
1382/// the SDK's [`IceProposal::commit`] returns.
1383fn verify_error_to_ice(err: VerifyError) -> IceError {
1384    let kind = err.kind();
1385    IceError::new(kind, err.to_string())
1386}
1387
/// Break-glass operator surface. Constructed via
/// [`DeckClient::ice`]; each method wraps one
/// [`IceActionProposal`] variant in an [`IceProposal`]. Nothing is
/// published at that point: the proposal must go through
/// `simulate()` before it can be committed, per the plan's locked
/// decision #4 (blast-radius simulation is mandatory before any ICE
/// commit).
pub struct IceCommands<'a> {
    /// Client handed to every proposal this surface creates.
    client: &'a DeckClient,
}
1396
1397impl<'a> IceCommands<'a> {
1398    /// Propose a cluster-wide freeze. The returned
1399    /// [`IceProposal`] must be simulated then committed.
1400    pub fn freeze_cluster(&self, ttl: Duration) -> IceProposal<'a> {
1401        IceProposal::new(self.client, IceActionProposal::FreezeCluster { ttl })
1402    }
1403
1404    /// Propose flushing avoid-list entries under `scope`.
1405    /// See [`super::meshos::ice::IceActionProposal::FlushAvoidLists`]
1406    /// for the three scope semantics.
1407    pub fn flush_avoid_lists(&self, scope: super::meshos::AvoidScope) -> IceProposal<'a> {
1408        IceProposal::new(self.client, IceActionProposal::FlushAvoidLists { scope })
1409    }
1410
1411    /// Propose force-evicting `victim` from `chain` bypassing
1412    /// the scheduler's rebalance cooldown. Only the chain's
1413    /// elected leader emits the resulting `RequestEviction`
1414    /// action; non-leader nodes fold the admin event silently.
1415    pub fn force_evict_replica(&self, chain: ChainId, victim: NodeId) -> IceProposal<'a> {
1416        IceProposal::new(
1417            self.client,
1418            IceActionProposal::ForceEvictReplica { chain, victim },
1419        )
1420    }
1421
1422    /// Propose force-restarting `daemon` by resetting its
1423    /// supervisor backoff gate so reconcile fires `StartDaemon`
1424    /// on the next tick. No-op if the daemon is already in
1425    /// `Idle` backoff state.
1426    pub fn force_restart_daemon(&self, daemon: super::meshos::DaemonRef) -> IceProposal<'a> {
1427        IceProposal::new(
1428            self.client,
1429            IceActionProposal::ForceRestartDaemon { daemon },
1430        )
1431    }
1432
1433    /// Propose force-placing `chain` on `target`, bypassing
1434    /// the placement scorer. Only the chain's elected leader
1435    /// emits the resulting `RequestPlacement` action with
1436    /// `target: Some(target)`; non-leader nodes fold silently.
1437    /// No-op if `target` is already a holder.
1438    pub fn force_cutover(&self, chain: ChainId, target: NodeId) -> IceProposal<'a> {
1439        IceProposal::new(
1440            self.client,
1441            IceActionProposal::ForceCutover { chain, target },
1442        )
1443    }
1444
1445    /// Propose aborting an in-flight migration. The wire-form
1446    /// substrate plumbing records the commit on the audit
1447    /// ring; the dispatcher hookup that finds the running
1448    /// migration and stops it is future substrate work, so
1449    /// until that lands the proposal commits without
1450    /// actually halting the migration. The audit trail tracks
1451    /// the operator's intent regardless.
1452    pub fn kill_migration(&self, migration: super::meshos::MigrationId) -> IceProposal<'a> {
1453        IceProposal::new(self.client, IceActionProposal::KillMigration { migration })
1454    }
1455
1456    /// Propose cancelling an in-effect cluster freeze.
1457    pub fn thaw_cluster(&self) -> IceProposal<'a> {
1458        IceProposal::new(self.client, IceActionProposal::ThawCluster)
1459    }
1460}
1461
/// An ICE proposal — pre-simulation handle carrying the
/// underlying [`IceActionProposal`] plus the `issued_at_ms`
/// stamp pinned at construction.
///
/// Per the plan's locked decision #4 a [`Self::simulate`] call
/// must precede commit. The type-state split enforces this at
/// compile time: `IceProposal` does **not** expose `commit`;
/// only a successful `simulate()` returns a
/// [`SimulatedIceProposal`] whose `commit` is callable. A
/// caller that wants to commit must thread the proposal through
/// `simulate()` first — the type system rejects the alternative.
///
/// Both `IceProposal` and `SimulatedIceProposal` are `Send + Sync`
/// (no `Cell` / `RefCell` interior mutability), so callers can
/// move them across `tokio::spawn` boundaries freely.
pub struct IceProposal<'a> {
    /// Client whose snapshot reader `simulate()` reads from.
    client: &'a DeckClient,
    /// The proposed break-glass action.
    action: IceActionProposal,
    /// Milliseconds since `UNIX_EPOCH`, captured in `new` and carried
    /// unchanged into the simulated proposal.
    issued_at_ms: u64,
}
1482
1483impl<'a> IceProposal<'a> {
1484    fn new(client: &'a DeckClient, action: IceActionProposal) -> Self {
1485        Self {
1486            client,
1487            action,
1488            issued_at_ms: super::meshos::now_ms_since_unix_epoch(),
1489        }
1490    }
1491
1492    /// Borrow the underlying [`IceActionProposal`].
1493    pub fn action(&self) -> &IceActionProposal {
1494        &self.action
1495    }
1496
1497    /// Milliseconds-since-`UNIX_EPOCH` stamp pinned at
1498    /// construction. Operators participating in the multi-sig
1499    /// flow read this from the [`SimulatedIceProposal`] (after
1500    /// simulating); the value is identical because `simulate()`
1501    /// preserves it.
1502    pub fn issued_at_ms(&self) -> u64 {
1503        self.issued_at_ms
1504    }
1505
1506    /// Pre-execution preview. Runs the substrate's pure
1507    /// simulator against the runtime's latest snapshot and
1508    /// returns a [`SimulatedIceProposal`] holding the result.
1509    /// The returned type is the only place
1510    /// [`SimulatedIceProposal::commit`] can be called — the
1511    /// type-state pattern enforces locked decision #4 at
1512    /// compile time.
1513    pub async fn simulate(self) -> Result<SimulatedIceProposal<'a>, IceError> {
1514        let snap = self.client.snapshot_reader.read();
1515        let blast = simulate_ice_proposal(&snap, &self.action);
1516        Ok(SimulatedIceProposal {
1517            client: self.client,
1518            action: self.action,
1519            issued_at_ms: self.issued_at_ms,
1520            blast,
1521        })
1522    }
1523}
1524
/// An ICE proposal that has run [`IceProposal::simulate`] and
/// is ready for the operator signing + commit workflow.
/// Carries the simulator's [`BlastRadius`] output so the
/// Deck-the-binary UI can render the preview before the
/// operator approves. [`Self::commit`] does not produce any
/// signature itself: it hashes the `BlastRadius`, checks the
/// caller-supplied [`OperatorSignature`]s against the configured
/// threshold (verifying each one when an operator registry is
/// configured), and then publishes.
///
/// Every operator participating in the multi-signature
/// workflow must sign over the same
/// `(action, issued_at_ms, blast_hash)` triple — fetch via
/// [`Self::action`] / [`Self::issued_at_ms`] /
/// [`Self::blast_hash`].
pub struct SimulatedIceProposal<'a> {
    /// Client the commit is published through; also supplies the
    /// signature threshold and the optional operator registry.
    client: &'a DeckClient,
    /// The proposed ICE action, carried over from the [`IceProposal`].
    action: IceActionProposal,
    /// Issue stamp carried over unchanged from the [`IceProposal`].
    issued_at_ms: u64,
    /// Simulator output for `action` against the snapshot read
    /// during `simulate()`.
    blast: BlastRadius,
}
1543
1544impl<'a> SimulatedIceProposal<'a> {
1545    /// Borrow the simulator's pre-execution preview.
1546    pub fn blast_radius(&self) -> &BlastRadius {
1547        &self.blast
1548    }
1549
1550    /// Borrow the underlying [`IceActionProposal`].
1551    pub fn action(&self) -> &IceActionProposal {
1552        &self.action
1553    }
1554
1555    /// Milliseconds-since-`UNIX_EPOCH` stamp pinned at the
1556    /// original [`IceProposal`]'s construction; signatures
1557    /// must cover this exact value.
1558    pub fn issued_at_ms(&self) -> u64 {
1559        self.issued_at_ms
1560    }
1561
1562    /// Blake3 digest of the simulator's [`BlastRadius`].
1563    /// Signatures must cover this hash; the substrate verifier
1564    /// rebuilds the same payload and rejects bundles whose
1565    /// signatures don't bind to it.
1566    pub fn blast_hash(&self) -> super::meshos::BlastRadiusHash {
1567        super::meshos::blast_radius_hash(&self.blast)
1568    }
1569
1570    /// Commit the proposal. Verifies
1571    /// `signatures.len() >= ice_signature_threshold` before
1572    /// publishing; returns
1573    /// `Err(IceError::insufficient_signatures)` otherwise.
1574    /// Substrate-side multi-operator-signature verification
1575    /// rebuilds the same domain-tagged signing envelope —
1576    /// including the blast-radius hash — and rejects any
1577    /// bundle whose signatures don't cover the exact
1578    /// `(action, issued_at_ms, blast_hash)` triple.
1579    pub async fn commit(self, signatures: &[OperatorSignature]) -> Result<ChainCommit, IceError> {
1580        let blast_hash = self.blast_hash();
1581        let threshold = self.client.config.ice_signature_threshold;
1582        if signatures.len() < threshold {
1583            return Err(IceError::new(
1584                "insufficient_signatures",
1585                format!(
1586                    "ICE commit requires {} operator signatures; got {}",
1587                    threshold,
1588                    signatures.len()
1589                ),
1590            ));
1591        }
1592        if let Some(registry) = self.client.operator_registry.as_ref() {
1593            // SDK-side gate: verify locally before publishing so
1594            // a malformed bundle fails fast with the right error
1595            // kind. The substrate-side verifier on the loop runs
1596            // the same check on every `SignedIceCommit` for the
1597            // belt-and-suspenders property: even an SDK that
1598            // skipped this gate gets rejected by the loop.
1599            let payload =
1600                ice_proposal_signing_payload(&self.action, self.issued_at_ms, &blast_hash);
1601            let mut unique_operators: std::collections::BTreeSet<u64> =
1602                std::collections::BTreeSet::new();
1603            for sig in signatures {
1604                registry
1605                    .verify(sig, &payload)
1606                    .map_err(verify_error_to_ice)?;
1607                unique_operators.insert(sig.operator_id);
1608            }
1609            // Mirror the substrate-side distinct-operator check
1610            // so duplicate signatures from a single operator can't
1611            // satisfy M-of-N at the SDK gate either.
1612            if unique_operators.len() < threshold {
1613                return Err(IceError::new(
1614                    "insufficient_signatures",
1615                    format!(
1616                        "ICE commit requires {} distinct operator signatures; got {} distinct",
1617                        threshold,
1618                        unique_operators.len()
1619                    ),
1620                ));
1621            }
1622            // Route via SignedIceCommit so the substrate verifier
1623            // sees the bundle. The inner AdminEvent folds only
1624            // after the loop's own verification passes. The
1625            // event_kind on the returned ChainCommit mirrors the
1626            // unsigned path so consumers see one stable
1627            // discriminator regardless of whether verification
1628            // was wired.
1629            let kind = self.action.kind();
1630            self.client
1631                .publish_signed_ice(
1632                    self.action,
1633                    signatures.to_vec(),
1634                    self.issued_at_ms,
1635                    blast_hash,
1636                    kind,
1637                )
1638                .await
1639        } else {
1640            // No registry: route via the unsigned admin path.
1641            // Useful for in-process tests where the SDK isn't
1642            // gating on identity at all. Freeze / thaw go
1643            // through the same path; the old `AdminCommands`
1644            // surface for those was removed because it
1645            // duplicated the ICE ceremony around the
1646            // simulate-before-commit gate (plan locked
1647            // decision #4).
1648            let kind = self.action.kind();
1649            let event = self.action.to_admin_event();
1650            self.client.publish_admin(event, kind).await
1651        }
1652    }
1653}
1654
/// Audit-query builder reading the substrate's admin audit
/// ring. Constructed via [`DeckClient::audit`]; chain filter
/// methods, then call [`Self::collect`] to materialize the
/// matching entries (or [`Self::stream`] to tail them).
///
/// # Scope
///
/// Reads the in-memory admin audit ring exported on every
/// [`super::meshos::MeshOsSnapshot`]. The ring carries every
/// admin commit the loop observed — signed ICE bundles AND
/// unsigned admin events — bounded at
/// [`super::meshos::DEFAULT_MAX_ADMIN_AUDIT_RECORDS`]. The
/// unbounded historical replay path is the eventual admin
/// audit subchain (substrate gap); this in-memory ring is the
/// near-history surface Deck-the-binary renders against.
///
/// Per-method semantics:
///
/// - [`Self::recent`] — keep the last N entries (newest-first
///   in the result). When unspecified, returns every entry on
///   the ring. Ignored by [`Self::stream`].
/// - [`Self::by_operator`] — keep entries whose
///   [`super::meshos::AdminAuditRecord::operator_ids`] include
///   the given id.
/// - [`Self::between`] — keep entries whose
///   `committed_at_ms` falls inside `[start_ms, end_ms]`
///   (inclusive).
/// - [`Self::force_only`] — restrict the result to ICE-class
///   admin events (`AdminEvent::is_ice` returns `true`).
///   Drops ordinary admin commits (`drain`, `cordon`, …) from
///   the result.
/// - [`Self::since`] — keep entries whose `seq` is strictly
///   greater than the given value; for [`Self::stream`] it
///   seeds the stream's watermark.
///
/// All configured filters must match for an entry to be kept.
pub struct AuditQuery<'a> {
    /// Client whose snapshot reader (and poll interval, for
    /// `stream`) the query uses.
    client: &'a DeckClient,
    /// Cap set by `recent`; `None` means no cap.
    limit: Option<usize>,
    /// Operator id set by `by_operator`.
    operator_filter: Option<u64>,
    /// Inclusive `(start_ms, end_ms)` window set by `between`.
    time_range: Option<(u64, u64)>,
    /// Set by `force_only`; when true only ICE-class events pass.
    force_only: bool,
    /// Exclusive lower bound on `seq` set by `since`.
    since_seq: Option<u64>,
}
1694
1695impl<'a> AuditQuery<'a> {
1696    fn new(client: &'a DeckClient) -> Self {
1697        Self {
1698            client,
1699            limit: None,
1700            operator_filter: None,
1701            time_range: None,
1702            force_only: false,
1703            since_seq: None,
1704        }
1705    }
1706
1707    /// Cap the result at the most-recent `limit` entries.
1708    /// Returned order is newest-first.
1709    ///
1710    /// `limit = 0` returns an empty result by design — useful
1711    /// in higher-level builder flows that compute the cap from
1712    /// runtime config (operator typed 0, no records wanted).
1713    /// Callers that want "as many as the ring holds" should
1714    /// omit `recent()` entirely; the builder's default returns
1715    /// every entry.
1716    pub fn recent(mut self, limit: usize) -> Self {
1717        self.limit = Some(limit);
1718        self
1719    }
1720
1721    /// Keep only entries that carry `op_id` in their
1722    /// `operator_ids` list. Records where one of several
1723    /// operators in a multi-op bundle matches still surface.
1724    pub fn by_operator(mut self, op_id: u64) -> Self {
1725        self.operator_filter = Some(op_id);
1726        self
1727    }
1728
1729    /// Keep only entries whose `committed_at_ms` falls inside
1730    /// `[start_ms, end_ms]` (inclusive on both ends).
1731    /// Milliseconds since `UNIX_EPOCH`.
1732    pub fn between(mut self, start_ms: u64, end_ms: u64) -> Self {
1733        self.time_range = Some((start_ms, end_ms));
1734        self
1735    }
1736
1737    /// Restrict the result to ICE force operations. The audit
1738    /// ring now interleaves ordinary signed-admin commits with
1739    /// ICE bundles, so this filter actively drops non-ICE
1740    /// records via [`super::meshos::AdminEvent::is_ice`].
1741    pub fn force_only(mut self) -> Self {
1742        self.force_only = true;
1743        self
1744    }
1745
1746    /// Restrict the result to records with `seq > since_seq`.
1747    /// Pagination primitive: consumers persist the last-seen
1748    /// `seq` and resume from there across restarts. For
1749    /// [`Self::stream`] the value seeds the stream's
1750    /// watermark — the stream tails from `since_seq + 1`
1751    /// onward rather than from "first new record after now".
1752    ///
1753    /// `since(0)` means "from the beginning of what's still in
1754    /// the ring" — equivalent to omitting `since()` entirely
1755    /// in terms of what records are returned. To tail only
1756    /// records that arrive after subscribe time, pair this
1757    /// with [`DeckClient::audit_head_seq`]: read the head
1758    /// once, then `since(head)`. The same convention applies
1759    /// to [`LogFilter::since`] and
1760    /// [`DeckClient::subscribe_failures`].
1761    pub fn since(mut self, since_seq: u64) -> Self {
1762        self.since_seq = Some(since_seq);
1763        self
1764    }
1765
1766    /// Materialize matching entries. Reads the runtime's
1767    /// latest snapshot, applies the configured filters, and
1768    /// returns the matching entries newest-first. Cheap — one
1769    /// snapshot read + a single iterator pass.
1770    pub fn collect(self) -> Vec<super::meshos::AdminAuditRecord> {
1771        let snap = self.client.snapshot_reader.read();
1772        let mut matched: Vec<super::meshos::AdminAuditRecord> = snap
1773            .admin_audit
1774            .iter()
1775            .filter(|r| {
1776                if let Some(since) = self.since_seq {
1777                    if r.seq <= since {
1778                        return false;
1779                    }
1780                }
1781                if let Some(op_id) = self.operator_filter {
1782                    if !r.operator_ids.contains(&op_id) {
1783                        return false;
1784                    }
1785                }
1786                if let Some((start, end)) = self.time_range {
1787                    if r.committed_at_ms < start || r.committed_at_ms > end {
1788                        return false;
1789                    }
1790                }
1791                if self.force_only && !r.event.is_ice() {
1792                    return false;
1793                }
1794                true
1795            })
1796            .cloned()
1797            .collect();
1798        // Ring order is oldest-first; the natural operator UI
1799        // shape is newest-first.
1800        matched.reverse();
1801        if let Some(limit) = self.limit {
1802            matched.truncate(limit);
1803        }
1804        matched
1805    }
1806
1807    /// Tail mode: convert the query into an async stream that
1808    /// yields each matching audit record as it arrives on the
1809    /// substrate's ring. Polls the snapshot reader at
1810    /// `DeckClientConfig::snapshot_poll_interval`. The
1811    /// `recent(limit)` filter is ignored in tail mode — the
1812    /// stream emits continuously, not a bounded batch.
1813    ///
1814    /// Uses [`super::meshos::AdminAuditRecord::seq`] to dedup
1815    /// across polls so two commits in the same millisecond
1816    /// never collapse.
1817    pub fn stream(self) -> AuditStream {
1818        AuditStream::new(
1819            self.client.snapshot_reader.clone(),
1820            self.client.config.snapshot_poll_interval,
1821            AuditFilter {
1822                operator: self.operator_filter,
1823                time_range: self.time_range,
1824                force_only: self.force_only,
1825            },
1826            self.since_seq.unwrap_or(0),
1827        )
1828    }
1829}
1830
1831/// Compiled audit filter the [`AuditStream`] re-applies on
1832/// every poll. Internal — exposed as `AuditQuery`'s builder
1833/// shape, not directly constructible by SDK consumers.
1834#[derive(Clone, Debug)]
1835struct AuditFilter {
1836    operator: Option<u64>,
1837    time_range: Option<(u64, u64)>,
1838    force_only: bool,
1839}
1840
1841impl AuditFilter {
1842    fn matches(&self, record: &super::meshos::AdminAuditRecord) -> bool {
1843        if let Some(op_id) = self.operator {
1844            if !record.operator_ids.contains(&op_id) {
1845                return false;
1846            }
1847        }
1848        if let Some((start, end)) = self.time_range {
1849            if record.committed_at_ms < start || record.committed_at_ms > end {
1850                return false;
1851            }
1852        }
1853        if self.force_only && !record.event.is_ice() {
1854            return false;
1855        }
1856        true
1857    }
1858}
1859
/// Audit-tail stream. Emits each matching
/// [`super::meshos::AdminAuditRecord`] as it lands on the
/// substrate's ring. Built via [`AuditQuery::stream`].
///
/// Dedup uses the per-runtime monotonic
/// [`super::meshos::AdminAuditRecord::seq`] field — the
/// stream tracks the highest seq it has observed, and on each
/// poll yields matching records strictly above that watermark.
pub struct AuditStream {
    /// Snapshot source; re-read on every interval tick.
    reader: super::meshos::MeshOsSnapshotReader,
    /// Poll-cadence timer (period floored at 1 ms in `new`).
    interval: Interval,
    /// Predicate applied to records above the watermark.
    filter: AuditFilter,
    /// Highest `seq` observed so far. Advances past records the
    /// filter rejects as well, so they are not re-examined.
    last_seq: u64,
    /// Queue of records pending emission. Populated on a poll
    /// when multiple matching records arrived since the last
    /// tick; drained one-per-`poll_next` so the consumer sees
    /// each commit individually.
    queued: std::collections::VecDeque<super::meshos::AdminAuditRecord>,
}
1879
1880impl AuditStream {
1881    fn new(
1882        reader: super::meshos::MeshOsSnapshotReader,
1883        poll_interval: Duration,
1884        filter: AuditFilter,
1885        initial_seq_watermark: u64,
1886    ) -> Self {
1887        let poll_interval = poll_interval.max(Duration::from_millis(1));
1888        Self {
1889            reader,
1890            interval: interval(poll_interval),
1891            filter,
1892            last_seq: initial_seq_watermark,
1893            queued: std::collections::VecDeque::new(),
1894        }
1895    }
1896}
1897
1898impl Stream for AuditStream {
1899    type Item = Result<super::meshos::AdminAuditRecord, DeckError>;
1900
1901    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
1902        // Drain any queued records first so the consumer sees
1903        // every record from a multi-record poll individually.
1904        if let Some(record) = self.queued.pop_front() {
1905            return Poll::Ready(Some(Ok(record)));
1906        }
1907        // Wait for the next poll tick.
1908        match self.interval.poll_tick(cx) {
1909            Poll::Ready(_) => {
1910                let snap = self.reader.read();
1911                let last_seq = self.last_seq;
1912                // Ring order is oldest-first; iterate forward.
1913                let mut max_seq = last_seq;
1914                for record in snap.admin_audit.iter().cloned() {
1915                    if record.seq <= last_seq {
1916                        continue;
1917                    }
1918                    if record.seq > max_seq {
1919                        max_seq = record.seq;
1920                    }
1921                    if self.filter.matches(&record) {
1922                        self.queued.push_back(record);
1923                    }
1924                }
1925                self.last_seq = max_seq;
1926                if let Some(record) = self.queued.pop_front() {
1927                    Poll::Ready(Some(Ok(record)))
1928                } else {
1929                    // The interval consumed its Ready tick on
1930                    // this poll; the next poll calls poll_tick
1931                    // again, which registers its own waker for
1932                    // the next period. No explicit wake_by_ref
1933                    // needed.
1934                    Poll::Pending
1935                }
1936            }
1937            Poll::Pending => Poll::Pending,
1938        }
1939    }
1940}
1941
1942/// Log-stream filter. Default is "everything"; chain the
1943/// builder methods to narrow.
1944#[derive(Clone, Debug, Default)]
1945pub struct LogFilter {
1946    /// Minimum severity. Records below this are dropped.
1947    /// `None` matches every level.
1948    pub min_level: Option<super::meshos::LogLevel>,
1949    /// Restrict to a specific daemon. `None` matches every
1950    /// daemon (and substrate-level lines with `daemon_id =
1951    /// None`).
1952    pub daemon_id: Option<u64>,
1953    /// Restrict to a specific node. `None` matches every node.
1954    /// Future-relevant when remote log lines arrive via the
1955    /// per-daemon RedEX-tail integration.
1956    pub node_id: Option<NodeId>,
1957    /// Initial seq watermark — the stream tails from
1958    /// `since_seq + 1` onward rather than from "first new
1959    /// record after subscribe-time." Pagination primitive:
1960    /// consumers persist the last-seen seq and resume.
1961    pub since_seq: Option<u64>,
1962}
1963
1964impl LogFilter {
1965    /// Empty filter — matches every record on the ring.
1966    pub fn new() -> Self {
1967        Self::default()
1968    }
1969
1970    /// Restrict to records at or above `level`.
1971    pub fn min_level(mut self, level: super::meshos::LogLevel) -> Self {
1972        self.min_level = Some(level);
1973        self
1974    }
1975
1976    /// Restrict to records originating from `daemon_id`.
1977    pub fn with_daemon(mut self, daemon_id: u64) -> Self {
1978        self.daemon_id = Some(daemon_id);
1979        self
1980    }
1981
1982    /// Restrict to records originating from `node_id`.
1983    pub fn with_node(mut self, node_id: NodeId) -> Self {
1984        self.node_id = Some(node_id);
1985        self
1986    }
1987
1988    /// Seed the stream's watermark to `since_seq`. The first
1989    /// record yielded has `seq > since_seq`. `since(0)` means
1990    /// "from the beginning of what's still in the ring" —
1991    /// to tail only post-subscribe records pair with
1992    /// [`DeckClient::log_head_seq`]: read the head once,
1993    /// then `since(head)`. Same convention as
1994    /// [`AuditQuery::since`].
1995    pub fn since(mut self, since_seq: u64) -> Self {
1996        self.since_seq = Some(since_seq);
1997        self
1998    }
1999
2000    fn matches(&self, record: &super::meshos::LogRecord) -> bool {
2001        if let Some(min) = self.min_level {
2002            if record.level < min {
2003                return false;
2004            }
2005        }
2006        if let Some(id) = self.daemon_id {
2007            if record.daemon_id != Some(id) {
2008                return false;
2009            }
2010        }
2011        if let Some(node) = self.node_id {
2012            if record.node_id != Some(node) {
2013                return false;
2014            }
2015        }
2016        true
2017    }
2018}
2019
/// Log-tail stream returned by [`DeckClient::subscribe_logs`].
/// Yields each matching [`super::meshos::LogRecord`] once.
/// Dedups across snapshot polls via the per-runtime monotonic
/// `LogRecord::seq` (same pattern as [`AuditStream`]).
pub struct LogStream {
    /// Snapshot source; each interval tick re-reads it and scans
    /// its `log_ring`.
    reader: super::meshos::MeshOsSnapshotReader,
    /// Poll-cadence timer (period floored at 1 ms in `new`).
    interval: Interval,
    /// Predicate applied to records above the watermark.
    filter: LogFilter,
    /// Highest `seq` observed so far, seeded from
    /// `LogFilter::since_seq` (0 when unset). Advances past
    /// records the filter rejects as well.
    last_seq: u64,
    /// Matching records awaiting emission, handed out one per
    /// `poll_next` call.
    queued: std::collections::VecDeque<super::meshos::LogRecord>,
}
2031
2032impl LogStream {
2033    fn new(
2034        reader: super::meshos::MeshOsSnapshotReader,
2035        poll_interval: Duration,
2036        filter: LogFilter,
2037    ) -> Self {
2038        let poll_interval = poll_interval.max(Duration::from_millis(1));
2039        let last_seq = filter.since_seq.unwrap_or(0);
2040        Self {
2041            reader,
2042            interval: interval(poll_interval),
2043            filter,
2044            last_seq,
2045            queued: std::collections::VecDeque::new(),
2046        }
2047    }
2048}
2049
2050impl Stream for LogStream {
2051    type Item = Result<super::meshos::LogRecord, DeckError>;
2052
2053    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
2054        if let Some(record) = self.queued.pop_front() {
2055            return Poll::Ready(Some(Ok(record)));
2056        }
2057        match self.interval.poll_tick(cx) {
2058            Poll::Ready(_) => {
2059                let snap = self.reader.read();
2060                let last_seq = self.last_seq;
2061                let mut max_seq = last_seq;
2062                for record in snap.log_ring.iter().cloned() {
2063                    if record.seq <= last_seq {
2064                        continue;
2065                    }
2066                    if record.seq > max_seq {
2067                        max_seq = record.seq;
2068                    }
2069                    if self.filter.matches(&record) {
2070                        self.queued.push_back(record);
2071                    }
2072                }
2073                self.last_seq = max_seq;
2074                if let Some(record) = self.queued.pop_front() {
2075                    Poll::Ready(Some(Ok(record)))
2076                } else {
2077                    cx.waker().wake_by_ref();
2078                    Poll::Pending
2079                }
2080            }
2081            Poll::Pending => Poll::Pending,
2082        }
2083    }
2084}
2085
/// Failure-tail stream returned by
/// [`DeckClient::subscribe_failures`]. Yields each new
/// [`super::meshos::FailureRecord`] the executor records.
/// Dedups across snapshot polls via the per-runtime
/// monotonic `FailureRecord::seq` (same pattern as
/// [`AuditStream`] / [`LogStream`]).
///
/// Chain-replay-derived failure records carry `seq = 0`;
/// they're naturally skipped because the watermark logic
/// is `seq > last_seq` and the initial watermark defaults
/// to 0.
pub struct FailureStream {
    /// Snapshot source; each interval tick re-reads it and scans
    /// its `recent_failures`.
    reader: super::meshos::MeshOsSnapshotReader,
    /// Poll-cadence timer (period floored at 1 ms in `new`).
    interval: Interval,
    /// Highest `seq` observed so far; only records strictly above
    /// it are queued.
    last_seq: u64,
    /// New records awaiting emission, handed out one per
    /// `poll_next` call.
    queued: std::collections::VecDeque<super::meshos::FailureRecord>,
}
2103
2104impl FailureStream {
2105    fn new(
2106        reader: super::meshos::MeshOsSnapshotReader,
2107        poll_interval: Duration,
2108        initial_seq_watermark: u64,
2109    ) -> Self {
2110        let poll_interval = poll_interval.max(Duration::from_millis(1));
2111        Self {
2112            reader,
2113            interval: interval(poll_interval),
2114            last_seq: initial_seq_watermark,
2115            queued: std::collections::VecDeque::new(),
2116        }
2117    }
2118}
2119
2120impl Stream for FailureStream {
2121    type Item = Result<super::meshos::FailureRecord, DeckError>;
2122
2123    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
2124        if let Some(record) = self.queued.pop_front() {
2125            return Poll::Ready(Some(Ok(record)));
2126        }
2127        match self.interval.poll_tick(cx) {
2128            Poll::Ready(_) => {
2129                let snap = self.reader.read();
2130                let last_seq = self.last_seq;
2131                let mut max_seq = last_seq;
2132                for record in snap.recent_failures.iter().cloned() {
2133                    if record.seq <= last_seq {
2134                        continue;
2135                    }
2136                    if record.seq > max_seq {
2137                        max_seq = record.seq;
2138                    }
2139                    self.queued.push_back(record);
2140                }
2141                self.last_seq = max_seq;
2142                if let Some(record) = self.queued.pop_front() {
2143                    Poll::Ready(Some(Ok(record)))
2144                } else {
2145                    cx.waker().wake_by_ref();
2146                    Poll::Pending
2147                }
2148            }
2149            Poll::Pending => Poll::Pending,
2150        }
2151    }
2152}
2153
/// Shared, persistent snapshot change-generation receiver for the deck
/// `Stream` impls: a `watch::Receiver<u64>` behind an `Arc` and an async
/// `Mutex`, so the `'static` futures the streams store in `pending` can
/// re-borrow the SAME receiver each time they are re-armed.
/// `watch::Receiver` tracks the generation it has already seen, so
/// re-arming after a fire never misses a generation bumped in between —
/// missed-wakeup-safe even with a long ceiling, unlike a fresh
/// subscription per arm (E-10).
type SharedSnapshotChangeRx = Arc<tokio::sync::Mutex<tokio::sync::watch::Receiver<u64>>>;
2161
2162/// Build the `'static` "next structural change" future a deck `Stream`
2163/// stores in `pending`. Locks the shared receiver (uncontended — one
2164/// per stream, one in-flight future at a time) and awaits the next
2165/// change-generation bump.
2166fn next_snapshot_change(
2167    rx: SharedSnapshotChangeRx,
2168) -> Pin<Box<dyn std::future::Future<Output = ()> + Send>> {
2169    Box::pin(async move {
2170        let mut guard = rx.lock().await;
2171        // Err only if the loop's sender dropped (runtime gone) — then
2172        // the stream falls back to its ceiling.
2173        let _ = guard.changed().await;
2174    })
2175}
2176
/// Stream over the runtime's snapshot reader. Event-driven (E-9/E-10):
/// wakes on the loop's structural change-generation bump or the ceiling
/// tick, re-reads, and emits. The configured cadence is retained as a
/// debounce ceiling (and the first poll emits immediately so a consumer
/// sees the initial state). Phase 1 emits on every wake (consumers
/// de-dupe if they care); the substrate's tail-with-replay path replaces
/// this with a chain-driven stream when it lands.
pub struct SnapshotStream {
    /// Snapshot source; read once per emitted item.
    reader: MeshOsSnapshotReader,
    /// Debounce-ceiling timer — bounds latency if a publish edge is
    /// missed and fires immediately on the first poll.
    ceiling: Interval,
    /// Persistent change-generation receiver shared with `pending`'s
    /// futures, so re-arming after a fire never misses an intervening
    /// bump (missed-wakeup-safe).
    change_rx: SharedSnapshotChangeRx,
    /// In-flight "next structural change" future, re-armed each time it
    /// fires. The boxed future is `Send` but `!Sync`; the `Mutex`
    /// restores `Sync` (required by the pyo3/napi `#[pyclass]` wrappers)
    /// without an async lock — `poll_next` holds it only across the sync
    /// poll, never across an await.
    pending: parking_lot::Mutex<Pin<Box<dyn std::future::Future<Output = ()> + Send>>>,
}
2200
2201impl SnapshotStream {
2202    fn new(reader: MeshOsSnapshotReader, poll_interval: Duration) -> Self {
2203        // Floor the interval so a zero-duration config doesn't
2204        // hot-spin the executor.
2205        let poll_interval = poll_interval.max(Duration::from_millis(1));
2206        let change_rx: SharedSnapshotChangeRx =
2207            Arc::new(tokio::sync::Mutex::new(reader.subscribe_changes()));
2208        let pending = parking_lot::Mutex::new(next_snapshot_change(change_rx.clone()));
2209        Self {
2210            reader,
2211            ceiling: interval(poll_interval),
2212            change_rx,
2213            pending,
2214        }
2215    }
2216}
2217
2218impl Stream for SnapshotStream {
2219    type Item = Result<MeshOsSnapshot, DeckError>;
2220
2221    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
2222        // `Self: Unpin` (all fields are), so project to `&mut Self` once
2223        // and borrow disjoint fields freely.
2224        let this = self.get_mut();
2225        // Poll the publish signal; re-arm for the next publish if it
2226        // fired. Then poll the ceiling so its waker stays registered
2227        // even when the change arm woke us. The ceiling's first tick is
2228        // immediate, so the initial poll emits current state.
2229        let changed = {
2230            let mut pending = this.pending.lock();
2231            let ready = pending.as_mut().poll(cx).is_ready();
2232            if ready {
2233                *pending = next_snapshot_change(this.change_rx.clone());
2234            }
2235            ready
2236        };
2237        let ticked = this.ceiling.poll_tick(cx).is_ready();
2238        if changed || ticked {
2239            Poll::Ready(Some(Ok(this.reader.read())))
2240        } else {
2241            Poll::Pending
2242        }
2243    }
2244}
2245
/// Dedup'd [`StatusSummary`] stream. Returned by
/// [`DeckClient::status_summary_stream`]. Polls the snapshot
/// reader at the client's configured cadence, builds a fresh
/// summary, and yields it only when the summary differs from
/// the last emitted one (`PartialEq` dedup). The first poll
/// always emits — operators rendering a dashboard see the
/// initial state immediately, then change-driven updates from
/// there.
pub struct StatusSummaryStream {
    /// Snapshot source the summaries are built from.
    reader: super::meshos::MeshOsSnapshotReader,
    /// Debounce-ceiling timer (also fires immediately on first poll).
    ceiling: Interval,
    /// Persistent change-generation receiver shared with `pending`'s
    /// futures (missed-wakeup-safe re-arm; see [`SnapshotStream`]).
    change_rx: SharedSnapshotChangeRx,
    /// In-flight "next structural change" future, re-armed each time it
    /// fires. `Mutex` restores `Sync` over the `Send`-but-`!Sync` boxed
    /// future (the pyo3/napi `#[pyclass]` wrappers require it); the
    /// lock is held only across the sync poll, never across an await.
    pending: parking_lot::Mutex<Pin<Box<dyn std::future::Future<Output = ()> + Send>>>,
    /// Most recently emitted summary; starts as `None`.
    // NOTE(review): presumably the value the `PartialEq` dedup in
    // `poll_next` compares against — confirm against the `Stream` impl.
    last_emitted: Option<StatusSummary>,
}
2268
2269impl StatusSummaryStream {
2270    fn new(reader: super::meshos::MeshOsSnapshotReader, poll_interval: Duration) -> Self {
2271        let poll_interval = poll_interval.max(Duration::from_millis(1));
2272        let change_rx: SharedSnapshotChangeRx =
2273            Arc::new(tokio::sync::Mutex::new(reader.subscribe_changes()));
2274        let pending = parking_lot::Mutex::new(next_snapshot_change(change_rx.clone()));
2275        Self {
2276            reader,
2277            ceiling: interval(poll_interval),
2278            change_rx,
2279            pending,
2280            last_emitted: None,
2281        }
2282    }
2283}
2284
2285impl Stream for StatusSummaryStream {
2286    type Item = Result<StatusSummary, DeckError>;
2287
2288    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
2289        let this = self.get_mut();
2290        // Event-driven (E-9): wake on each publish or the ceiling tick,
2291        // build a fresh summary, and emit only when it differs from the
2292        // last (PartialEq dedup). Loop on a dedup-suppressed wake so the
2293        // re-armed publish future is re-polled (registering its waker)
2294        // before we park — otherwise a suppressed edge would drop the
2295        // event-driven wake and fall back to the ceiling.
2296        loop {
2297            let changed = {
2298                let mut pending = this.pending.lock();
2299                let ready = pending.as_mut().poll(cx).is_ready();
2300                if ready {
2301                    *pending = next_snapshot_change(this.change_rx.clone());
2302                }
2303                ready
2304            };
2305            let ticked = this.ceiling.poll_tick(cx).is_ready();
2306            if !changed && !ticked {
2307                return Poll::Pending;
2308            }
2309            let summary = build_status_summary(&this.reader.read());
2310            let should_emit = match &this.last_emitted {
2311                None => true,
2312                Some(prev) => prev != &summary,
2313            };
2314            if should_emit {
2315                this.last_emitted = Some(summary.clone());
2316                return Poll::Ready(Some(Ok(summary)));
2317            }
2318            // Unchanged — loop to re-poll both wake sources.
2319        }
2320    }
2321}
2322
2323#[cfg(test)]
2324mod tests {
2325    use super::*;
2326    use crate::adapter::net::behavior::meshos::{
2327        LoggingDispatcher, MaintenanceTransition, MeshOsAction, MeshOsConfig,
2328    };
2329
2330    fn fast_config() -> MeshOsConfig {
2331        MeshOsConfig::default()
2332            .with_this_node(42)
2333            .with_tick_interval(Duration::from_millis(10))
2334            .with_event_queue_capacity(64)
2335            .with_action_queue_capacity(64)
2336    }
2337
2338    #[tokio::test]
2339    async fn operator_identity_id_matches_keypair_origin_hash() {
2340        let kp = EntityKeypair::generate();
2341        let origin = kp.origin_hash();
2342        let identity = OperatorIdentity::from_keypair(kp);
2343        assert_eq!(identity.operator_id(), origin);
2344    }
2345
2346    #[tokio::test]
2347    async fn deck_subnet_and_gateway_accessors_default_to_empty_without_mesh() {
2348        // Pin the "no mesh installed" contract — the new
2349        // subnet/gateway/channel accessors must surface
2350        // sensible empties rather than panicking. CliContext
2351        // currently wires DeckClient without a MeshNode; this
2352        // is the path operator tooling sees today.
2353        let dispatcher = Arc::new(LoggingDispatcher::new());
2354        let runtime = MeshOsRuntime::start(fast_config(), dispatcher);
2355        let deck = DeckClient::from_runtime(&runtime, OperatorIdentity::generate());
2356        assert_eq!(deck.local_subnet(), None);
2357        assert!(deck.known_subnets().is_empty());
2358        assert!(deck.gateway_stats().is_none());
2359        assert!(deck.gateway_exports().is_empty());
2360        assert_eq!(deck.channel_visibility("any/name"), None);
2361        assert!(deck.channels().is_empty());
2362        assert_eq!(deck.channel_wire_hash("any/name"), None);
2363        let _ = runtime.shutdown().await;
2364    }
2365
2366    #[tokio::test]
2367    async fn deck_with_mesh_surfaces_local_subnet_and_gateway_stats() {
2368        // Pin the "mesh installed" contract — `with_mesh` wires
2369        // the MeshNode reference through; the accessors then
2370        // return the substrate-level values. Uses
2371        // `set_channel_configs` to install a registry so the
2372        // gateway is built and `gateway_stats()` returns Some.
2373        use crate::adapter::net::{
2374            ChannelConfig, ChannelConfigRegistry, ChannelId, MeshNodeConfig, SubnetId, Visibility,
2375        };
2376        use std::net::SocketAddr;
2377
2378        let dispatcher = Arc::new(LoggingDispatcher::new());
2379        let runtime = MeshOsRuntime::start(fast_config(), dispatcher);
2380
2381        let addr: SocketAddr = "127.0.0.1:0".parse().unwrap();
2382        let mut mesh_cfg = MeshNodeConfig::new(addr, [0x17u8; 32]);
2383        mesh_cfg = mesh_cfg.with_subnet(SubnetId::new(&[3, 7]));
2384        let mut mesh = crate::adapter::net::MeshNode::new(EntityKeypair::generate(), mesh_cfg)
2385            .await
2386            .expect("MeshNode::new");
2387        let registry = Arc::new(ChannelConfigRegistry::new());
2388        let metrics_id = ChannelId::parse("internal/metrics").expect("channel id");
2389        registry.insert(
2390            ChannelConfig::new(metrics_id.clone()).with_visibility(Visibility::SubnetLocal),
2391        );
2392        mesh.set_channel_configs(registry);
2393        let mesh = Arc::new(mesh);
2394
2395        let deck = DeckClient::from_runtime(&runtime, OperatorIdentity::generate())
2396            .with_mesh(mesh.clone());
2397
2398        assert_eq!(deck.local_subnet(), Some(SubnetId::new(&[3, 7])));
2399        let stats = deck.gateway_stats().expect("gateway installed");
2400        assert_eq!(stats.local_subnet, SubnetId::new(&[3, 7]));
2401        assert_eq!(stats.forwarded, 0);
2402        assert_eq!(stats.dropped, 0);
2403        assert_eq!(stats.export_rules, 0);
2404        assert!(stats.peer_subnets.is_empty());
2405
2406        // Channel-visibility lookup round-trips the configured
2407        // visibility for the one channel we registered.
2408        assert_eq!(
2409            deck.channel_visibility("internal/metrics"),
2410            Some(Visibility::SubnetLocal),
2411        );
2412        // The list surface mirrors the same channel.
2413        let channels = deck.channels();
2414        assert_eq!(channels.len(), 1);
2415        assert_eq!(channels[0].0, "internal/metrics");
2416        assert_eq!(channels[0].1, Visibility::SubnetLocal);
2417        // Wire-hash + canonical lookups resolve.
2418        assert_eq!(
2419            deck.channel_wire_hash("internal/metrics"),
2420            Some(metrics_id.wire_hash()),
2421        );
2422        assert_eq!(
2423            deck.channel_canonical_hash("internal/metrics"),
2424            Some(metrics_id.hash()),
2425        );
2426
2427        let _ = runtime.shutdown().await;
2428    }
2429
2430    #[tokio::test]
2431    async fn deck_error_display_carries_kind_discriminator() {
2432        let err = DeckError::new("unknown_node", "node 99 is not in the cluster");
2433        let rendered = err.to_string();
2434        assert!(
2435            rendered.contains("<<deck-sdk-kind:unknown_node>>"),
2436            "expected discriminator envelope, got {rendered:?}",
2437        );
2438    }
2439
2440    #[tokio::test]
2441    async fn admin_enter_maintenance_publishes_admin_event_and_returns_commit() {
2442        // Sanity that the SDK's admin path lands an AdminEvent on
2443        // the loop. We don't drive the executor here — the
2444        // assertion is "commit handle was returned + the loop's
2445        // fold saw the event," verified via the snapshot reader's
2446        // local_maintenance transition.
2447        let dispatcher = Arc::new(LoggingDispatcher::new());
2448        let runtime = MeshOsRuntime::start(fast_config(), dispatcher);
2449        let identity = OperatorIdentity::generate();
2450        let deck = DeckClient::from_runtime(&runtime, identity.clone());
2451        let commit = deck
2452            .admin()
2453            .enter_maintenance(42, None)
2454            .await
2455            .expect("commit");
2456        assert_eq!(commit.operator_id(), identity.operator_id());
2457        assert_eq!(commit.event_kind(), "enter_maintenance");
2458        assert!(commit.commit_id() >= 1);
2459
2460        // Give the loop a tick to fold + publish the post-state
2461        // snapshot. The maintenance enter triggers an
2462        // `EnteringMaintenance` discriminant; the snapshot reader
2463        // reflects it.
2464        tokio::time::sleep(Duration::from_millis(80)).await;
2465        let snap = runtime.snapshot();
2466        assert!(
2467            !matches!(
2468                snap.local_maintenance,
2469                crate::adapter::net::behavior::meshos::MaintenanceStateSnapshot::Active
2470            ),
2471            "local maintenance should have transitioned out of Active, got {:?}",
2472            snap.local_maintenance,
2473        );
2474
2475        let _ = runtime.shutdown().await;
2476    }
2477
2478    #[tokio::test]
2479    async fn admin_drop_replicas_publishes_with_supplied_chain_ids() {
2480        let dispatcher = Arc::new(LoggingDispatcher::new());
2481        let runtime = MeshOsRuntime::start(fast_config(), dispatcher);
2482        let deck = DeckClient::from_runtime(&runtime, OperatorIdentity::generate());
2483        let commit = deck
2484            .admin()
2485            .drop_replicas(42, vec![1, 2, 3])
2486            .await
2487            .expect("commit");
2488        assert_eq!(commit.event_kind(), "drop_replicas");
2489        let _ = runtime.shutdown().await;
2490    }
2491
2492    #[tokio::test]
2493    async fn commit_ids_increment_monotonically_per_client() {
2494        let dispatcher = Arc::new(LoggingDispatcher::new());
2495        let runtime = MeshOsRuntime::start(fast_config(), dispatcher);
2496        let deck = DeckClient::from_runtime(&runtime, OperatorIdentity::generate());
2497        let a = deck.admin().cordon(42).await.unwrap();
2498        let b = deck.admin().uncordon(42).await.unwrap();
2499        assert!(b.commit_id() > a.commit_id());
2500        let _ = runtime.shutdown().await;
2501    }
2502
2503    #[tokio::test]
2504    async fn snapshot_stream_yields_a_snapshot_per_poll_interval() {
2505        use futures::StreamExt;
2506        let dispatcher = Arc::new(LoggingDispatcher::new());
2507        let runtime = MeshOsRuntime::start(fast_config(), dispatcher);
2508        let deck = DeckClient::from_runtime(&runtime, OperatorIdentity::generate()).with_config(
2509            DeckClientConfig {
2510                snapshot_poll_interval: Duration::from_millis(20),
2511                ..DeckClientConfig::default()
2512            },
2513        );
2514
2515        let mut stream = deck.snapshots();
2516        // First tick lands immediately (tokio::time::interval
2517        // fires on first poll); collect two ticks and assert both
2518        // are Ok.
2519        let first = stream.next().await.expect("first").expect("ok");
2520        let second = stream.next().await.expect("second").expect("ok");
2521        // Same shape — both came from the same reader.
2522        assert_eq!(first.local_maintenance, second.local_maintenance);
2523        let _ = runtime.shutdown().await;
2524    }
2525
2526    #[tokio::test]
2527    async fn snapshot_stream_observes_admin_command_aftermath() {
2528        // The interesting end-to-end shape: issue an admin
2529        // command, then read a snapshot from the stream and
2530        // confirm the loop folded the event. Mirrors what
2531        // Deck-the-binary will see.
2532        use futures::StreamExt;
2533        let dispatcher = Arc::new(LoggingDispatcher::new());
2534        let runtime = MeshOsRuntime::start(fast_config(), dispatcher);
2535        let deck = DeckClient::from_runtime(&runtime, OperatorIdentity::generate()).with_config(
2536            DeckClientConfig {
2537                snapshot_poll_interval: Duration::from_millis(15),
2538                ..DeckClientConfig::default()
2539            },
2540        );
2541
2542        let _ = deck.admin().enter_maintenance(42, None).await.unwrap();
2543
2544        // Skip a few stream frames so the loop's tick has folded
2545        // the admin event + published a fresh snapshot.
2546        let mut stream = deck.snapshots();
2547        let mut saw_transition = false;
2548        for _ in 0..20 {
2549            let snap = stream.next().await.expect("next").expect("ok");
2550            if !matches!(
2551                snap.local_maintenance,
2552                crate::adapter::net::behavior::meshos::MaintenanceStateSnapshot::Active
2553            ) {
2554                saw_transition = true;
2555                break;
2556            }
2557        }
2558        assert!(
2559            saw_transition,
2560            "stream should have surfaced a non-Active local_maintenance after enter_maintenance",
2561        );
2562        let _ = runtime.shutdown().await;
2563    }
2564
2565    #[tokio::test]
2566    async fn change_signal_stays_quiet_on_idle_ticks_and_fires_on_structural_change() {
2567        // E-10 core guarantee. The loop publishes (and the snapshot's
2568        // time-projected fields advance) every tick, but the change
2569        // GENERATION must bump only on a genuine structural change — so
2570        // a consumer parked purely on the signal isn't woken by cosmetic
2571        // per-tick churn. fast_config ticks ~10ms, so several ticks pass
2572        // inside each sleep window below.
2573        let dispatcher = Arc::new(LoggingDispatcher::new());
2574        let runtime = MeshOsRuntime::start(fast_config(), dispatcher);
2575        let deck = DeckClient::from_runtime(&runtime, OperatorIdentity::generate());
2576
2577        let mut rx = deck.snapshot_reader.subscribe_changes();
2578        // Let the runtime settle (its initial structural publishes) then
2579        // clear the seen mark so we measure only what happens next.
2580        tokio::time::sleep(Duration::from_millis(120)).await;
2581        rx.borrow_and_update();
2582
2583        // Many idle ticks elapse. age_ms etc. advance in the stored
2584        // snapshot every tick, but nothing structural changed — the
2585        // generation must not move.
2586        tokio::time::sleep(Duration::from_millis(200)).await;
2587        assert!(
2588            !rx.has_changed().unwrap(),
2589            "change signal fired on idle ticks — per-tick time progression \
2590             must NOT count as a structural change",
2591        );
2592
2593        // A real structural change (freeze commit: freeze_until None→Some)
2594        // must bump the generation promptly.
2595        let p = deck
2596            .ice()
2597            .freeze_cluster(Duration::from_secs(15))
2598            .simulate()
2599            .await
2600            .expect("simulate");
2601        let sig = deck
2602            .identity()
2603            .sign_proposal(p.action(), p.issued_at_ms(), &p.blast_hash());
2604        p.commit(&[sig]).await.expect("commit");
2605
2606        tokio::time::timeout(Duration::from_secs(2), rx.changed())
2607            .await
2608            .expect("a structural change must fire the signal well inside the timeout")
2609            .expect("change sender alive");
2610
2611        // And once the freeze is committed, the countdown ticking down
2612        // every tick must NOT keep bumping the generation.
2613        rx.borrow_and_update();
2614        tokio::time::sleep(Duration::from_millis(200)).await;
2615        assert!(
2616            !rx.has_changed().unwrap(),
2617            "freeze countdown advancing must not bump the change generation",
2618        );
2619
2620        let _ = runtime.shutdown().await;
2621    }
2622
2623    #[tokio::test]
2624    async fn admin_commit_after_runtime_shutdown_returns_loop_closed_error() {
2625        let dispatcher = Arc::new(LoggingDispatcher::new());
2626        let runtime = MeshOsRuntime::start(fast_config(), dispatcher);
2627        let deck = DeckClient::from_runtime(&runtime, OperatorIdentity::generate());
2628        let _ = runtime.shutdown().await;
2629        // The runtime's `shutdown` future drains the loop and
2630        // drops the loop side of the publish channel. The SDK's
2631        // own handle is a clone; publishing on it returns
2632        // `LoopClosed` once the loop exits.
2633        let err = deck
2634            .admin()
2635            .cordon(42)
2636            .await
2637            .expect_err("publish after shutdown should fail");
2638        assert_eq!(err.kind, "loop_closed");
2639    }
2640
2641    // Silence the unused-import warning for types we re-export but
2642    // don't construct directly in tests.
2643    #[allow(dead_code)]
2644    fn _ensure_action_types_are_in_scope() -> (MaintenanceTransition, MeshOsAction) {
2645        (
2646            MaintenanceTransition::EnteringMaintenance,
2647            MeshOsAction::CommitMaintenanceTransition {
2648                node: 0,
2649                target: MaintenanceTransition::EnteringMaintenance,
2650            },
2651        )
2652    }
2653
2654    // Note: a "commit without simulate" test would not compile —
2655    // `IceProposal` does not expose `commit`; only the
2656    // `SimulatedIceProposal` returned from `IceProposal::simulate`
2657    // does. The type-state split enforces locked decision #4 at
2658    // compile time, so no runtime simulation-required gate exists
2659    // to test.
2660
2661    #[tokio::test]
2662    async fn ice_proposal_commit_with_insufficient_signatures_fails() {
2663        let dispatcher = Arc::new(LoggingDispatcher::new());
2664        let runtime = MeshOsRuntime::start(fast_config(), dispatcher);
2665        // Bump threshold above what we'll supply.
2666        let deck = DeckClient::from_runtime(&runtime, OperatorIdentity::generate()).with_config(
2667            DeckClientConfig {
2668                snapshot_poll_interval: Duration::from_millis(100),
2669                ice_signature_threshold: 2,
2670            },
2671        );
2672        let proposal = deck.ice().freeze_cluster(Duration::from_secs(10));
2673        let simulated = proposal.simulate().await.expect("simulate");
2674        let sig = deck.identity().sign_proposal(
2675            simulated.action(),
2676            simulated.issued_at_ms(),
2677            &simulated.blast_hash(),
2678        );
2679        let err = simulated
2680            .commit(&[sig])
2681            .await
2682            .expect_err("under-threshold commit should fail");
2683        assert_eq!(err.kind, "insufficient_signatures");
2684        let _ = runtime.shutdown().await;
2685    }
2686
2687    #[tokio::test]
2688    async fn ice_freeze_proposal_simulate_then_commit_lands_freeze_on_loop() {
2689        let dispatcher = Arc::new(LoggingDispatcher::new());
2690        let runtime = MeshOsRuntime::start(fast_config(), dispatcher);
2691        let deck = DeckClient::from_runtime(&runtime, OperatorIdentity::generate());
2692
2693        let proposal = deck.ice().freeze_cluster(Duration::from_secs(30));
2694        let simulated = proposal.simulate().await.expect("simulate");
2695        // FreezeCluster sets the drain delay to the requested TTL.
2696        assert_eq!(
2697            simulated.blast_radius().estimated_drain_delay,
2698            Some(Duration::from_secs(30))
2699        );
2700        let sig = deck.identity().sign_proposal(
2701            simulated.action(),
2702            simulated.issued_at_ms(),
2703            &simulated.blast_hash(),
2704        );
2705        let commit = simulated.commit(&[sig]).await.expect("commit");
2706        assert_eq!(commit.event_kind(), "freeze_cluster");
2707
2708        // Give the loop a tick + reconcile + publish to fold the
2709        // freeze through to the snapshot.
2710        tokio::time::sleep(Duration::from_millis(80)).await;
2711        let snap = runtime.snapshot();
2712        assert!(
2713            snap.freeze_remaining_ms.is_some(),
2714            "freeze_remaining_ms should be set after committed freeze",
2715        );
2716        let _ = runtime.shutdown().await;
2717    }
2718
2719    #[tokio::test]
2720    async fn ice_thaw_proposal_simulate_warns_no_op_when_unfrozen() {
2721        let dispatcher = Arc::new(LoggingDispatcher::new());
2722        let runtime = MeshOsRuntime::start(fast_config(), dispatcher);
2723        let deck = DeckClient::from_runtime(&runtime, OperatorIdentity::generate());
2724
2725        let proposal = deck.ice().thaw_cluster();
2726        let simulated = proposal.simulate().await.expect("simulate");
2727        // Simulator on a non-frozen snapshot warns "no freeze to cancel."
2728        assert!(simulated.blast_radius().warnings.iter().any(|w| matches!(
2729            w,
2730            crate::adapter::net::behavior::meshos::BlastWarning::ThawHasNoFreezeToCancel
2731        )));
2732        let _ = runtime.shutdown().await;
2733    }
2734
2735    /// Test helper: a non-sentinel blast-radius hash to use in
2736    /// tests that don't construct a real `BlastRadius` (they're
2737    /// exercising the signature plumbing, not the simulation
2738    /// gate).
2739    const TEST_BLAST_HASH: super::super::meshos::BlastRadiusHash =
2740        [1u8; super::super::meshos::BLAST_RADIUS_HASH_LEN];
2741
2742    /// Static assertion: every public stream type returned by
2743    /// the SDK must be `Send` so callers can move them across
2744    /// `tokio::spawn` boundaries. A future internal-field swap
2745    /// to a `!Send` type silently breaks every downstream
2746    /// `spawn` consumer; pinning the property here keeps that
2747    /// regression out of CI. The ICE proposal type-state pair
2748    /// also pins `Send + Sync` for the same reason.
2749    fn _assert_proposal_send_sync_static_check() {
2750        fn _assert_send<T: Send>() {}
2751        fn _assert_send_sync<T: Send + Sync>() {}
2752        _assert_send_sync::<IceProposal<'static>>();
2753        _assert_send_sync::<SimulatedIceProposal<'static>>();
2754        _assert_send::<SnapshotStream>();
2755        _assert_send::<StatusSummaryStream>();
2756        _assert_send::<AuditStream>();
2757        _assert_send::<LogStream>();
2758        _assert_send::<FailureStream>();
2759    }
2760
2761    #[tokio::test]
2762    async fn operator_signature_carries_issuing_operator_id() {
2763        let identity = OperatorIdentity::generate();
2764        let proposal = IceActionProposal::FreezeCluster {
2765            ttl: Duration::from_secs(60),
2766        };
2767        let sig = identity.sign_proposal(
2768            &proposal,
2769            super::super::meshos::now_ms_since_unix_epoch(),
2770            &TEST_BLAST_HASH,
2771        );
2772        assert_eq!(sig.operator_id, identity.operator_id());
2773        // ed25519 signatures are 64 bytes.
2774        assert_eq!(sig.signature.len(), 64);
2775    }
2776
2777    #[tokio::test]
2778    async fn operator_registry_verifies_a_well_formed_signature() {
2779        let identity = OperatorIdentity::generate();
2780        let mut registry = OperatorRegistry::new();
2781        registry.register(identity.keypair());
2782
2783        let proposal = IceActionProposal::FreezeCluster {
2784            ttl: Duration::from_secs(60),
2785        };
2786        let ts = super::super::meshos::now_ms_since_unix_epoch();
2787        let sig = identity.sign_proposal(&proposal, ts, &TEST_BLAST_HASH);
2788        let payload = ice_proposal_signing_payload(&proposal, ts, &TEST_BLAST_HASH);
2789        registry.verify(&sig, &payload).expect("valid signature");
2790    }
2791
2792    #[tokio::test]
2793    async fn operator_registry_rejects_unknown_operator() {
2794        let registry = OperatorRegistry::new();
2795        let identity = OperatorIdentity::generate();
2796        let proposal = IceActionProposal::ThawCluster;
2797        let ts = super::super::meshos::now_ms_since_unix_epoch();
2798        let sig = identity.sign_proposal(&proposal, ts, &TEST_BLAST_HASH);
2799        let payload = ice_proposal_signing_payload(&proposal, ts, &TEST_BLAST_HASH);
2800        let err = registry
2801            .verify(&sig, &payload)
2802            .expect_err("unregistered operator should not verify");
2803        assert_eq!(err.kind(), "not_authorized");
2804    }
2805
2806    #[tokio::test]
2807    async fn operator_registry_rejects_tampered_signature_bytes() {
2808        let identity = OperatorIdentity::generate();
2809        let mut registry = OperatorRegistry::new();
2810        registry.register(identity.keypair());
2811
2812        let proposal = IceActionProposal::FreezeCluster {
2813            ttl: Duration::from_secs(10),
2814        };
2815        let ts = super::super::meshos::now_ms_since_unix_epoch();
2816        let mut sig = identity.sign_proposal(&proposal, ts, &TEST_BLAST_HASH);
2817        // Flip one byte in the signature.
2818        sig.signature[0] ^= 0x01;
2819        let payload = ice_proposal_signing_payload(&proposal, ts, &TEST_BLAST_HASH);
2820        let err = registry
2821            .verify(&sig, &payload)
2822            .expect_err("tampered signature should not verify");
2823        assert_eq!(err.kind(), "signature_invalid");
2824    }
2825
2826    #[tokio::test]
2827    async fn operator_registry_rejects_signature_for_wrong_payload() {
2828        // A signature over `FreezeCluster { 10s }` should not
2829        // verify against the payload of a different proposal.
2830        // This is the contract the substrate verifier will rely
2831        // on to reject signature reuse across proposals.
2832        let identity = OperatorIdentity::generate();
2833        let mut registry = OperatorRegistry::new();
2834        registry.register(identity.keypair());
2835
2836        let signed_proposal = IceActionProposal::FreezeCluster {
2837            ttl: Duration::from_secs(10),
2838        };
2839        let other_proposal = IceActionProposal::FreezeCluster {
2840            ttl: Duration::from_secs(60),
2841        };
2842        let ts = super::super::meshos::now_ms_since_unix_epoch();
2843        let sig = identity.sign_proposal(&signed_proposal, ts, &TEST_BLAST_HASH);
2844        let payload = ice_proposal_signing_payload(&other_proposal, ts, &TEST_BLAST_HASH);
2845        let err = registry
2846            .verify(&sig, &payload)
2847            .expect_err("cross-proposal signature should not verify");
2848        assert_eq!(err.kind(), "signature_invalid");
2849    }
2850
2851    #[tokio::test]
2852    async fn operator_registry_rejects_wrong_length_signature() {
2853        let identity = OperatorIdentity::generate();
2854        let mut registry = OperatorRegistry::new();
2855        registry.register(identity.keypair());
2856
2857        let proposal = IceActionProposal::ThawCluster;
2858        let sig = OperatorSignature {
2859            operator_id: identity.operator_id(),
2860            signature: vec![0; 32], // wrong length
2861        };
2862        let payload = ice_proposal_signing_payload(
2863            &proposal,
2864            super::super::meshos::now_ms_since_unix_epoch(),
2865            &TEST_BLAST_HASH,
2866        );
2867        let err = registry
2868            .verify(&sig, &payload)
2869            .expect_err("wrong-length signature should not verify");
2870        assert_eq!(err.kind(), "signature_invalid");
2871    }
2872
2873    #[tokio::test]
2874    async fn ice_commit_with_registry_rejects_an_unverified_signature() {
2875        // Build a two-operator bundle where one signature is
2876        // tampered. The threshold is met (2 sigs supplied) but
2877        // verification rejects the bundle.
2878        let dispatcher = Arc::new(LoggingDispatcher::new());
2879        let runtime = MeshOsRuntime::start(fast_config(), dispatcher);
2880        let op_a = OperatorIdentity::generate();
2881        let op_b = OperatorIdentity::generate();
2882        let mut registry = OperatorRegistry::new();
2883        registry.register(op_a.keypair());
2884        registry.register(op_b.keypair());
2885        let deck = DeckClient::new(
2886            runtime.handle_clone(),
2887            runtime.snapshot_reader().clone(),
2888            op_a.clone(),
2889            DeckClientConfig {
2890                snapshot_poll_interval: Duration::from_millis(100),
2891                ice_signature_threshold: 2,
2892            },
2893        )
2894        .with_operator_registry(registry);
2895
2896        let proposal = deck.ice().freeze_cluster(Duration::from_secs(15));
2897        let simulated = proposal.simulate().await.expect("simulate");
2898        let sig_a = op_a.sign_proposal(
2899            simulated.action(),
2900            simulated.issued_at_ms(),
2901            &simulated.blast_hash(),
2902        );
2903        let mut sig_b = op_b.sign_proposal(
2904            simulated.action(),
2905            simulated.issued_at_ms(),
2906            &simulated.blast_hash(),
2907        );
2908        sig_b.signature[3] ^= 0xFF; // tamper
2909
2910        let err = simulated
2911            .commit(&[sig_a, sig_b])
2912            .await
2913            .expect_err("commit with tampered sig should fail");
2914        assert_eq!(err.kind, "signature_invalid");
2915        let _ = runtime.shutdown().await;
2916    }
2917
2918    #[tokio::test]
2919    async fn ice_flush_avoid_lists_proposal_simulate_and_commit_round_trips() {
2920        use super::super::meshos::AvoidScope;
2921        let dispatcher = Arc::new(LoggingDispatcher::new());
2922        let runtime = MeshOsRuntime::start(fast_config(), dispatcher);
2923        let deck = DeckClient::from_runtime(&runtime, OperatorIdentity::generate());
2924        let proposal = deck.ice().flush_avoid_lists(AvoidScope::OnPeer { peer: 5 });
2925        let simulated = proposal.simulate().await.expect("simulate");
2926        // OnPeer scope flushes everywhere; without registered
2927        // peers in the snapshot the affected_nodes list is
2928        // empty but the warning still fires.
2929        assert!(simulated.blast_radius().warnings.iter().any(|w| matches!(
2930            w,
2931            crate::adapter::net::behavior::meshos::BlastWarning::AvoidFlushRecoversPeer { peer: 5 }
2932        )));
2933        // commit through the unsigned path (no registry installed).
2934        let sig = deck.identity().sign_proposal(
2935            simulated.action(),
2936            simulated.issued_at_ms(),
2937            &simulated.blast_hash(),
2938        );
2939        let commit = simulated.commit(&[sig]).await.expect("commit");
2940        assert_eq!(commit.event_kind(), "flush_avoid_lists");
2941        let _ = runtime.shutdown().await;
2942    }
2943
2944    #[tokio::test]
2945    async fn status_summary_stream_emits_initial_summary_immediately() {
2946        use futures::StreamExt;
2947        let dispatcher = Arc::new(LoggingDispatcher::new());
2948        let runtime = MeshOsRuntime::start(fast_config(), dispatcher);
2949        let deck = DeckClient::from_runtime(&runtime, OperatorIdentity::generate()).with_config(
2950            DeckClientConfig {
2951                snapshot_poll_interval: Duration::from_millis(10),
2952                ..DeckClientConfig::default()
2953            },
2954        );
2955        let mut stream = deck.status_summary_stream();
2956        let first = tokio::time::timeout(Duration::from_secs(2), stream.next())
2957            .await
2958            .expect("first timed out")
2959            .expect("first closed")
2960            .expect("first ok");
2961        // Steady-state idle cluster — every count is zero.
2962        assert!(first.freeze_remaining_ms.is_none());
2963        assert!(!first.local_maintenance_active);
2964        let _ = runtime.shutdown().await;
2965    }
2966
2967    #[tokio::test]
2968    async fn status_summary_stream_dedups_unchanged_summaries() {
2969        use futures::StreamExt;
2970        let dispatcher = Arc::new(LoggingDispatcher::new());
2971        let runtime = MeshOsRuntime::start(fast_config(), dispatcher);
2972        let deck = DeckClient::from_runtime(&runtime, OperatorIdentity::generate()).with_config(
2973            DeckClientConfig {
2974                snapshot_poll_interval: Duration::from_millis(10),
2975                ..DeckClientConfig::default()
2976            },
2977        );
2978        let mut stream = deck.status_summary_stream();
2979        // First emission lands.
2980        let _ = tokio::time::timeout(Duration::from_secs(2), stream.next())
2981            .await
2982            .expect("first")
2983            .expect("closed")
2984            .expect("ok");
2985        // No state change — the stream should park (dedup).
2986        let second = tokio::time::timeout(Duration::from_millis(80), stream.next()).await;
2987        assert!(
2988            second.is_err(),
2989            "stream should not re-emit unchanged summary"
2990        );
2991        let _ = runtime.shutdown().await;
2992    }
2993
2994    #[tokio::test]
2995    async fn status_summary_stream_re_emits_on_freeze_state_change() {
2996        use futures::StreamExt;
2997        let dispatcher = Arc::new(LoggingDispatcher::new());
2998        let runtime = MeshOsRuntime::start(fast_config(), dispatcher);
2999        let deck = DeckClient::from_runtime(&runtime, OperatorIdentity::generate()).with_config(
3000            DeckClientConfig {
3001                snapshot_poll_interval: Duration::from_millis(10),
3002                ..DeckClientConfig::default()
3003            },
3004        );
3005        let mut stream = deck.status_summary_stream();
3006        let first = tokio::time::timeout(Duration::from_secs(2), stream.next())
3007            .await
3008            .expect("first")
3009            .expect("closed")
3010            .expect("ok");
3011        assert!(first.freeze_remaining_ms.is_none());
3012
3013        // Freeze the cluster — the next polled summary will
3014        // differ (`freeze_remaining_ms` flips to `Some`), so
3015        // the stream re-emits + the audit ring depth bumps.
3016        let p = deck
3017            .ice()
3018            .freeze_cluster(Duration::from_secs(30))
3019            .simulate()
3020            .await
3021            .expect("simulate");
3022        let sig = deck
3023            .identity()
3024            .sign_proposal(p.action(), p.issued_at_ms(), &p.blast_hash());
3025        p.commit(&[sig]).await.expect("freeze");
3026        let after_freeze = tokio::time::timeout(Duration::from_secs(2), stream.next())
3027            .await
3028            .expect("after_freeze timed out")
3029            .expect("after_freeze closed")
3030            .expect("after_freeze ok");
3031        assert!(after_freeze.freeze_remaining_ms.is_some());
3032        assert!(after_freeze.admin_audit_ring_depth >= 1);
3033        let _ = runtime.shutdown().await;
3034    }
3035
3036    #[tokio::test]
3037    async fn status_summary_reflects_steady_state_idle_cluster() {
3038        let dispatcher = Arc::new(LoggingDispatcher::new());
3039        let runtime = MeshOsRuntime::start(fast_config(), dispatcher);
3040        let deck = DeckClient::from_runtime(&runtime, OperatorIdentity::generate());
3041        let summary = deck.status_summary();
3042        assert_eq!(summary.peers, PeerCounts::default());
3043        assert_eq!(summary.daemons, DaemonCounts::default());
3044        assert_eq!(summary.replica_chains, 0);
3045        assert_eq!(summary.recently_emitted_count, 0);
3046        assert!(summary.freeze_remaining_ms.is_none());
3047        assert!(!summary.local_maintenance_active);
3048        let _ = runtime.shutdown().await;
3049    }
3050
3051    #[tokio::test]
3052    async fn status_summary_flags_freeze_after_freeze_cluster_commit() {
3053        let dispatcher = Arc::new(LoggingDispatcher::new());
3054        let runtime = MeshOsRuntime::start(fast_config(), dispatcher);
3055        let deck = DeckClient::from_runtime(&runtime, OperatorIdentity::generate());
3056        let p = deck
3057            .ice()
3058            .freeze_cluster(Duration::from_secs(30))
3059            .simulate()
3060            .await
3061            .expect("simulate");
3062        let sig = deck
3063            .identity()
3064            .sign_proposal(p.action(), p.issued_at_ms(), &p.blast_hash());
3065        p.commit(&[sig]).await.expect("freeze");
3066        tokio::time::sleep(Duration::from_millis(60)).await;
3067        let summary = deck.status_summary();
3068        assert!(summary.freeze_remaining_ms.is_some());
3069        // Audit ring should have at least one entry now.
3070        assert!(summary.admin_audit_ring_depth >= 1);
3071        let _ = runtime.shutdown().await;
3072    }
3073
3074    #[tokio::test]
3075    async fn status_summary_flags_local_maintenance_after_enter_maintenance() {
3076        let dispatcher = Arc::new(LoggingDispatcher::new());
3077        let runtime = MeshOsRuntime::start(fast_config(), dispatcher);
3078        let deck = DeckClient::from_runtime(&runtime, OperatorIdentity::generate());
3079        // `fast_config` pins this_node = 42; target the same.
3080        deck.admin()
3081            .enter_maintenance(42, None)
3082            .await
3083            .expect("commit");
3084        tokio::time::sleep(Duration::from_millis(60)).await;
3085        let summary = deck.status_summary();
3086        assert!(
3087            summary.local_maintenance_active,
3088            "local_maintenance_active should flip on after enter_maintenance",
3089        );
3090        let _ = runtime.shutdown().await;
3091    }
3092
3093    #[tokio::test]
3094    async fn subscribe_failures_yields_seeded_dispatcher_rejection() {
3095        use crate::adapter::net::behavior::meshos::DispatchError;
3096        use futures::StreamExt;
3097        let dispatcher = Arc::new(LoggingDispatcher::new());
3098        dispatcher.fail_next(DispatchError::drop("first"));
3099        let runtime = MeshOsRuntime::start(fast_config(), Arc::clone(&dispatcher));
3100        let deck = DeckClient::from_runtime(&runtime, OperatorIdentity::generate()).with_config(
3101            DeckClientConfig {
3102                snapshot_poll_interval: Duration::from_millis(15),
3103                ..DeckClientConfig::default()
3104            },
3105        );
3106        let mut stream = deck.subscribe_failures(0);
3107
3108        deck.admin().enter_maintenance(42, None).await.unwrap();
3109
3110        let record = tokio::time::timeout(Duration::from_secs(2), stream.next())
3111            .await
3112            .expect("timed out")
3113            .expect("closed")
3114            .expect("ok");
3115        // The executor stamps strictly-positive seqs; chain-
3116        // replay-derived records carry seq=0.
3117        assert!(record.seq > 0);
3118        assert!(record.reason.contains("first"));
3119        let _ = runtime.shutdown().await;
3120    }
3121
3122    #[tokio::test]
3123    async fn subscribe_failures_since_seq_drops_already_seen() {
3124        use crate::adapter::net::behavior::meshos::DispatchError;
3125        use futures::StreamExt;
3126        let dispatcher = Arc::new(LoggingDispatcher::new());
3127        dispatcher.fail_next(DispatchError::drop("first"));
3128        let runtime = MeshOsRuntime::start(fast_config(), Arc::clone(&dispatcher));
3129        let deck = DeckClient::from_runtime(&runtime, OperatorIdentity::generate()).with_config(
3130            DeckClientConfig {
3131                snapshot_poll_interval: Duration::from_millis(15),
3132                ..DeckClientConfig::default()
3133            },
3134        );
3135
3136        deck.admin().enter_maintenance(42, None).await.unwrap();
3137        // Wait for the first failure to land on the ring.
3138        let deadline = std::time::Instant::now() + Duration::from_secs(2);
3139        let mut seq_seen = 0u64;
3140        while std::time::Instant::now() < deadline {
3141            let all = deck.recent_failures();
3142            if let Some(r) = all.last() {
3143                seq_seen = r.seq;
3144                break;
3145            }
3146            tokio::time::sleep(Duration::from_millis(20)).await;
3147        }
3148        assert!(seq_seen > 0);
3149
3150        // Subscribe with the seq already-seen as the
3151        // watermark; no new failures => stream parks.
3152        let mut stream = deck.subscribe_failures(seq_seen);
3153        let parked = tokio::time::timeout(Duration::from_millis(60), stream.next()).await;
3154        assert!(parked.is_err(), "no new failures means parked stream");
3155        let _ = runtime.shutdown().await;
3156    }
3157
3158    #[tokio::test]
3159    async fn recent_failures_surfaces_dispatcher_rejections() {
3160        use crate::adapter::net::behavior::meshos::DispatchError;
3161        let dispatcher = Arc::new(LoggingDispatcher::new());
3162        dispatcher.fail_next(DispatchError::drop("synthetic rejection"));
3163        let runtime = MeshOsRuntime::start(fast_config(), Arc::clone(&dispatcher));
3164        let deck = DeckClient::from_runtime(&runtime, OperatorIdentity::generate());
3165
3166        // Drive an admin event whose dispatched action will be
3167        // rejected — reconcile emits `CommitMaintenanceTransition`
3168        // for an empty-workload enter_maintenance.
3169        deck.admin()
3170            .enter_maintenance(42, None)
3171            .await
3172            .expect("commit");
3173
3174        // Poll up to 2s for the failure to land on the ring.
3175        let deadline = std::time::Instant::now() + Duration::from_secs(2);
3176        let mut got: Vec<crate::adapter::net::behavior::meshos::FailureRecord> = Vec::new();
3177        while std::time::Instant::now() < deadline {
3178            got = deck.recent_failures();
3179            if !got.is_empty() {
3180                break;
3181            }
3182            tokio::time::sleep(Duration::from_millis(20)).await;
3183        }
3184        assert!(
3185            !got.is_empty(),
3186            "recent_failures should reflect the seeded dispatcher rejection",
3187        );
3188        assert!(got[0].reason.contains("synthetic rejection"));
3189        let _ = runtime.shutdown().await;
3190    }
3191
3192    #[tokio::test]
3193    async fn recent_failures_since_drops_records_at_or_below_cutoff() {
3194        use crate::adapter::net::behavior::meshos::DispatchError;
3195        let dispatcher = Arc::new(LoggingDispatcher::new());
3196        dispatcher.fail_next(DispatchError::drop("first failure"));
3197        let runtime = MeshOsRuntime::start(fast_config(), Arc::clone(&dispatcher));
3198        let deck = DeckClient::from_runtime(&runtime, OperatorIdentity::generate());
3199
3200        deck.admin()
3201            .enter_maintenance(42, None)
3202            .await
3203            .expect("commit");
3204        let deadline = std::time::Instant::now() + Duration::from_secs(2);
3205        let mut all: Vec<crate::adapter::net::behavior::meshos::FailureRecord> = Vec::new();
3206        while std::time::Instant::now() < deadline {
3207            all = deck.recent_failures();
3208            if !all.is_empty() {
3209                break;
3210            }
3211            tokio::time::sleep(Duration::from_millis(20)).await;
3212        }
3213        assert!(!all.is_empty(), "seed failure should land");
3214
3215        // Set the watermark to the existing record's ms — the
3216        // since filter uses `>` so the same record is dropped.
3217        let cutoff = all[0].recorded_at_ms;
3218        let after = deck.recent_failures_since(cutoff);
3219        assert!(
3220            after.iter().all(|r| r.recorded_at_ms > cutoff),
3221            "since filter should drop records at the cutoff",
3222        );
3223        // The seed record itself shouldn't appear (its ms ==
3224        // cutoff).
3225        assert!(after.iter().all(|r| r.reason != "first failure"));
3226        let _ = runtime.shutdown().await;
3227    }
3228
3229    #[tokio::test]
3230    async fn per_field_accessors_match_full_snapshot_contents() {
3231        let dispatcher = Arc::new(LoggingDispatcher::new());
3232        let runtime = MeshOsRuntime::start(fast_config(), dispatcher);
3233        let deck = DeckClient::from_runtime(&runtime, OperatorIdentity::generate());
3234
3235        let snap = deck.status();
3236        assert_eq!(deck.peers(), snap.peers);
3237        assert_eq!(deck.daemons(), snap.daemons);
3238        assert_eq!(deck.replicas(), snap.replicas);
3239        assert_eq!(deck.local_maintenance(), snap.local_maintenance);
3240        assert_eq!(deck.freeze_remaining_ms(), snap.freeze_remaining_ms);
3241        let _ = runtime.shutdown().await;
3242    }
3243
3244    #[tokio::test]
3245    async fn status_returns_freshest_snapshot_synchronously() {
3246        let dispatcher = Arc::new(LoggingDispatcher::new());
3247        let runtime = MeshOsRuntime::start(fast_config(), dispatcher);
3248        let deck = DeckClient::from_runtime(&runtime, OperatorIdentity::generate());
3249
3250        // Default state — local maintenance is Active.
3251        let s = deck.status();
3252        assert!(matches!(
3253            s.local_maintenance,
3254            crate::adapter::net::behavior::meshos::MaintenanceStateSnapshot::Active
3255        ));
3256
3257        // Issue a freeze; subsequent status() should see the
3258        // freeze once the loop folds.
3259        let p = deck
3260            .ice()
3261            .freeze_cluster(Duration::from_secs(20))
3262            .simulate()
3263            .await
3264            .expect("simulate");
3265        let sig = deck
3266            .identity()
3267            .sign_proposal(p.action(), p.issued_at_ms(), &p.blast_hash());
3268        p.commit(&[sig]).await.expect("commit");
3269        tokio::time::sleep(Duration::from_millis(60)).await;
3270        let s = deck.status();
3271        assert!(s.freeze_remaining_ms.is_some());
3272        let _ = runtime.shutdown().await;
3273    }
3274
3275    #[tokio::test]
3276    async fn watch_resolves_immediately_when_predicate_already_true() {
3277        let dispatcher = Arc::new(LoggingDispatcher::new());
3278        let runtime = MeshOsRuntime::start(fast_config(), dispatcher);
3279        let deck = DeckClient::from_runtime(&runtime, OperatorIdentity::generate());
3280        // Default state matches "no freeze in effect" — should
3281        // resolve immediately.
3282        let snap = tokio::time::timeout(
3283            Duration::from_millis(50),
3284            deck.watch(|s| s.freeze_remaining_ms.is_none()),
3285        )
3286        .await
3287        .expect("watch should not block when predicate already holds");
3288        assert!(snap.freeze_remaining_ms.is_none());
3289        let _ = runtime.shutdown().await;
3290    }
3291
3292    #[tokio::test]
3293    async fn watch_resolves_when_predicate_becomes_true_after_admin_commit() {
3294        let dispatcher = Arc::new(LoggingDispatcher::new());
3295        let runtime = MeshOsRuntime::start(fast_config(), dispatcher);
3296        let deck = DeckClient::from_runtime(&runtime, OperatorIdentity::generate()).with_config(
3297            DeckClientConfig {
3298                snapshot_poll_interval: Duration::from_millis(10),
3299                ..DeckClientConfig::default()
3300            },
3301        );
3302
3303        // Spawn a watcher that waits for a freeze.
3304        let deck_handle = deck.snapshot_reader.clone();
3305        let watcher = {
3306            let identity = deck.identity().clone();
3307            let config = deck.config.clone();
3308            let handle = deck.handle.clone();
3309            // Build a sibling client that shares the same
3310            // snapshot reader — `DeckClient` isn't `Clone` and
3311            // a real consumer would `Arc::clone` the outer
3312            // handle, but spawning shows the watch is non-
3313            // blocking on the SDK level.
3314            let client = DeckClient::new(handle, deck_handle.clone(), identity, config);
3315            tokio::spawn(async move { client.watch(|s| s.freeze_remaining_ms.is_some()).await })
3316        };
3317
3318        // Wait a beat so the watcher is in its sleep loop.
3319        tokio::time::sleep(Duration::from_millis(40)).await;
3320        let p = deck
3321            .ice()
3322            .freeze_cluster(Duration::from_secs(15))
3323            .simulate()
3324            .await
3325            .expect("simulate");
3326        let sig = deck
3327            .identity()
3328            .sign_proposal(p.action(), p.issued_at_ms(), &p.blast_hash());
3329        p.commit(&[sig]).await.expect("commit");
3330
3331        let snap = tokio::time::timeout(Duration::from_secs(2), watcher)
3332            .await
3333            .expect("watcher should resolve")
3334            .expect("join");
3335        assert!(snap.freeze_remaining_ms.is_some());
3336        let _ = runtime.shutdown().await;
3337    }
3338
3339    #[tokio::test]
3340    async fn watch_is_event_driven_resolving_far_under_the_poll_ceiling() {
3341        // E-9: arm a deliberately long 30s poll-interval ceiling. If the
3342        // watch still resolves in well under a second after the commit,
3343        // it can only have woken on the loop's snapshot-publish signal —
3344        // a regression to interval-polling would wait ~30s and trip the
3345        // inner 2s timeout.
3346        let dispatcher = Arc::new(LoggingDispatcher::new());
3347        let runtime = MeshOsRuntime::start(fast_config(), dispatcher);
3348        let deck = DeckClient::from_runtime(&runtime, OperatorIdentity::generate()).with_config(
3349            DeckClientConfig {
3350                snapshot_poll_interval: Duration::from_secs(30),
3351                ..DeckClientConfig::default()
3352            },
3353        );
3354
3355        let watcher = {
3356            let client = DeckClient::new(
3357                deck.handle.clone(),
3358                deck.snapshot_reader.clone(),
3359                deck.identity().clone(),
3360                deck.config.clone(),
3361            );
3362            tokio::spawn(async move { client.watch(|s| s.freeze_remaining_ms.is_some()).await })
3363        };
3364
3365        tokio::time::sleep(Duration::from_millis(40)).await;
3366        let started = std::time::Instant::now();
3367        let p = deck
3368            .ice()
3369            .freeze_cluster(Duration::from_secs(15))
3370            .simulate()
3371            .await
3372            .expect("simulate");
3373        let sig = deck
3374            .identity()
3375            .sign_proposal(p.action(), p.issued_at_ms(), &p.blast_hash());
3376        p.commit(&[sig]).await.expect("commit");
3377
3378        let snap = tokio::time::timeout(Duration::from_secs(2), watcher)
3379            .await
3380            .expect("watch must resolve far inside the 30s ceiling")
3381            .expect("join");
3382        assert!(snap.freeze_remaining_ms.is_some());
3383        assert!(
3384            started.elapsed() < Duration::from_secs(5),
3385            "watch took {:?}, expected ≪ 30s ceiling — not event-driven",
3386            started.elapsed(),
3387        );
3388        let _ = runtime.shutdown().await;
3389    }
3390
3391    #[tokio::test]
3392    async fn watch_timeout_returns_watch_timeout_error_when_predicate_never_holds() {
3393        let dispatcher = Arc::new(LoggingDispatcher::new());
3394        let runtime = MeshOsRuntime::start(fast_config(), dispatcher);
3395        let deck = DeckClient::from_runtime(&runtime, OperatorIdentity::generate()).with_config(
3396            DeckClientConfig {
3397                snapshot_poll_interval: Duration::from_millis(10),
3398                ..DeckClientConfig::default()
3399            },
3400        );
3401
3402        let err = deck
3403            .watch_timeout(
3404                |s| s.freeze_remaining_ms.is_some(),
3405                Duration::from_millis(80),
3406            )
3407            .await
3408            .expect_err("predicate never holds, should time out");
3409        assert_eq!(err.kind, "watch_timeout");
3410        let _ = runtime.shutdown().await;
3411    }
3412
3413    #[tokio::test]
3414    async fn audit_since_filter_drops_records_at_or_below_watermark() {
3415        let dispatcher = Arc::new(LoggingDispatcher::new());
3416        let runtime = MeshOsRuntime::start(fast_config(), dispatcher);
3417        let deck = DeckClient::from_runtime(&runtime, OperatorIdentity::generate());
3418
3419        deck.admin().cordon(42).await.unwrap();
3420        deck.admin().uncordon(42).await.unwrap();
3421        deck.admin().invalidate_placement(42).await.unwrap();
3422        tokio::time::sleep(Duration::from_millis(80)).await;
3423
3424        let all = deck.audit().collect();
3425        assert_eq!(all.len(), 3);
3426        // Pick the middle record's seq as the watermark; only
3427        // the newest record (seq strictly greater) should
3428        // surface.
3429        let middle_seq = all[1].seq;
3430        let after_middle = deck.audit().since(middle_seq).collect();
3431        assert_eq!(after_middle.len(), 1, "since should keep only seq > middle");
3432        assert!(after_middle[0].seq > middle_seq);
3433        let _ = runtime.shutdown().await;
3434    }
3435
3436    #[tokio::test]
3437    async fn audit_stream_since_seeds_initial_watermark() {
3438        use futures::StreamExt;
3439        let dispatcher = Arc::new(LoggingDispatcher::new());
3440        let runtime = MeshOsRuntime::start(fast_config(), dispatcher);
3441        let deck = DeckClient::from_runtime(&runtime, OperatorIdentity::generate()).with_config(
3442            DeckClientConfig {
3443                snapshot_poll_interval: Duration::from_millis(15),
3444                ..DeckClientConfig::default()
3445            },
3446        );
3447
3448        // Land three commits before subscribing.
3449        deck.admin().cordon(42).await.unwrap();
3450        deck.admin().uncordon(42).await.unwrap();
3451        deck.admin().invalidate_placement(42).await.unwrap();
3452        tokio::time::sleep(Duration::from_millis(80)).await;
3453
3454        let all = deck.audit().collect();
3455        assert_eq!(all.len(), 3);
3456        // Resume from the middle entry's seq. Stream should
3457        // yield only the newest entry (seq strictly above
3458        // middle) then park.
3459        let middle_seq = all[1].seq;
3460        let mut stream = deck.audit().since(middle_seq).stream();
3461        let next = tokio::time::timeout(Duration::from_secs(2), stream.next())
3462            .await
3463            .expect("timed out")
3464            .expect("closed")
3465            .expect("ok");
3466        assert!(next.seq > middle_seq);
3467        // No more records — stream parks.
3468        let parked = tokio::time::timeout(Duration::from_millis(40), stream.next()).await;
3469        assert!(
3470            parked.is_err(),
3471            "stream should park after watermark catches up"
3472        );
3473        let _ = runtime.shutdown().await;
3474    }
3475
3476    #[tokio::test]
3477    async fn log_filter_since_seeds_stream_watermark() {
3478        use crate::adapter::net::behavior::meshos::{LogLine, MeshOsEvent};
3479        use futures::StreamExt;
3480        let dispatcher = Arc::new(LoggingDispatcher::new());
3481        let runtime = MeshOsRuntime::start(fast_config(), dispatcher);
3482        let deck = DeckClient::from_runtime(&runtime, OperatorIdentity::generate()).with_config(
3483            DeckClientConfig {
3484                snapshot_poll_interval: Duration::from_millis(15),
3485                ..DeckClientConfig::default()
3486            },
3487        );
3488
3489        // Publish three lines and snapshot the middle seq.
3490        for i in 0..3 {
3491            runtime
3492                .handle()
3493                .publish(MeshOsEvent::LogLine(LogLine::info(
3494                    None,
3495                    format!("msg {i}"),
3496                )))
3497                .await
3498                .unwrap();
3499        }
3500        tokio::time::sleep(Duration::from_millis(80)).await;
3501
3502        let snap = runtime.snapshot();
3503        assert_eq!(snap.log_ring.len(), 3);
3504        let middle_seq = snap.log_ring[1].seq;
3505
3506        // Subscribe with since() seeded to the middle seq.
3507        // Stream should yield only the third line (seq > middle).
3508        let mut stream = deck.subscribe_logs(LogFilter::new().since(middle_seq));
3509        let next = tokio::time::timeout(Duration::from_secs(2), stream.next())
3510            .await
3511            .expect("timed out")
3512            .expect("closed")
3513            .expect("ok");
3514        assert!(next.seq > middle_seq);
3515        assert_eq!(next.message, "msg 2");
3516        let _ = runtime.shutdown().await;
3517    }
3518
3519    #[tokio::test]
3520    async fn subscribe_logs_yields_published_log_lines_in_seq_order() {
3521        use crate::adapter::net::behavior::meshos::{LogLevel, LogLine, MeshOsEvent};
3522        use futures::StreamExt;
3523        let dispatcher = Arc::new(LoggingDispatcher::new());
3524        let runtime = MeshOsRuntime::start(fast_config(), dispatcher);
3525        let deck = DeckClient::from_runtime(&runtime, OperatorIdentity::generate()).with_config(
3526            DeckClientConfig {
3527                snapshot_poll_interval: Duration::from_millis(15),
3528                ..DeckClientConfig::default()
3529            },
3530        );
3531
3532        let mut stream = deck.subscribe_logs(LogFilter::new());
3533        for (i, level) in [LogLevel::Info, LogLevel::Warn, LogLevel::Error]
3534            .into_iter()
3535            .enumerate()
3536        {
3537            runtime
3538                .handle()
3539                .publish(MeshOsEvent::LogLine(LogLine {
3540                    level,
3541                    daemon_id: Some(7),
3542                    message: format!("msg {}", i),
3543                }))
3544                .await
3545                .unwrap();
3546        }
3547
3548        let r1 = tokio::time::timeout(Duration::from_secs(2), stream.next())
3549            .await
3550            .expect("r1 timed out")
3551            .expect("r1 closed")
3552            .expect("r1 ok");
3553        let r2 = tokio::time::timeout(Duration::from_secs(2), stream.next())
3554            .await
3555            .expect("r2 timed out")
3556            .expect("r2 closed")
3557            .expect("r2 ok");
3558        let r3 = tokio::time::timeout(Duration::from_secs(2), stream.next())
3559            .await
3560            .expect("r3 timed out")
3561            .expect("r3 closed")
3562            .expect("r3 ok");
3563        assert!(r1.seq < r2.seq);
3564        assert!(r2.seq < r3.seq);
3565        assert_eq!(r1.level, LogLevel::Info);
3566        assert_eq!(r3.level, LogLevel::Error);
3567        // The loop stamps `node_id = Some(this_node)` on every
3568        // locally-published line.
3569        assert_eq!(r1.node_id, Some(42));
3570        let _ = runtime.shutdown().await;
3571    }
3572
3573    #[tokio::test]
3574    async fn subscribe_logs_min_level_filter_drops_below_threshold() {
3575        use crate::adapter::net::behavior::meshos::{LogLevel, LogLine, MeshOsEvent};
3576        use futures::StreamExt;
3577        let dispatcher = Arc::new(LoggingDispatcher::new());
3578        let runtime = MeshOsRuntime::start(fast_config(), dispatcher);
3579        let deck = DeckClient::from_runtime(&runtime, OperatorIdentity::generate()).with_config(
3580            DeckClientConfig {
3581                snapshot_poll_interval: Duration::from_millis(15),
3582                ..DeckClientConfig::default()
3583            },
3584        );
3585
3586        let mut stream = deck.subscribe_logs(LogFilter::new().min_level(LogLevel::Warn));
3587        runtime
3588            .handle()
3589            .publish(MeshOsEvent::LogLine(LogLine::info(None, "info dropped")))
3590            .await
3591            .unwrap();
3592        runtime
3593            .handle()
3594            .publish(MeshOsEvent::LogLine(LogLine::warn(None, "warn kept")))
3595            .await
3596            .unwrap();
3597
3598        let next = tokio::time::timeout(Duration::from_secs(2), stream.next())
3599            .await
3600            .expect("next timed out")
3601            .expect("next closed")
3602            .expect("next ok");
3603        assert_eq!(next.level, LogLevel::Warn);
3604        assert_eq!(next.message, "warn kept");
3605        let _ = runtime.shutdown().await;
3606    }
3607
3608    #[tokio::test]
3609    async fn subscribe_logs_with_daemon_filter_keeps_only_matching_daemon() {
3610        use crate::adapter::net::behavior::meshos::{LogLine, MeshOsEvent};
3611        use futures::StreamExt;
3612        let dispatcher = Arc::new(LoggingDispatcher::new());
3613        let runtime = MeshOsRuntime::start(fast_config(), dispatcher);
3614        let deck = DeckClient::from_runtime(&runtime, OperatorIdentity::generate()).with_config(
3615            DeckClientConfig {
3616                snapshot_poll_interval: Duration::from_millis(15),
3617                ..DeckClientConfig::default()
3618            },
3619        );
3620
3621        let mut stream = deck.subscribe_logs(LogFilter::new().with_daemon(7));
3622        runtime
3623            .handle()
3624            .publish(MeshOsEvent::LogLine(LogLine::info(
3625                Some(99),
3626                "other daemon",
3627            )))
3628            .await
3629            .unwrap();
3630        runtime
3631            .handle()
3632            .publish(MeshOsEvent::LogLine(LogLine::info(
3633                Some(7),
3634                "target daemon",
3635            )))
3636            .await
3637            .unwrap();
3638
3639        let next = tokio::time::timeout(Duration::from_secs(2), stream.next())
3640            .await
3641            .expect("next timed out")
3642            .expect("next closed")
3643            .expect("next ok");
3644        assert_eq!(next.daemon_id, Some(7));
3645        assert_eq!(next.message, "target daemon");
3646        let _ = runtime.shutdown().await;
3647    }
3648
3649    #[tokio::test]
3650    async fn audit_stream_emits_one_record_per_signed_commit_in_order() {
3651        use futures::StreamExt;
3652        let dispatcher = Arc::new(LoggingDispatcher::new());
3653        let runtime = MeshOsRuntime::start(fast_config(), dispatcher);
3654        let deck = DeckClient::from_runtime(&runtime, OperatorIdentity::generate()).with_config(
3655            DeckClientConfig {
3656                snapshot_poll_interval: Duration::from_millis(15),
3657                ..DeckClientConfig::default()
3658            },
3659        );
3660
3661        let mut stream = deck.audit().stream();
3662        // No commits yet — the stream should yield nothing for
3663        // at least one tick.
3664        let first_attempt = tokio::time::timeout(Duration::from_millis(40), stream.next()).await;
3665        assert!(first_attempt.is_err(), "stream should park when no records");
3666
3667        // Issue three commits; the stream should yield three
3668        // records in seq order.
3669        deck.admin().cordon(42).await.unwrap();
3670        deck.admin().uncordon(42).await.unwrap();
3671        deck.admin().invalidate_placement(42).await.unwrap();
3672
3673        let r1 = tokio::time::timeout(Duration::from_secs(2), stream.next())
3674            .await
3675            .expect("r1 timed out")
3676            .expect("r1 closed")
3677            .expect("r1 ok");
3678        let r2 = tokio::time::timeout(Duration::from_secs(2), stream.next())
3679            .await
3680            .expect("r2 timed out")
3681            .expect("r2 closed")
3682            .expect("r2 ok");
3683        let r3 = tokio::time::timeout(Duration::from_secs(2), stream.next())
3684            .await
3685            .expect("r3 timed out")
3686            .expect("r3 closed")
3687            .expect("r3 ok");
3688
3689        // Stream emits in seq order (substrate guarantees seq
3690        // strictly increases).
3691        assert!(r1.seq < r2.seq);
3692        assert!(r2.seq < r3.seq);
3693        let _ = runtime.shutdown().await;
3694    }
3695
3696    #[tokio::test]
3697    async fn audit_stream_dedups_already_seen_records_across_polls() {
3698        // Two consecutive polls on the same snapshot must NOT
3699        // re-emit records. The seq-based watermark guarantees
3700        // this even when the snapshot ring carries records
3701        // older than the stream's watermark.
3702        use futures::StreamExt;
3703        let dispatcher = Arc::new(LoggingDispatcher::new());
3704        let runtime = MeshOsRuntime::start(fast_config(), dispatcher);
3705        let deck = DeckClient::from_runtime(&runtime, OperatorIdentity::generate()).with_config(
3706            DeckClientConfig {
3707                snapshot_poll_interval: Duration::from_millis(10),
3708                ..DeckClientConfig::default()
3709            },
3710        );
3711
3712        deck.admin().cordon(42).await.unwrap();
3713        let mut stream = deck.audit().stream();
3714        let first = tokio::time::timeout(Duration::from_secs(2), stream.next())
3715            .await
3716            .expect("first timed out")
3717            .expect("first closed")
3718            .expect("first ok");
3719
3720        // No new commits — the stream should park (not
3721        // re-emit `first`).
3722        let second_attempt = tokio::time::timeout(Duration::from_millis(50), stream.next()).await;
3723        assert!(
3724            second_attempt.is_err(),
3725            "stream should not re-emit seen record"
3726        );
3727
3728        // Issue another commit; only the new one shows up.
3729        deck.admin().uncordon(42).await.unwrap();
3730        let second = tokio::time::timeout(Duration::from_secs(2), stream.next())
3731            .await
3732            .expect("second timed out")
3733            .expect("second closed")
3734            .expect("second ok");
3735        assert!(second.seq > first.seq);
3736        let _ = runtime.shutdown().await;
3737    }
3738
3739    #[tokio::test]
3740    async fn audit_stream_applies_force_only_filter_in_tail_mode() {
3741        // Mix ordinary (Cordon) and ICE (ThawCluster). With
3742        // force_only(), the stream yields only the ICE entry.
3743        use futures::StreamExt;
3744        let dispatcher = Arc::new(LoggingDispatcher::new());
3745        let runtime = MeshOsRuntime::start(fast_config(), dispatcher);
3746        let deck = DeckClient::from_runtime(&runtime, OperatorIdentity::generate()).with_config(
3747            DeckClientConfig {
3748                snapshot_poll_interval: Duration::from_millis(10),
3749                ..DeckClientConfig::default()
3750            },
3751        );
3752
3753        let mut stream = deck.audit().force_only().stream();
3754        deck.admin().cordon(42).await.unwrap();
3755        let thaw = deck
3756            .ice()
3757            .thaw_cluster()
3758            .simulate()
3759            .await
3760            .expect("simulate");
3761        let sig =
3762            deck.identity()
3763                .sign_proposal(thaw.action(), thaw.issued_at_ms(), &thaw.blast_hash());
3764        thaw.commit(&[sig]).await.unwrap();
3765
3766        let next = tokio::time::timeout(Duration::from_secs(2), stream.next())
3767            .await
3768            .expect("next timed out")
3769            .expect("next closed")
3770            .expect("next ok");
3771        // Only the ThawCluster (ICE) should pass the filter.
3772        assert!(next.event.is_ice());
3773        let _ = runtime.shutdown().await;
3774    }
3775
3776    #[tokio::test]
3777    async fn audit_query_returns_empty_when_no_ice_commits_observed() {
3778        let dispatcher = Arc::new(LoggingDispatcher::new());
3779        let runtime = MeshOsRuntime::start(fast_config(), dispatcher);
3780        let deck = DeckClient::from_runtime(&runtime, OperatorIdentity::generate());
3781        let results = deck.audit().recent(10).collect();
3782        assert!(results.is_empty());
3783        let _ = runtime.shutdown().await;
3784    }
3785
3786    #[tokio::test]
3787    async fn audit_query_returns_recent_entries_newest_first() {
3788        // Two unsigned commits — they reach the loop via the
3789        // unsigned admin path because no registry is installed.
3790        // The audit ring only records `SignedIceCommit` events,
3791        // so unsigned commits don't appear. We instead publish
3792        // `SignedIceCommit` directly (no verifier installed, so
3793        // outcome = Unverified — but the ring records every
3794        // attempt regardless).
3795        use crate::adapter::net::behavior::meshos::{IceActionProposal, MeshOsEvent};
3796        let dispatcher = Arc::new(LoggingDispatcher::new());
3797        let runtime = MeshOsRuntime::start(fast_config(), dispatcher);
3798        let deck = DeckClient::from_runtime(&runtime, OperatorIdentity::generate());
3799
3800        for ttl_secs in [10, 20, 30] {
3801            runtime
3802                .handle()
3803                .publish(MeshOsEvent::SignedIceCommit {
3804                    proposal: IceActionProposal::FreezeCluster {
3805                        ttl: Duration::from_secs(ttl_secs),
3806                    },
3807                    signatures: Vec::new(),
3808                    issued_at_ms: super::super::meshos::now_ms_since_unix_epoch(),
3809                    blast_hash: TEST_BLAST_HASH,
3810                })
3811                .await
3812                .unwrap();
3813        }
3814        tokio::time::sleep(Duration::from_millis(80)).await;
3815        let all = deck.audit().collect();
3816        assert_eq!(all.len(), 3, "ring should hold all three entries");
3817        // Newest-first ordering: the 30s freeze is the last
3818        // commit submitted, so it should be the first result.
3819        assert!(matches!(
3820            all[0].event,
3821            AdminEvent::FreezeCluster { ttl } if ttl == Duration::from_secs(30)
3822        ));
3823        assert!(matches!(
3824            all[2].event,
3825            AdminEvent::FreezeCluster { ttl } if ttl == Duration::from_secs(10)
3826        ));
3827
3828        let recent_one = deck.audit().recent(1).collect();
3829        assert_eq!(recent_one.len(), 1);
3830        assert!(matches!(
3831            recent_one[0].event,
3832            AdminEvent::FreezeCluster { ttl } if ttl == Duration::from_secs(30)
3833        ));
3834        let _ = runtime.shutdown().await;
3835    }
3836
3837    #[tokio::test]
3838    async fn audit_query_filters_by_operator_id() {
3839        use crate::adapter::net::behavior::meshos::{IceActionProposal, MeshOsEvent};
3840        let dispatcher = Arc::new(LoggingDispatcher::new());
3841        let runtime = MeshOsRuntime::start(fast_config(), dispatcher);
3842        let op_a = OperatorIdentity::generate();
3843        let op_b = OperatorIdentity::generate();
3844        let deck = DeckClient::from_runtime(&runtime, op_a.clone());
3845
3846        // Commit from op_a.
3847        let proposal_a = IceActionProposal::FreezeCluster {
3848            ttl: Duration::from_secs(10),
3849        };
3850        let ts_a = super::super::meshos::now_ms_since_unix_epoch();
3851        let sig_a = OperatorSignature::sign(op_a.keypair(), &proposal_a, ts_a, &TEST_BLAST_HASH);
3852        runtime
3853            .handle()
3854            .publish(MeshOsEvent::SignedIceCommit {
3855                proposal: proposal_a,
3856                signatures: vec![sig_a],
3857                issued_at_ms: ts_a,
3858                blast_hash: TEST_BLAST_HASH,
3859            })
3860            .await
3861            .unwrap();
3862        // Commit from op_b.
3863        let proposal_b = IceActionProposal::ThawCluster;
3864        let ts_b = super::super::meshos::now_ms_since_unix_epoch();
3865        let sig_b = OperatorSignature::sign(op_b.keypair(), &proposal_b, ts_b, &TEST_BLAST_HASH);
3866        runtime
3867            .handle()
3868            .publish(MeshOsEvent::SignedIceCommit {
3869                proposal: proposal_b,
3870                signatures: vec![sig_b],
3871                issued_at_ms: ts_b,
3872                blast_hash: TEST_BLAST_HASH,
3873            })
3874            .await
3875            .unwrap();
3876        tokio::time::sleep(Duration::from_millis(80)).await;
3877
3878        let filtered = deck.audit().by_operator(op_a.operator_id()).collect();
3879        assert_eq!(filtered.len(), 1);
3880        assert!(matches!(
3881            filtered[0].event,
3882            AdminEvent::FreezeCluster { .. }
3883        ));
3884        let _ = runtime.shutdown().await;
3885    }
3886
3887    #[tokio::test]
3888    async fn audit_query_force_only_drops_ordinary_admin_keeps_ice() {
3889        // Mix ordinary admin (Cordon) with ICE (ThawCluster);
3890        // force_only() should keep only the ICE entry.
3891        let dispatcher = Arc::new(LoggingDispatcher::new());
3892        let runtime = MeshOsRuntime::start(fast_config(), dispatcher);
3893        let deck = DeckClient::from_runtime(&runtime, OperatorIdentity::generate());
3894
3895        deck.admin().cordon(42).await.expect("cordon");
3896        let thaw = deck
3897            .ice()
3898            .thaw_cluster()
3899            .simulate()
3900            .await
3901            .expect("simulate");
3902        let sig =
3903            deck.identity()
3904                .sign_proposal(thaw.action(), thaw.issued_at_ms(), &thaw.blast_hash());
3905        thaw.commit(&[sig]).await.expect("thaw");
3906        tokio::time::sleep(Duration::from_millis(80)).await;
3907
3908        let baseline = deck.audit().collect();
3909        assert_eq!(
3910            baseline.len(),
3911            2,
3912            "ring should hold both ordinary and ICE commits"
3913        );
3914        let force_only = deck.audit().force_only().collect();
3915        assert_eq!(force_only.len(), 1, "force_only should drop Cordon");
3916        assert!(force_only[0].event.is_ice());
3917        let _ = runtime.shutdown().await;
3918    }
3919
3920    #[tokio::test]
3921    async fn admin_commit_routes_through_signed_path_when_registry_installed() {
3922        // With an OperatorRegistry installed, the SDK's
3923        // AdminCommands signs every admin event and routes via
3924        // SignedAdminCommit; the substrate verifier accepts +
3925        // the audit ring shows the operator + Accepted outcome.
3926        use std::sync::Arc as SArc;
3927        let dispatcher = Arc::new(LoggingDispatcher::new());
3928        let identity = OperatorIdentity::generate();
3929        let mut registry = OperatorRegistry::new();
3930        registry.register(identity.keypair());
3931        let verifier = SArc::new(crate::adapter::net::behavior::meshos::AdminVerifier::new(
3932            SArc::new(registry.clone()),
3933            1,
3934        ));
3935        let runtime = MeshOsRuntime::start_with_all(
3936            fast_config(),
3937            dispatcher,
3938            Default::default(),
3939            Default::default(),
3940            SArc::new(crate::adapter::net::compute::DaemonRegistry::new()),
3941            None,
3942            Some(verifier),
3943        );
3944        let deck =
3945            DeckClient::from_runtime(&runtime, identity.clone()).with_operator_registry(registry);
3946
3947        let commit = deck.admin().cordon(42).await.expect("commit");
3948        assert_eq!(commit.event_kind(), "cordon");
3949
3950        // Audit ring should show the commit with Accepted
3951        // outcome + the issuing operator id.
3952        tokio::time::sleep(Duration::from_millis(80)).await;
3953        let entries = deck.audit().collect();
3954        assert_eq!(entries.len(), 1);
3955        assert!(matches!(
3956            entries[0].outcome,
3957            crate::adapter::net::behavior::meshos::VerificationOutcome::Accepted
3958        ));
3959        assert_eq!(entries[0].operator_ids, vec![identity.operator_id()]);
3960        let _ = runtime.shutdown().await;
3961    }
3962
3963    #[tokio::test]
3964    async fn admin_commit_falls_back_to_unsigned_when_no_registry_installed() {
3965        // Without a registry the SDK routes through the
3966        // legacy unsigned admin path. Audit ring still records
3967        // the commit but with Unverified outcome.
3968        let dispatcher = Arc::new(LoggingDispatcher::new());
3969        let runtime = MeshOsRuntime::start(fast_config(), dispatcher);
3970        let deck = DeckClient::from_runtime(&runtime, OperatorIdentity::generate());
3971
3972        deck.admin().cordon(42).await.expect("commit");
3973        tokio::time::sleep(Duration::from_millis(80)).await;
3974
3975        let entries = deck.audit().collect();
3976        assert_eq!(entries.len(), 1);
3977        assert!(matches!(
3978            entries[0].outcome,
3979            crate::adapter::net::behavior::meshos::VerificationOutcome::Unverified
3980        ));
3981        assert!(entries[0].operator_ids.is_empty());
3982        let _ = runtime.shutdown().await;
3983    }
3984
3985    #[tokio::test]
3986    async fn audit_ring_records_unsigned_admin_with_unverified_outcome() {
3987        // Locked decision #2: every admin event the loop sees
3988        // is on the audit ring. Unsigned ones surface as
3989        // Unverified.
3990        let dispatcher = Arc::new(LoggingDispatcher::new());
3991        let runtime = MeshOsRuntime::start(fast_config(), dispatcher);
3992        let deck = DeckClient::from_runtime(&runtime, OperatorIdentity::generate());
3993
3994        deck.admin().cordon(42).await.expect("cordon");
3995        deck.admin()
3996            .drop_replicas(42, vec![1, 2])
3997            .await
3998            .expect("drop_replicas");
3999        tokio::time::sleep(Duration::from_millis(80)).await;
4000
4001        let entries = deck.audit().collect();
4002        assert_eq!(entries.len(), 2);
4003        for entry in &entries {
4004            assert!(matches!(
4005                entry.outcome,
4006                crate::adapter::net::behavior::meshos::VerificationOutcome::Unverified
4007            ));
4008            assert!(entry.operator_ids.is_empty());
4009        }
4010        let _ = runtime.shutdown().await;
4011    }
4012
4013    #[tokio::test]
4014    async fn audit_query_between_filters_outside_window() {
4015        use crate::adapter::net::behavior::meshos::{IceActionProposal, MeshOsEvent};
4016        let dispatcher = Arc::new(LoggingDispatcher::new());
4017        let runtime = MeshOsRuntime::start(fast_config(), dispatcher);
4018        let deck = DeckClient::from_runtime(&runtime, OperatorIdentity::generate());
4019
4020        runtime
4021            .handle()
4022            .publish(MeshOsEvent::SignedIceCommit {
4023                proposal: IceActionProposal::ThawCluster,
4024                signatures: Vec::new(),
4025                issued_at_ms: super::super::meshos::now_ms_since_unix_epoch(),
4026                blast_hash: TEST_BLAST_HASH,
4027            })
4028            .await
4029            .unwrap();
4030        tokio::time::sleep(Duration::from_millis(80)).await;
4031
4032        // A window entirely in the past — no entries should
4033        // match.
4034        let past_only = deck.audit().between(0, 1).collect();
4035        assert!(past_only.is_empty());
4036
4037        // A window covering "now" — should match.
4038        let now_ms = std::time::SystemTime::now()
4039            .duration_since(std::time::UNIX_EPOCH)
4040            .unwrap()
4041            .as_millis() as u64;
4042        let around_now = deck
4043            .audit()
4044            .between(now_ms - 10_000, now_ms + 10_000)
4045            .collect();
4046        assert_eq!(around_now.len(), 1);
4047        let _ = runtime.shutdown().await;
4048    }
4049
4050    #[tokio::test]
4051    async fn ice_force_restart_daemon_proposal_round_trips() {
4052        let dispatcher = Arc::new(LoggingDispatcher::new());
4053        let runtime = MeshOsRuntime::start(fast_config(), dispatcher);
4054        let deck = DeckClient::from_runtime(&runtime, OperatorIdentity::generate());
4055        let daemon = super::super::meshos::DaemonRef {
4056            id: 7,
4057            name: "telemetry".into(),
4058        };
4059        let proposal = deck.ice().force_restart_daemon(daemon.clone());
4060        let simulated = proposal.simulate().await.expect("simulate");
4061        assert_eq!(simulated.blast_radius().affected_daemons, vec![daemon]);
4062        let sig = deck.identity().sign_proposal(
4063            simulated.action(),
4064            simulated.issued_at_ms(),
4065            &simulated.blast_hash(),
4066        );
4067        let commit = simulated.commit(&[sig]).await.expect("commit");
4068        assert_eq!(commit.event_kind(), "force_restart_daemon");
4069        let _ = runtime.shutdown().await;
4070    }
4071
4072    #[tokio::test]
4073    async fn ice_kill_migration_proposal_round_trips_and_audits() {
4074        let dispatcher = Arc::new(LoggingDispatcher::new());
4075        let runtime = MeshOsRuntime::start(fast_config(), dispatcher);
4076        let deck = DeckClient::from_runtime(&runtime, OperatorIdentity::generate());
4077        let proposal = deck.ice().kill_migration(123);
4078        let simulated = proposal.simulate().await.expect("simulate");
4079        let sig = deck.identity().sign_proposal(
4080            simulated.action(),
4081            simulated.issued_at_ms(),
4082            &simulated.blast_hash(),
4083        );
4084        let commit = simulated.commit(&[sig]).await.expect("commit");
4085        assert_eq!(commit.event_kind(), "kill_migration");
4086
4087        // Confirms the commit lands on the audit ring even
4088        // though the dispatcher integration is pending.
4089        tokio::time::sleep(Duration::from_millis(60)).await;
4090        let entries = deck.audit().force_only().collect();
4091        assert!(entries
4092            .iter()
4093            .any(|r| matches!(r.event, AdminEvent::KillMigration { migration: 123 })));
4094        let _ = runtime.shutdown().await;
4095    }
4096
4097    #[tokio::test]
4098    async fn ice_force_cutover_proposal_round_trips() {
4099        let dispatcher = Arc::new(LoggingDispatcher::new());
4100        let runtime = MeshOsRuntime::start(fast_config(), dispatcher);
4101        let deck = DeckClient::from_runtime(&runtime, OperatorIdentity::generate());
4102        let proposal = deck.ice().force_cutover(100, 42);
4103        let simulated = proposal.simulate().await.expect("simulate");
4104        assert_eq!(simulated.blast_radius().affected_replicas, vec![100]);
4105        assert_eq!(simulated.blast_radius().affected_nodes, vec![42]);
4106        let sig = deck.identity().sign_proposal(
4107            simulated.action(),
4108            simulated.issued_at_ms(),
4109            &simulated.blast_hash(),
4110        );
4111        let commit = simulated.commit(&[sig]).await.expect("commit");
4112        assert_eq!(commit.event_kind(), "force_cutover");
4113        let _ = runtime.shutdown().await;
4114    }
4115
4116    #[tokio::test]
4117    async fn ice_force_evict_replica_proposal_round_trips() {
4118        let dispatcher = Arc::new(LoggingDispatcher::new());
4119        let runtime = MeshOsRuntime::start(fast_config(), dispatcher);
4120        let deck = DeckClient::from_runtime(&runtime, OperatorIdentity::generate());
4121        let proposal = deck.ice().force_evict_replica(100, 7);
4122        let simulated = proposal.simulate().await.expect("simulate");
4123        assert_eq!(simulated.blast_radius().affected_replicas, vec![100]);
4124        assert_eq!(simulated.blast_radius().affected_nodes, vec![7]);
4125        let sig = deck.identity().sign_proposal(
4126            simulated.action(),
4127            simulated.issued_at_ms(),
4128            &simulated.blast_hash(),
4129        );
4130        let commit = simulated.commit(&[sig]).await.expect("commit");
4131        assert_eq!(commit.event_kind(), "force_evict_replica");
4132        let _ = runtime.shutdown().await;
4133    }
4134
4135    #[tokio::test]
4136    async fn ice_commit_with_registry_accepts_a_valid_multi_op_bundle() {
4137        let dispatcher = Arc::new(LoggingDispatcher::new());
4138        let runtime = MeshOsRuntime::start(fast_config(), dispatcher);
4139        let op_a = OperatorIdentity::generate();
4140        let op_b = OperatorIdentity::generate();
4141        let mut registry = OperatorRegistry::new();
4142        registry.register(op_a.keypair());
4143        registry.register(op_b.keypair());
4144        let deck = DeckClient::new(
4145            runtime.handle_clone(),
4146            runtime.snapshot_reader().clone(),
4147            op_a.clone(),
4148            DeckClientConfig {
4149                snapshot_poll_interval: Duration::from_millis(100),
4150                ice_signature_threshold: 2,
4151            },
4152        )
4153        .with_operator_registry(registry);
4154
4155        let proposal = deck.ice().freeze_cluster(Duration::from_secs(15));
4156        let simulated = proposal.simulate().await.expect("simulate");
4157        let sig_a = op_a.sign_proposal(
4158            simulated.action(),
4159            simulated.issued_at_ms(),
4160            &simulated.blast_hash(),
4161        );
4162        let sig_b = op_b.sign_proposal(
4163            simulated.action(),
4164            simulated.issued_at_ms(),
4165            &simulated.blast_hash(),
4166        );
4167        let commit = simulated
4168            .commit(&[sig_a, sig_b])
4169            .await
4170            .expect("valid multi-op bundle should commit");
4171        assert_eq!(commit.event_kind(), "freeze_cluster");
4172        let _ = runtime.shutdown().await;
4173    }
4174}