Skip to main content

net/adapter/net/behavior/meshos/
event_loop.rs

//! [`MeshOsLoop`] — the canonical event loop. Locked decision #1:
//! one stream → one reconcile → consistent actions. Locked
//! decision #4: reconcile emits, the action executor drains.
//!
//! Phase A wires the plumbing. The loop owns an mpsc receiver
//! that consumes [`super::event::MeshOsEvent`]s from arbitrary
//! sources, folds each event into [`super::state::MeshOsState`]
//! (and routes desired-state input into
//! [`super::state::DesiredState`]), runs
//! [`super::reconcile::reconcile`] at most once per
//! [`super::event::MeshOsEvent::Tick`], and pushes any emitted
//! actions through an mpsc sender that the action executor
//! drains. Phase A's reconcile is a no-op, so the executor sees
//! an empty queue in steady state.
//!
//! Sources fan in via converters owned by their subsystems —
//! none ship in Phase A. Tests drive events directly through the
//! source channel to exercise the ordering contract.
19
20use std::collections::VecDeque;
21use std::sync::atomic::{AtomicU64, Ordering};
22use std::sync::Arc;
23
24use arc_swap::ArcSwap;
25use parking_lot::RwLock;
26use tokio::sync::mpsc;
27use tokio::time::{interval_at, Instant as TokioInstant, MissedTickBehavior};
28
29use super::action::{AllocateActionId, PendingAction};
30use super::config::MeshOsConfig;
31use super::control::{ControlSink, MeshOsControl};
32use super::event::MeshOsEvent;
33use super::maintenance::MaintenanceState;
34use super::probes::{HealthProbe, LocalityProbe};
35use super::reconcile::reconcile;
36use super::scheduler::SchedulerRegistry;
37use super::snapshot::{FailureRecord, MeshOsSnapshot};
38use super::state::{DesiredState, MeshOsState};
39
/// Per-node MeshOS instance. Owns the actual + desired state
/// folds, the event-source channel, and the action-executor
/// channel. Cloneable handles ([`MeshOsHandle`]) hand out
/// `mpsc::Sender<MeshOsEvent>` clones for sources to publish on;
/// `MeshOsLoop::run` is the long-lived task.
///
/// Everything beyond the core channels and state is optional
/// wiring: [`MeshOsLoop::new`] starts each collaborator as `None`
/// or a no-op implementation, and the `with_*` builders replace
/// them.
pub struct MeshOsLoop {
    /// Shared loop configuration. `MeshOsLoop::new` sizes both
    /// channels from it (`event_queue_capacity`,
    /// `action_queue_capacity`).
    config: Arc<MeshOsConfig>,

    /// Inbound event stream. The loop owns the receiver; sender
    /// clones live on every [`MeshOsHandle`] (and on any sender
    /// taken out via `MeshOsHandle::into_sender`). When the last
    /// sender drops, `recv()` returns `None` and the loop exits.
    events_rx: mpsc::Receiver<MeshOsEvent>,

    /// Outbound action queue. The action executor task drains
    /// this; Phase A drains and drops (no real subsystem
    /// dispatch yet).
    actions_tx: mpsc::Sender<PendingAction>,

    /// Action id allocator.
    action_ids: AllocateActionId,

    /// Folded substrate state.
    actual: MeshOsState,
    /// Folded desired state (placement intent + future
    /// daemon-intent feeds).
    desired: DesiredState,

    /// Most recent post-reconcile snapshot. Updated on every
    /// Tick after the reconcile pass; readable through
    /// [`MeshOsSnapshotReader::read`] from any other task /
    /// thread. `ArcSwap` keeps the publish path one atomic
    /// pointer store and the read path one atomic load — no lock
    /// contention with reconcile.
    snapshot: Arc<ArcSwap<MeshOsSnapshot>>,

    /// Ring of the most-recently-emitted actions. The snapshot's
    /// `pending` field renders this as "what reconcile recently
    /// produced." Bounded by `action_queue_capacity`; older
    /// entries drop FIFO when the cap is exceeded. The executor
    /// does NOT signal completion back, so this is *not* a true
    /// in-flight list — drained / completed actions stay in the
    /// ring until they age out.
    ///
    /// NOTE(review): FIFO eviction from a `Vec` costs O(n) per
    /// dropped entry if it is done with `remove(0)`; a `VecDeque`
    /// (as used by `admin_audit_ring` / `log_ring`) would make it
    /// O(1). Confirm against the eviction code before changing.
    recent_emissions: Vec<PendingAction>,

    /// Pull-via-tick probes the loop polls on each Tick. Shared
    /// with the [`ProbeRegistry`] so consumers can install
    /// probes after `MeshOsLoop::new` (the runtime in particular
    /// spawns the loop immediately and attaches probes via the
    /// registry).
    probes: ProbeRegistryInner,

    /// Phase D-1 scheduler registry — single-slot pluggable
    /// [`super::scheduler::PlacementScorer`]. Shared via Arc;
    /// install via [`SchedulerRegistry::install`] before or
    /// after `run()`.
    scheduler: SchedulerRegistry,

    /// X-13 recovery registry — pluggable per-tick recovery
    /// handlers for groups whose slots were marked unhealthy
    /// after a placement failure. The tick handler calls
    /// `try_run_all` after `poll_probes` so the very next
    /// reconcile pass sees the recovered slots. Empty by default;
    /// SDK consumers register handlers via
    /// `MeshOsRuntime::recovery_registry()`.
    recovery_registry: crate::adapter::net::compute::RecoveryRegistry,

    /// Reconcile-pass counter — used by tests / diagnostics to
    /// confirm reconcile fired exactly once per Tick.
    reconcile_count: u64,

    /// Monotonic counter the loop stamps onto every
    /// `AdminAuditRecord` it emits. Strictly increasing across
    /// the runtime's lifetime; the Deck SDK's audit-tail
    /// stream depends on this for dedup across snapshot polls.
    admin_audit_seq: u64,

    /// Monotonic counter the loop stamps onto every
    /// `LogRecord` it emits. Same pattern as
    /// `admin_audit_seq`; the Deck SDK's log-tail stream
    /// dedups against this.
    log_seq: u64,
    /// Boot-time identifier this loop stamps on every
    /// published snapshot via
    /// [`MeshOsSnapshot::runtime_epoch_id`]. SDK consumers
    /// dedup'ing via `seq` values pair each watermark with this
    /// value — when the snapshot's epoch flips, they reset.
    runtime_epoch_id: u64,
    /// Ring buffer of admin-commit outcomes — every admin
    /// commit the loop observes (signed ICE bundle or unsigned
    /// `MeshOsEvent::AdminEvent(...)`) lands here, regardless
    /// of whether a verifier accepted, rejected, or skipped
    /// it. Bounded to
    /// [`super::ice::DEFAULT_MAX_ADMIN_AUDIT_RECORDS`] (also the
    /// initial capacity); the loop drops the oldest entry FIFO
    /// when the cap is exceeded. Lives on the loop rather than
    /// `MeshOsState` because it's an append-only output buffer,
    /// not fold state — `state.rs` had explicit dead arms for
    /// `SignedIceCommit` / `SignedAdminCommit` precisely
    /// because they don't fold.
    admin_audit_ring: VecDeque<super::ice::AdminAuditRecord>,
    /// Ring buffer of log records — every
    /// `MeshOsEvent::LogLine` daemons or source converters
    /// publish lands here. Bounded to
    /// [`super::logs::DEFAULT_MAX_LOG_RING_RECORDS`] (also the
    /// initial capacity); older entries drop FIFO. Lives on the
    /// loop rather than `MeshOsState` because log lines don't
    /// fold.
    log_ring: VecDeque<super::logs::LogRecord>,

    /// Actions reconcile emitted that the action-queue
    /// `try_send` rejected because the executor was at
    /// `action_queue_capacity`. Shared counter (handed out by
    /// `dropped_actions_counter`) — the runtime surfaces it
    /// through [`MeshOsRuntime::executor_stats`] for operator
    /// visibility.
    dropped_actions: Arc<AtomicU64>,
    /// Shared reference to the executor's recent-failures ring.
    /// The executor task writes; the loop reads on every
    /// `publish_snapshot` and copies the records into the
    /// snapshot's `recent_failures` field. Optional so a loop
    /// constructed without an executor (e.g. unit tests) still
    /// publishes (with an empty ring).
    executor_failures: Option<Arc<RwLock<VecDeque<FailureRecord>>>>,
    /// Shared failure-seq counter — same counter the executor
    /// uses. The loop stamps its own runtime-side failures
    /// (e.g. migration-abort dispatcher errors) with this so
    /// SDK consumers' dedup gate sees a single monotonic
    /// sequence across both writers.
    executor_failure_seq: Option<Arc<AtomicU64>>,
    /// Shared failure-chain appender — same instance the
    /// executor uses. The loop dual-writes runtime-side
    /// failures to the durable chain via this so chain
    /// history covers loop-side failures too.
    executor_failure_appender: Option<Arc<dyn super::failure_chain::FailureChainAppender>>,
    /// Optional control-event sink. When attached, the loop
    /// translates this-node `local_maintenance` transitions and
    /// future backpressure flips into [`MeshOsControl`] events
    /// and forwards them through the sink. The SDK installs one
    /// that fans events out to per-daemon channels via its
    /// router; substrate-internal uses can ignore.
    control_sink: Option<Arc<dyn ControlSink>>,
    /// Discriminant of the most recent `local_maintenance` value
    /// the loop observed. Updated on every `apply()` call so
    /// state transitions caused by either an admin event or a
    /// `MaintenanceTransitionObserved` (chain replay) surface
    /// uniformly. Starts at `Active`.
    last_local_maintenance: MaintenanceDiscriminant,
    /// Optional ICE-commit verifier. When installed, every
    /// `MeshOsEvent::SignedIceCommit` is run through this gate
    /// before the inner `AdminEvent` folds into state. When
    /// absent, signed commits fold their inner event without
    /// verification — useful for in-process tests where the
    /// SDK side already gates. The substrate slice that ships
    /// the operator-policy chain wires a registry in by default.
    admin_verifier: Option<Arc<super::ice::AdminVerifier>>,

    /// Admin audit chain appender. Production deployments
    /// wire a `TypedRedexFile<AdminAuditRecord>` here so
    /// security review can replay every admin commit across
    /// the cluster's lifetime. Default is
    /// `NoOpAdminAuditChainAppender` — the loop's in-memory
    /// `admin_audit_ring` (surfaced through the snapshot) is the
    /// only readable surface when no chain is wired.
    admin_audit_appender: Arc<dyn super::audit_chain::AdminAuditChainAppender>,

    /// Log chain appender. Production deployments wire a
    /// `TypedRedexFile<LogRecord>` here so the per-daemon log
    /// view extends to cluster-lifetime replay. Default is
    /// `NoOpLogChainAppender` — only the loop's in-memory
    /// `log_ring` is observable when no chain is wired.
    log_appender: Arc<dyn super::log_chain::LogChainAppender>,

    /// Migration-abort dispatcher. The loop calls this after
    /// folding a verified
    /// [`super::event::AdminEvent::KillMigration`]. Production
    /// deployments wire the
    /// [`super::migration_aborter::OrchestratorMigrationAborter`]
    /// adapter so the local `MigrationOrchestrator` actually
    /// aborts in-flight migrations; tests + bootstrap use the
    /// `NoOpMigrationAborter` default and the commit remains
    /// audit-only.
    migration_aborter: Arc<dyn super::migration_aborter::MigrationAborter>,

    /// Migration-snapshot source. The loop calls this on every
    /// snapshot publish and embeds the result in the
    /// snapshot's `in_flight_migrations` field, so the ICE
    /// simulator can enumerate which daemon a `KillMigration`
    /// target would affect. Production deployments wire the
    /// [`super::migration_snapshot_source::OrchestratorMigrationSnapshotSource`]
    /// adapter; bootstrap + tests use the no-op default and
    /// the snapshot's `in_flight_migrations` reads empty.
    migration_snapshot_source: Arc<dyn super::migration_snapshot_source::MigrationSnapshotSource>,
}
232
/// Payload-free tag for a [`MaintenanceState`] variant.
///
/// The full state embeds an `Instant` / `String`, which makes it
/// awkward to copy and compare. This tag keeps only "which variant",
/// so the loop can spot a maintenance transition with a plain `!=`.
#[derive(Debug, Default, Clone, Copy, PartialEq, Eq)]
enum MaintenanceDiscriminant {
    /// Normal operation; also the `Default` tag.
    #[default]
    Active,
    /// Tag for `MaintenanceState::EnteringMaintenance`.
    EnteringMaintenance,
    /// Tag for `MaintenanceState::Maintenance`.
    Maintenance,
    /// Tag for `MaintenanceState::ExitingMaintenance`.
    ExitingMaintenance,
    /// Tag for `MaintenanceState::DrainFailed`.
    DrainFailed,
    /// Tag for `MaintenanceState::Recovery`.
    Recovery,
}
246
247impl MaintenanceDiscriminant {
248    fn from_state(state: &MaintenanceState) -> Self {
249        match state {
250            MaintenanceState::Active => Self::Active,
251            MaintenanceState::EnteringMaintenance { .. } => Self::EnteringMaintenance,
252            MaintenanceState::Maintenance { .. } => Self::Maintenance,
253            MaintenanceState::ExitingMaintenance { .. } => Self::ExitingMaintenance,
254            MaintenanceState::DrainFailed { .. } => Self::DrainFailed,
255            MaintenanceState::Recovery { .. } => Self::Recovery,
256        }
257    }
258}
259
260/// Inner shared cell — both probe lists behind one
261/// `Arc<RwLock>` so the runtime + loop see the same set AND
262/// [`ProbeRegistry::probe_counts`] is an atomic snapshot. The
263/// install path is rare; the per-tick poll already clones the
264/// lists out under the read lock, so the single-lock pattern
265/// costs nothing on the hot path.
266#[derive(Default)]
267struct ProbeListsInner {
268    locality: Vec<Arc<dyn LocalityProbe>>,
269    health: Vec<Arc<dyn HealthProbe>>,
270    inventory: Vec<Arc<dyn super::probes::InventoryProbe>>,
271}
272
273#[derive(Clone, Default)]
274struct ProbeRegistryInner {
275    lists: Arc<parking_lot::RwLock<ProbeListsInner>>,
276}
277
278/// External handle for attaching probes to the loop after
279/// `MeshOsLoop::new` has consumed the loop (e.g. the runtime
280/// spawned it as a tokio task). Clone-shared with the loop.
281#[derive(Clone, Default)]
282pub struct ProbeRegistry {
283    inner: ProbeRegistryInner,
284}
285
286impl ProbeRegistry {
287    /// Construct an empty registry. The loop reads through this
288    /// instance after [`MeshOsLoop::with_probe_registry`]
289    /// installs it.
290    pub fn new() -> Self {
291        Self::default()
292    }
293
294    /// Install a [`LocalityProbe`]. Probes are polled by the
295    /// loop in registration order, once per Tick.
296    pub fn add_locality_probe(&self, probe: Arc<dyn LocalityProbe>) {
297        self.inner.lists.write().locality.push(probe);
298    }
299
300    /// Install a [`HealthProbe`]. Probes are polled by the loop
301    /// in registration order, once per Tick.
302    pub fn add_health_probe(&self, probe: Arc<dyn HealthProbe>) {
303        self.inner.lists.write().health.push(probe);
304    }
305
306    /// Install an [`super::probes::InventoryProbe`]. Probes are
307    /// polled by the loop in registration order, once per Tick;
308    /// partial samples merge into per-peer state (later probes
309    /// overwrite earlier on the same peer + axis).
310    pub fn add_inventory_probe(&self, probe: Arc<dyn super::probes::InventoryProbe>) {
311        self.inner.lists.write().inventory.push(probe);
312    }
313
314    /// Atomic snapshot of installed probe counts —
315    /// `(locality, health, inventory)`. One read lock covers
316    /// all three lists, so the triple is consistent even if a
317    /// concurrent installer fires between them.
318    pub fn probe_counts(&self) -> (usize, usize, usize) {
319        let guard = self.inner.lists.read();
320        (
321            guard.locality.len(),
322            guard.health.len(),
323            guard.inventory.len(),
324        )
325    }
326
327    /// Drop every installed [`LocalityProbe`]. Long-running
328    /// runtimes that swap probe sources (test harnesses,
329    /// hot-config reloaders) would otherwise accumulate dead
330    /// probes that keep firing every Tick; this lets a caller
331    /// detach the old set before installing replacements.
332    pub fn clear_locality_probes(&self) {
333        self.inner.lists.write().locality.clear();
334    }
335
336    /// Drop every installed [`HealthProbe`]. Same rationale as
337    /// [`Self::clear_locality_probes`].
338    pub fn clear_health_probes(&self) {
339        self.inner.lists.write().health.clear();
340    }
341
342    /// Drop every installed [`super::probes::InventoryProbe`].
343    /// Same rationale as [`Self::clear_locality_probes`]. Last-
344    /// writer-wins per peer means a stale probe left installed
345    /// can stomp a live replacement's samples, so callers
346    /// swapping inventory sources should clear first.
347    pub fn clear_inventory_probes(&self) {
348        self.inner.lists.write().inventory.clear();
349    }
350}
351
352/// Read-only handle to the loop's most recent snapshot.
353/// Construction returns one of these from
354/// [`MeshOsLoop::new`]; Deck / Phase F integrations clone the
355/// handle (cheap — one Arc clone) and call
356/// [`MeshOsSnapshotReader::read`] to sample the current view
357/// without entering the loop's event stream.
358#[derive(Clone, Debug)]
359pub struct MeshOsSnapshotReader {
360    snapshot: Arc<ArcSwap<MeshOsSnapshot>>,
361}
362
363impl MeshOsSnapshotReader {
364    /// Sample the most recent post-reconcile snapshot. One
365    /// atomic load + one `Arc` clone — no lock acquisition, so
366    /// reads cannot stall the loop's publish path.
367    pub fn read(&self) -> MeshOsSnapshot {
368        (**self.snapshot.load()).clone()
369    }
370
371    /// Borrow the latest snapshot through an `Arc`. Avoids the
372    /// per-call deep clone when the caller only needs a few
373    /// fields. The returned guard pins the snapshot until
374    /// dropped — keep the borrow short.
375    pub fn load(&self) -> arc_swap::Guard<Arc<MeshOsSnapshot>> {
376        self.snapshot.load()
377    }
378}
379
380/// Cloneable handle for publishing events into the loop. Cheap
381/// to clone (just clones the `mpsc::Sender`). Drop the last
382/// handle to signal end-of-events; the loop will exit after
383/// draining its current backlog.
384#[derive(Clone, Debug)]
385pub struct MeshOsHandle {
386    events: mpsc::Sender<MeshOsEvent>,
387}
388
389impl MeshOsHandle {
390    /// Publish an event into the loop's stream. Async — backs
391    /// pressure when the source channel is at
392    /// `event_queue_capacity`. Sources that need a fire-and-
393    /// forget path can `try_send` directly on the underlying
394    /// sender via `into_sender`.
395    ///
396    /// **Note on wedge risk:** if the loop is stuck (probe
397    /// holding a sync lock, dispatcher saturated, etc.) this
398    /// future never resolves. Long-lived sources that must
399    /// remain responsive should prefer
400    /// [`Self::publish_timeout`] or [`Self::try_publish`].
401    pub async fn publish(&self, event: MeshOsEvent) -> Result<(), MeshOsHandleError> {
402        self.events
403            .send(event)
404            .await
405            .map_err(|_| MeshOsHandleError::LoopClosed)
406    }
407
408    /// Publish an event with a bounded wait. Returns
409    /// [`MeshOsHandleError::QueueFull`] if the source channel
410    /// stayed at capacity for the entire `timeout` window.
411    /// Sources that can't afford to park indefinitely on a
412    /// wedged loop should call this instead of [`Self::publish`].
413    pub async fn publish_timeout(
414        &self,
415        event: MeshOsEvent,
416        timeout: std::time::Duration,
417    ) -> Result<(), MeshOsHandleError> {
418        match tokio::time::timeout(timeout, self.events.send(event)).await {
419            Ok(Ok(())) => Ok(()),
420            Ok(Err(_)) => Err(MeshOsHandleError::LoopClosed),
421            Err(_) => Err(MeshOsHandleError::QueueFull),
422        }
423    }
424
425    /// Try to publish without awaiting. Returns
426    /// `MeshOsHandleError::QueueFull` when the source channel is
427    /// at capacity.
428    pub fn try_publish(&self, event: MeshOsEvent) -> Result<(), MeshOsHandleError> {
429        self.events.try_send(event).map_err(|e| match e {
430            mpsc::error::TrySendError::Closed(_) => MeshOsHandleError::LoopClosed,
431            mpsc::error::TrySendError::Full(_) => MeshOsHandleError::QueueFull,
432        })
433    }
434
435    /// Hand out the underlying sender for sources that need to
436    /// manage their own backpressure / batching.
437    pub fn into_sender(self) -> mpsc::Sender<MeshOsEvent> {
438        self.events
439    }
440}
441
/// Errors returned by [`MeshOsHandle::publish`],
/// [`MeshOsHandle::publish_timeout`] and
/// [`MeshOsHandle::try_publish`].
///
/// The loop itself never retries. Callers decide whether to retry,
/// drop the event, or apply their own backpressure. Implements
/// [`std::fmt::Display`] and [`std::error::Error`], so it composes
/// with `?` into `Box<dyn Error>`-style error types.
#[derive(Debug, Eq, PartialEq)]
#[non_exhaustive]
pub enum MeshOsHandleError {
    /// The loop has exited; further publishes will be dropped.
    LoopClosed,
    /// The source channel is at `event_queue_capacity` (for
    /// `publish_timeout`: it stayed there for the whole timeout
    /// window). The caller picks: back off + retry, drop the event,
    /// or apply source-side backpressure.
    QueueFull,
}

impl std::fmt::Display for MeshOsHandleError {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let msg = match self {
            Self::LoopClosed => "MeshOS event loop has exited",
            Self::QueueFull => "MeshOS event queue is full",
        };
        f.write_str(msg)
    }
}

/// Leaf error: neither variant wraps an underlying cause.
impl std::error::Error for MeshOsHandleError {}
456
457/// The result of [`MeshOsLoop::new`]. Held together so callers
458/// destructure the pieces they need; future fields (metrics,
459/// chain handles) can be added without breaking the
460/// constructor signature.
461pub struct MeshOsLoopParts {
462    /// The loop itself — consume by spawning [`MeshOsLoop::run`].
463    pub mesh_loop: MeshOsLoop,
464    /// Publish handle; clone for each source converter.
465    pub handle: MeshOsHandle,
466    /// Action-queue receiver, fed into [`super::executor::ActionExecutor::new`].
467    pub actions_rx: mpsc::Receiver<PendingAction>,
468    /// Snapshot reader; clone for each Deck / consumer.
469    pub reader: MeshOsSnapshotReader,
470}
471
472impl MeshOsLoop {
473    /// Construct a loop bound to the given config. Returns the
474    /// loop + its publish handle + the action-queue receiver +
475    /// the snapshot reader, bundled in [`MeshOsLoopParts`] so
476    /// future additions don't break the constructor signature.
477    #[allow(clippy::new_ret_no_self)]
478    pub fn new(config: MeshOsConfig) -> MeshOsLoopParts {
479        let config = Arc::new(config);
480        let (events_tx, events_rx) = mpsc::channel(config.event_queue_capacity);
481        let (actions_tx, actions_rx) = mpsc::channel(config.action_queue_capacity);
482        let handle = MeshOsHandle { events: events_tx };
483        // Stamp a unique runtime epoch id at construction.
484        // 64-bit random per-runtime stamp. Pre-fix this was
485        // `SystemTime::now().as_nanos() ^ static_counter.fetch_add(1)`,
486        // but the static counter resets to 1 each process start —
487        // two processes booting in the same nanosecond (CI parallel,
488        // VM resume) XOR identical `(epoch, counter)` and produced
489        // identical runtime_epoch_ids. The SDK's watermark-reset
490        // gate (snapshot's `runtime_epoch_id` vs last-seen) was then
491        // defeated: post-restart admin_audit_seq / log_seq /
492        // failure_seq start back at 1 and pass the consumer's dedup
493        // gate as "already seen," silently filtering valid post-
494        // restart audit records. A `getrandom::fill` u64 has a
495        // 2⁻⁶⁴ collision probability across all process restarts
496        // in the fleet. The fallback path on a getrandom failure
497        // preserves the prior (epoch ^ counter) shape so the SDK
498        // gate still gets a non-zero stamp under the (extremely
499        // rare) getrandom failure mode rather than panicking
500        // through the substrate's loop construction.
501        let runtime_epoch_id: u64 = {
502            let mut buf = [0u8; 8];
503            if getrandom::fill(&mut buf).is_ok() {
504                u64::from_le_bytes(buf)
505            } else {
506                // Fallback only on `getrandom` failure (vanishingly
507                // rare: seccomp-jailed environments without the
508                // syscall). Mix the process ID into the (epoch ^
509                // counter) shape so two same-nanosecond boots in
510                // a CI parallel matrix still produce distinct
511                // ids — the pre-fix shape collided here exactly.
512                // `std::process::id()` is a u32 on every supported
513                // platform; rotated into the upper bits so it
514                // doesn't trivially XOR-cancel against the counter.
515                static RUNTIME_EPOCH_COUNTER: AtomicU64 = AtomicU64::new(1);
516                let nanos = std::time::SystemTime::now()
517                    .duration_since(std::time::UNIX_EPOCH)
518                    .map(|d| d.as_nanos() as u64)
519                    .unwrap_or(0);
520                let pid = (std::process::id() as u64).rotate_left(32);
521                nanos ^ RUNTIME_EPOCH_COUNTER.fetch_add(1, Ordering::SeqCst) ^ pid
522            }
523        };
524        let initial_snapshot = MeshOsSnapshot {
525            runtime_epoch_id,
526            ..Default::default()
527        };
528        let snapshot = Arc::new(ArcSwap::from_pointee(initial_snapshot));
529        let reader = MeshOsSnapshotReader {
530            snapshot: Arc::clone(&snapshot),
531        };
532        let me = Self {
533            config,
534            events_rx,
535            actions_tx,
536            action_ids: AllocateActionId::new(),
537            actual: MeshOsState::default(),
538            desired: DesiredState::default(),
539            snapshot,
540            recent_emissions: Vec::new(),
541            probes: ProbeRegistryInner::default(),
542            scheduler: SchedulerRegistry::new(),
543            recovery_registry: crate::adapter::net::compute::RecoveryRegistry::new(),
544            reconcile_count: 0,
545            admin_audit_seq: 0,
546            log_seq: 0,
547            runtime_epoch_id,
548            admin_audit_ring: VecDeque::with_capacity(super::ice::DEFAULT_MAX_ADMIN_AUDIT_RECORDS),
549            log_ring: VecDeque::with_capacity(super::logs::DEFAULT_MAX_LOG_RING_RECORDS),
550            dropped_actions: Arc::new(AtomicU64::new(0)),
551            executor_failures: None,
552            executor_failure_seq: None,
553            executor_failure_appender: None,
554            control_sink: None,
555            last_local_maintenance: MaintenanceDiscriminant::Active,
556            admin_verifier: None,
557            admin_audit_appender: super::audit_chain::no_op_arc(),
558            log_appender: super::log_chain::no_op_arc(),
559            migration_aborter: super::migration_aborter::no_op_arc(),
560            migration_snapshot_source: super::migration_snapshot_source::no_op_arc(),
561        };
562        MeshOsLoopParts {
563            mesh_loop: me,
564            handle,
565            actions_rx,
566            reader,
567        }
568    }
569
570    /// Clone the dropped-action counter. The runtime uses this
571    /// to surface the count through `ExecutorStatsSnapshot`;
572    /// tests can also assert against it directly.
573    pub fn dropped_actions_counter(&self) -> Arc<AtomicU64> {
574        Arc::clone(&self.dropped_actions)
575    }
576
577    /// Attach a probe registry. The loop polls each registered
578    /// probe on every Tick, before reconcile. The registry is
579    /// shareable + cloneable, so callers retain it to add probes
580    /// after `MeshOsLoop::new` returns (the loop has been moved
581    /// into the spawned task at that point).
582    pub fn with_probe_registry(mut self, registry: ProbeRegistry) -> Self {
583        self.probes = registry.inner;
584        self
585    }
586
587    /// Attach a scheduler registry. The reconcile pass reads
588    /// the registered scorer to drive Phase D-1 rebalancing.
589    /// Cloneable + shareable like the probe registry.
590    pub fn with_scheduler_registry(mut self, registry: SchedulerRegistry) -> Self {
591        self.scheduler = registry;
592        self
593    }
594
595    /// Attach a recovery registry. SDK consumers register one
596    /// `RecoveryHandler` per group whose slots can be
597    /// re-placed; the tick handler runs them all once per Tick
598    /// (after `poll_probes`, before `run_reconcile`) so the
599    /// reconcile pass sees the recovered slot states.
600    pub fn with_recovery_registry(
601        mut self,
602        registry: crate::adapter::net::compute::RecoveryRegistry,
603    ) -> Self {
604        self.recovery_registry = registry;
605        self
606    }
607
608    /// Borrow the recovery registry — SDK consumers `register`
609    /// new handlers post-build. The `MeshOsRuntime` accessor
610    /// surfaces this so callers don't have to retain a clone.
611    pub fn recovery_registry(&self) -> &crate::adapter::net::compute::RecoveryRegistry {
612        &self.recovery_registry
613    }
614
615    /// Attach the executor's recent-failures ring. The loop reads
616    /// it on every `publish_snapshot` so the snapshot's
617    /// `recent_failures` field reflects executor-side dispatch
618    /// failures (the `MeshOsSnapshotFold` chain-record path is
619    /// not the only failure surface). The runtime calls this
620    /// after `ActionExecutor::new` so both halves of the pair
621    /// share the same ring.
622    pub fn with_executor_failures(
623        mut self,
624        failures: Arc<RwLock<VecDeque<FailureRecord>>>,
625    ) -> Self {
626        self.executor_failures = Some(failures);
627        self
628    }
629
630    /// Attach the executor's failure-seq counter + chain
631    /// appender so the loop can record its own runtime-side
632    /// failures (e.g. migration-abort dispatcher errors) with
633    /// the same monotonic sequence + durable chain dual-write
634    /// the executor uses. Pair this with
635    /// [`Self::with_executor_failures`]; together the trio
636    /// makes the loop's internal `record_runtime_failure`
637    /// helper a complete dual write into the snapshot ring +
638    /// the chain.
639    pub fn with_executor_failure_writer(
640        mut self,
641        seq: Arc<AtomicU64>,
642        appender: Arc<dyn super::failure_chain::FailureChainAppender>,
643    ) -> Self {
644        self.executor_failure_seq = Some(seq);
645        self.executor_failure_appender = Some(appender);
646        self
647    }
648
649    /// Attach a [`ControlSink`]. When set, the loop translates
650    /// this-node maintenance state transitions into
651    /// [`MeshOsControl`] events and forwards them through the
652    /// sink. The SDK installs a sink that routes events to
653    /// per-daemon control channels via its router; substrate
654    /// code that doesn't need the SDK surface can leave this
655    /// unset.
656    pub fn with_control_sink(mut self, sink: Arc<dyn ControlSink>) -> Self {
657        self.control_sink = Some(sink);
658        self
659    }
660
661    /// Attach an [`super::ice::AdminVerifier`]. When set, every
662    /// [`MeshOsEvent::SignedIceCommit`] is gated on signature
663    /// verification + the cluster's signature threshold before
664    /// folding the inner [`super::event::AdminEvent`]. Verified
665    /// commits fold normally; rejected commits drop + emit a
666    /// failure record so operators see the rejection in the
667    /// snapshot's `recent_failures` ring (substrate slice that
668    /// wires the failure pipe lands alongside the SDK surface
669    /// upgrade).
670    pub fn with_admin_verifier(mut self, verifier: Arc<super::ice::AdminVerifier>) -> Self {
671        self.admin_verifier = Some(verifier);
672        self
673    }
674
675    /// Attach a [`super::audit_chain::AdminAuditChainAppender`].
676    /// The loop's `record_admin_audit` path dual-writes every
677    /// admin commit to both the in-memory ring (snapshot
678    /// readable) and this appender (chain-backed history).
679    /// Without an explicit appender the loop uses the no-op
680    /// default; only the in-memory ring is observable.
681    pub fn with_admin_audit_appender(
682        mut self,
683        appender: Arc<dyn super::audit_chain::AdminAuditChainAppender>,
684    ) -> Self {
685        self.admin_audit_appender = appender;
686        self
687    }
688
689    /// Attach a [`super::log_chain::LogChainAppender`]. The
690    /// loop's `record_log_line` path dual-writes every log
691    /// line to both the in-memory ring (snapshot readable)
692    /// and this appender (chain-backed history). Without an
693    /// explicit appender the loop uses the no-op default.
694    pub fn with_log_appender(
695        mut self,
696        appender: Arc<dyn super::log_chain::LogChainAppender>,
697    ) -> Self {
698        self.log_appender = appender;
699        self
700    }
701
702    /// Attach a [`super::migration_aborter::MigrationAborter`].
703    /// The loop calls this after folding a verified
704    /// [`super::event::AdminEvent::KillMigration`]; production
705    /// deployments wire the
706    /// [`super::migration_aborter::OrchestratorMigrationAborter`]
707    /// adapter so the cluster's local `MigrationOrchestrator`
708    /// actually aborts in-flight migrations. Without an
709    /// explicit aborter the commit lands on the audit chain
710    /// but the migration runs to completion.
711    pub fn with_migration_aborter(
712        mut self,
713        aborter: Arc<dyn super::migration_aborter::MigrationAborter>,
714    ) -> Self {
715        self.migration_aborter = aborter;
716        self
717    }
718
719    /// Attach a
720    /// [`super::migration_snapshot_source::MigrationSnapshotSource`].
721    /// The loop reads this on every snapshot publish and
722    /// embeds the result in the snapshot's
723    /// `in_flight_migrations` field — the ICE simulator
724    /// reads it to enumerate which daemon a `KillMigration`
725    /// target would affect.
726    pub fn with_migration_snapshot_source(
727        mut self,
728        source: Arc<dyn super::migration_snapshot_source::MigrationSnapshotSource>,
729    ) -> Self {
730        self.migration_snapshot_source = source;
731        self
732    }
733
    /// Drive the loop until either:
    /// 1. all `MeshOsHandle` clones drop and the source channel
    ///    empties (graceful end-of-events), or
    /// 2. a `MeshOsEvent::Shutdown` is dequeued, either by the
    ///    event arm or by the bounded post-tick drain. Events queued
    ///    behind that `Shutdown` are not applied.
    ///
    /// Each tick runs, in order: the `Tick` fold, probe polling,
    /// the recovery registry, one reconcile pass, and then a
    /// non-blocking drain of up to `EVENTS_PER_TICK` queued events.
    ///
    /// Returns the final `reconcile_count` — used by tests; in
    /// production it's diagnostic-only.
    pub async fn run(mut self) -> u64 {
        tracing::debug!(
            target: "meshos",
            this_node = self.config.this_node,
            tick_interval_ms = self.config.tick_interval.as_millis() as u64,
            event_queue_capacity = self.config.event_queue_capacity,
            action_queue_capacity = self.config.action_queue_capacity,
            "MeshOsLoop starting",
        );
        // Tick timer: the first tick fires one `tick_interval` after
        // start, then every `tick_interval`. Late ticks are handled
        // by the missed-tick behavior configured just below.
        let mut tick = interval_at(
            TokioInstant::now() + self.config.tick_interval,
            self.config.tick_interval,
        );
        // `Delay`: a late tick (e.g. a slow reconcile overran the
        // interval) fires once, and the schedule then restarts a
        // full `tick_interval` from that point. Missed ticks
        // coalesce into a single pass, and consecutive ticks are at
        // least one interval apart.
        //
        // `Skip` keeps the original alignment, so the next pass can
        // follow a late one by less than an interval. `Burst` fires
        // every missed tick back-to-back and amplifies a backlog,
        // which is what the locked-decision-1 "one pass per tick"
        // guarantee is designed to prevent.
        tick.set_missed_tick_behavior(MissedTickBehavior::Delay);

        // Bound how many events get drained between ticks so a
        // heavy reconcile can't starve `events_rx` under the
        // `biased;` priority below.
        //
        // Putting the tick arm first defends against an event burst
        // starving the tick. The reverse can also happen: when
        // `run_reconcile` takes longer than `tick_interval`, the tick
        // is already due every time the select is re-entered, so it
        // wins every time. Events would then sit in the queue until
        // it fills or the load subsides.
        //
        // Draining up to `EVENTS_PER_TICK` events in a non-blocking
        // pass after each tick lets both directions make bounded
        // progress.
        const EVENTS_PER_TICK: usize = 32;
        let mut shutdown = false;
        loop {
            tokio::select! {
                // `biased` polls arms in source order, so a
                // saturated `events_rx` cannot starve the reconcile
                // tick. The previous (random) order let a sustained
                // event burst defer reconcile / poll_probes / gc_freeze
                // for the duration of the burst — `local_maintenance`
                // went stale, `applied_backoffs` stuck, and
                // `freeze_until` never GC'd because gc_freeze only
                // runs on Tick.
                biased;
                _ = tick.tick() => {
                    // The Tick event drives reconcile; we route
                    // it through the same `apply` path so the
                    // `last_tick` fold field updates uniformly.
                    self.apply(&MeshOsEvent::Tick);
                    // Pull-via-tick probes — folded BEFORE
                    // reconcile so the reconcile pass sees the
                    // latest samples in this tick window.
                    self.poll_probes();
                    // X-13: run the recovery handlers between
                    // probe samples and reconcile. Probes refresh
                    // the per-peer healthy view, then recovery
                    // re-places any unhealthy slots against the
                    // current healthy node pool, then reconcile
                    // sees the post-recovery state. Empty
                    // registry → no-op; handlers that report
                    // recovered slots are logged at debug for
                    // operator visibility.
                    let recovered = self.recovery_registry.try_run_all();
                    if !recovered.is_empty() {
                        tracing::debug!(
                            target: "meshos",
                            slots = ?recovered,
                            "X-13: recovery registry placed unhealthy slots"
                        );
                    }
                    self.run_reconcile().await;

                    // Drain a bounded batch of events from the
                    // queue before the next tick can re-fire. This
                    // is the inverse-direction starvation guard
                    // for the `biased;` above. `try_recv` is
                    // non-blocking: an empty queue ends the batch
                    // after a single `Empty` error and the loop
                    // falls through to the next select. Events
                    // applied here are reconciled on the next tick.
                    for _ in 0..EVENTS_PER_TICK {
                        match self.events_rx.try_recv() {
                            Ok(event) => {
                                if matches!(event, MeshOsEvent::Shutdown) {
                                    tracing::debug!(
                                        target: "meshos",
                                        reconcile_count = self.reconcile_count,
                                        "MeshOsLoop exiting — Shutdown drained post-tick",
                                    );
                                    shutdown = true;
                                    break;
                                }
                                self.apply(&event);
                            }
                            Err(mpsc::error::TryRecvError::Empty) => break,
                            // `Disconnected` is only reported once the
                            // queue is empty AND every sender is gone.
                            Err(mpsc::error::TryRecvError::Disconnected) => {
                                tracing::debug!(
                                    target: "meshos",
                                    reconcile_count = self.reconcile_count,
                                    "MeshOsLoop exiting — all handles dropped (drained post-tick)",
                                );
                                shutdown = true;
                                break;
                            }
                        }
                    }
                    if shutdown {
                        break;
                    }
                }
                event = self.events_rx.recv() => {
                    // `None` means every sender dropped and the
                    // queue is empty.
                    let Some(event) = event else {
                        tracing::debug!(
                            target: "meshos",
                            reconcile_count = self.reconcile_count,
                            "MeshOsLoop exiting — all handles dropped",
                        );
                        break;
                    };
                    if matches!(event, MeshOsEvent::Shutdown) {
                        tracing::debug!(
                            target: "meshos",
                            reconcile_count = self.reconcile_count,
                            "MeshOsLoop exiting — Shutdown event received",
                        );
                        break;
                    }
                    self.apply(&event);
                }
            }
        }

        self.reconcile_count
    }
879
880    /// Poll every registered locality / health probe and fold
881    /// the samples into the actual-state view. Idempotent — the
882    /// folds overwrite per-peer entries, so a re-poll within
883    /// the same tick produces the same state.
884    ///
885    /// Each probe call runs inside `catch_unwind` so a panicking
886    /// probe doesn't unwind the loop task. Probes are
887    /// third-party-installed by definition (the substrate hands
888    /// out the `ProbeRegistry`), so trust-but-isolate is the
889    /// right posture.
890    fn poll_probes(&mut self) {
891        // Clone all probe lists out under a single read lock so
892        // a concurrent install between the locality / health /
893        // inventory passes doesn't see a half-applied registry.
894        let (locality, health, inventory) = {
895            let guard = self.probes.lists.read();
896            (
897                guard.locality.clone(),
898                guard.health.clone(),
899                guard.inventory.clone(),
900            )
901        };
902        for probe in &locality {
903            let probe = Arc::clone(probe);
904            let result =
905                std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| probe.rtt_samples()));
906            match result {
907                Ok(samples) => {
908                    for (peer, rtt) in samples {
909                        self.actual.rtt.insert(peer, rtt);
910                    }
911                }
912                Err(_) => {
913                    tracing::error!(
914                        target: "meshos",
915                        "locality probe panicked — sample skipped this tick",
916                    );
917                }
918            }
919        }
920        for probe in &health {
921            let probe = Arc::clone(probe);
922            let result =
923                std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| probe.health_samples()));
924            match result {
925                Ok(samples) => {
926                    for (peer, hc) in samples {
927                        self.actual.node_health.insert(peer, hc);
928                    }
929                }
930                Err(_) => {
931                    tracing::error!(
932                        target: "meshos",
933                        "health probe panicked — sample skipped this tick",
934                    );
935                }
936            }
937        }
938        // Track inventory samples seen this tick so the GC pass
939        // below drops entries for peers no probe sees anymore.
940        // The map is probe-exclusive (no `MeshOsEvent` populates
941        // it), so it's safe to authoritatively prune from the
942        // probe pass. The rtt + node_health maps are also fed by
943        // `MeshOsEvent::{RttSample, NodeHealth}` event folds, so
944        // their per-tick prune would erase event-driven samples;
945        // those maps grow with proximity churn but the inventory
946        // leak the review flagged was per-peer multi-field and
947        // strictly worse.
948        let mut peers_seen_inventory: std::collections::HashSet<super::event::NodeId> =
949            std::collections::HashSet::new();
950        let mut all_probes_succeeded = true;
951        let mut any_probe_saw_samples = false;
952        for probe in &inventory {
953            let probe = Arc::clone(probe);
954            let result = std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| {
955                probe.inventory_samples()
956            }));
957            match result {
958                Ok(samples) => {
959                    if !samples.is_empty() {
960                        any_probe_saw_samples = true;
961                    }
962                    for (peer, inv) in samples {
963                        peers_seen_inventory.insert(peer);
964                        self.actual.inventory.insert(peer, inv);
965                    }
966                }
967                Err(_) => {
968                    all_probes_succeeded = false;
969                    tracing::error!(
970                        target: "meshos",
971                        "inventory probe panicked — sample skipped this tick",
972                    );
973                }
974            }
975        }
976        // Two-axis guard for the GC pass:
977        //
978        //   * `all_probes_succeeded` — a partial-panic pass
979        //     can leave `peers_seen_inventory` short of peers
980        //     that the panicking probe owned authoritatively
981        //     (e.g. two inventory probes with disjoint
982        //     coverage); pruning then drops the panicking
983        //     probe's peers despite no probe knowing they
984        //     departed. Wait for the next clean tick instead.
985        //
986        //   * `any_probe_saw_samples` — the trait contract
987        //     allows a probe to return `Ok(vec![])` for a
988        //     transient empty-this-tick (procfs unavailable,
989        //     no peers known yet at startup); pruning on an
990        //     all-empty pass would wipe every previously-seen
991        //     peer's inventory. The first tick at startup
992        //     legitimately has no peers, so this also
993        //     prevents a cold-start wipe.
994        if all_probes_succeeded && any_probe_saw_samples {
995            self.actual
996                .inventory
997                .retain(|peer, _| peers_seen_inventory.contains(peer));
998        }
999    }
1000
1001    fn apply(&mut self, event: &MeshOsEvent) {
1002        // ICE-signed commits get their own gate: verify the
1003        // bundle before folding the inner AdminEvent. The
1004        // outcome (accepted, rejected, unverified) lands on the
1005        // admin_audit ring regardless of verification result so
1006        // security review can replay every attempt.
1007        if let MeshOsEvent::SignedIceCommit {
1008            proposal,
1009            signatures,
1010            issued_at_ms,
1011            blast_hash,
1012        } = event
1013        {
1014            let outcome = match self.admin_verifier.as_ref() {
1015                Some(verifier) => {
1016                    let now_ms = super::ice::now_ms_since_unix_epoch();
1017                    match verifier.verify_commit(
1018                        proposal,
1019                        signatures,
1020                        *issued_at_ms,
1021                        blast_hash,
1022                        now_ms,
1023                    ) {
1024                        Ok(()) => super::ice::VerificationOutcome::Accepted,
1025                        Err(err) => super::ice::VerificationOutcome::Rejected {
1026                            kind: err.kind().to_string(),
1027                            message: err.to_string(),
1028                        },
1029                    }
1030                }
1031                None => super::ice::VerificationOutcome::Unverified,
1032            };
1033            let admin_event = proposal.to_admin_event();
1034            let operator_ids: Vec<u64> = signatures.iter().map(|s| s.operator_id).collect();
1035            self.record_admin_audit(&admin_event, operator_ids, outcome.clone());
1036            if let super::ice::VerificationOutcome::Rejected { kind, message } = &outcome {
1037                tracing::warn!(
1038                    target: "meshos",
1039                    kind = %kind,
1040                    error = %message,
1041                    "rejected SignedIceCommit — signature verification failed",
1042                );
1043                return;
1044            }
1045            // Verification passed (or no verifier installed —
1046            // tests / dev mode). Fold as if the inner AdminEvent
1047            // arrived directly. Dispatch fires AFTER actual.apply
1048            // so the dispatcher observes post-fold state — the
1049            // ordering invariant the loop preserves for every
1050            // chain-committed admin event.
1051            self.desired
1052                .apply_admin(&admin_event, self.config.this_node);
1053            let unwrapped = MeshOsEvent::AdminEvent(admin_event.clone());
1054            self.actual.apply(&unwrapped, self.config.this_node);
1055            self.dispatch_kill_migration_if_applicable(&admin_event);
1056            self.emit_maintenance_transitions();
1057            return;
1058        }
1059        // Single-signature signed-admin commits mirror the ICE
1060        // path: verify, audit, fold on success. The signature
1061        // covers `admin_event_signing_payload(event)` so the
1062        // SDK and substrate agree on the byte sequence.
1063        if let MeshOsEvent::SignedAdminCommit {
1064            event: admin_event,
1065            signature,
1066            issued_at_ms,
1067        } = event
1068        {
1069            // Freeze gate: ordinary admin commits that arrive
1070            // during an in-effect cluster freeze land on the
1071            // audit ring as Rejected with kind
1072            // "freeze_in_effect" and the inner event drops. ICE
1073            // commits (the multi-op SignedIceCommit path) bypass
1074            // by design — operators must be able to thaw the
1075            // cluster mid-freeze.
1076            let now = self
1077                .actual
1078                .last_tick
1079                .unwrap_or_else(std::time::Instant::now);
1080            if self.actual.is_frozen(now) && !admin_event.is_ice() {
1081                self.record_admin_audit(
1082                    admin_event,
1083                    vec![signature.operator_id],
1084                    super::ice::VerificationOutcome::Rejected {
1085                        kind: "freeze_in_effect".to_string(),
1086                        message: "ordinary admin commits are gated during a cluster freeze; \
1087                                  thaw via the ICE surface to unblock"
1088                            .to_string(),
1089                    },
1090                );
1091                tracing::warn!(
1092                    target: "meshos",
1093                    kind = "freeze_in_effect",
1094                    "rejected SignedAdminCommit — cluster is frozen and the event is non-ICE",
1095                );
1096                return;
1097            }
1098            let outcome = match self.admin_verifier.as_ref() {
1099                Some(verifier) => {
1100                    let now_ms = super::ice::now_ms_since_unix_epoch();
1101                    match verifier.verify_admin_commit(
1102                        admin_event,
1103                        signature,
1104                        *issued_at_ms,
1105                        now_ms,
1106                    ) {
1107                        Ok(()) => super::ice::VerificationOutcome::Accepted,
1108                        Err(err) => super::ice::VerificationOutcome::Rejected {
1109                            kind: err.kind().to_string(),
1110                            message: err.to_string(),
1111                        },
1112                    }
1113                }
1114                None => super::ice::VerificationOutcome::Unverified,
1115            };
1116            self.record_admin_audit(admin_event, vec![signature.operator_id], outcome.clone());
1117            if let super::ice::VerificationOutcome::Rejected { kind, message } = &outcome {
1118                tracing::warn!(
1119                    target: "meshos",
1120                    kind = %kind,
1121                    error = %message,
1122                    "rejected SignedAdminCommit — signature verification failed",
1123                );
1124                return;
1125            }
1126            self.desired.apply_admin(admin_event, self.config.this_node);
1127            let unwrapped = MeshOsEvent::AdminEvent(admin_event.clone());
1128            self.actual.apply(&unwrapped, self.config.this_node);
1129            self.dispatch_kill_migration_if_applicable(admin_event);
1130            self.emit_maintenance_transitions();
1131            return;
1132        }
1133        // Log lines bypass the actual/desired fold entirely;
1134        // the loop stamps + pushes onto the log ring directly.
1135        if let MeshOsEvent::LogLine(line) = event {
1136            self.record_log_line(line);
1137            return;
1138        }
1139        match event {
1140            MeshOsEvent::PlacementIntent(intent) => self.desired.apply(intent),
1141            MeshOsEvent::DaemonIntentUpdate(update) => self.desired.apply_daemon_intent(update),
1142            MeshOsEvent::LocalReplicaIntent(update) => {
1143                self.desired.apply_local_replica_intent(update)
1144            }
1145            MeshOsEvent::AdminEvent(admin) => {
1146                // Freeze gate, same as the SignedAdminCommit
1147                // path: ordinary admin events drop during a
1148                // cluster freeze. ICE events bypass.
1149                let now = self
1150                    .actual
1151                    .last_tick
1152                    .unwrap_or_else(std::time::Instant::now);
1153                if self.actual.is_frozen(now) && !admin.is_ice() {
1154                    self.record_admin_audit(
1155                        admin,
1156                        Vec::new(),
1157                        super::ice::VerificationOutcome::Rejected {
1158                            kind: "freeze_in_effect".to_string(),
1159                            message: "ordinary admin commits are gated during a cluster freeze; \
1160                                      thaw via the ICE surface to unblock"
1161                                .to_string(),
1162                        },
1163                    );
1164                    tracing::warn!(
1165                        target: "meshos",
1166                        kind = "freeze_in_effect",
1167                        "rejected unsigned AdminEvent — cluster is frozen and the event is non-ICE",
1168                    );
1169                    return;
1170                }
1171                // Unsigned admin commits also land on the audit
1172                // ring with Unverified outcome — so audit
1173                // consumers see every admin attempt the cluster
1174                // observed, not just signed ICE bundles.
1175                self.record_admin_audit(
1176                    admin,
1177                    Vec::new(),
1178                    super::ice::VerificationOutcome::Unverified,
1179                );
1180                self.desired.apply_admin(admin, self.config.this_node);
1181            }
1182            _ => {}
1183        }
1184        self.actual.apply(event, self.config.this_node);
1185        // KillMigration / future dispatchers fire AFTER
1186        // actual.apply so they observe post-fold state. Lifted
1187        // out of the AdminEvent arm above for that ordering.
1188        if let MeshOsEvent::AdminEvent(admin) = event {
1189            self.dispatch_kill_migration_if_applicable(admin);
1190        }
1191        self.emit_maintenance_transitions();
1192    }
1193
1194    /// Record an admin-commit outcome on the state's audit
1195    /// ring. Bounded by
1196    /// [`super::ice::DEFAULT_MAX_ADMIN_AUDIT_RECORDS`]; drops
1197    /// oldest FIFO when the cap is exceeded.
1198    #[expect(
1199        clippy::expect_used,
1200        reason = "u64 admin_audit_seq overflow takes ~58 million years at 100 µs/event; the documented loud-panic strategy is preferable to silent saturation that collapses every subsequent record into a single ring entry"
1201    )]
1202    fn record_admin_audit(
1203        &mut self,
1204        event: &super::event::AdminEvent,
1205        operator_ids: Vec<u64>,
1206        outcome: super::ice::VerificationOutcome,
1207    ) {
1208        use std::time::{SystemTime, UNIX_EPOCH};
1209        let committed_at_ms = SystemTime::now()
1210            .duration_since(UNIX_EPOCH)
1211            .map(|d| d.as_millis() as u64)
1212            .unwrap_or(0);
1213        // Stamp a monotonic per-runtime seq starting at 1 so a
1214        // `0` seq downstream reads as "unset." Panic on overflow:
1215        // the SDK dedup gate keys on `seq`, so a saturating add
1216        // would pin every record past `u64::MAX` at the cap and
1217        // silently collapse them all into a single ring entry —
1218        // permanent audit-loss disguised as routine. `checked_add`
1219        // is the loud signal an operator wants: at 100 µs/event
1220        // we'd reach the overflow ~58 million years in, so the
1221        // panic surfaces only a true runaway bug, not a real
1222        // production condition.
1223        self.admin_audit_seq = self
1224            .admin_audit_seq
1225            .checked_add(1)
1226            .expect("admin_audit_seq overflowed u64 — runaway producer or counter corruption");
1227        let record = super::ice::AdminAuditRecord {
1228            seq: self.admin_audit_seq,
1229            committed_at_ms,
1230            event: event.clone(),
1231            operator_ids,
1232            outcome,
1233            chain_pending: false,
1234        };
1235        // Ring-first then chain. The ring is the immediate
1236        // user-visible surface that the Deck SDK polls; the chain
1237        // is durability backup for cluster-lifetime replay. If the
1238        // chain append fails, mark the ring entry's chain_pending
1239        // flag so chain consumers (replaying after restart) can
1240        // distinguish "entry never landed" from "entry hasn't
1241        // reached me yet." Pre-fix the chain attempt ran BEFORE
1242        // the ring push and the ring entry never recorded the
1243        // chain-side outcome — chain consumers saw a gap with no
1244        // way to know it was permanent.
1245        self.admin_audit_ring.push_back(record.clone());
1246        while self.admin_audit_ring.len() > super::ice::DEFAULT_MAX_ADMIN_AUDIT_RECORDS {
1247            self.admin_audit_ring.pop_front();
1248        }
1249        if let Err(err) = self.admin_audit_appender.append(&record) {
1250            tracing::warn!(
1251                target: "meshos",
1252                seq = record.seq,
1253                error = %err,
1254                "admin-audit-chain append failed — ring entry marked chain_pending",
1255            );
1256            // Locate the just-pushed entry (back of the ring; cap
1257            // protects against an empty back() but we just pushed)
1258            // and flip its chain_pending flag so the ring surface
1259            // reports the gap.
1260            if let Some(last) = self.admin_audit_ring.back_mut() {
1261                last.chain_pending = true;
1262            }
1263        }
1264    }
1265
1266    /// Stamp + push a `LogLine` onto the per-node log ring.
1267    /// Bounded by [`super::logs::DEFAULT_MAX_LOG_RING_RECORDS`];
1268    /// drops oldest FIFO when the cap is exceeded.
1269    #[expect(
1270        clippy::expect_used,
1271        reason = "u64 log_seq overflow is astronomically distant; the documented loud-panic strategy is preferable to silent saturation"
1272    )]
1273    fn record_log_line(&mut self, line: &super::logs::LogLine) {
1274        use std::time::{SystemTime, UNIX_EPOCH};
1275        let ts_ms = SystemTime::now()
1276            .duration_since(UNIX_EPOCH)
1277            .map(|d| d.as_millis() as u64)
1278            .unwrap_or(0);
1279        // Checked add mirrors admin_audit_seq above: panic on the
1280        // (astronomical) u64::MAX boundary rather than silently
1281        // collapse every subsequent record into the saturating
1282        // cap, which the SDK dedup gate would treat as a single
1283        // duplicate.
1284        self.log_seq = self
1285            .log_seq
1286            .checked_add(1)
1287            .expect("log_seq overflowed u64 — runaway producer or counter corruption");
1288        let record = super::logs::LogRecord {
1289            seq: self.log_seq,
1290            ts_ms,
1291            level: line.level,
1292            daemon_id: line.daemon_id,
1293            node_id: Some(self.config.this_node),
1294            message: line.message.clone(),
1295            chain_pending: false,
1296        };
1297        // Ring-first then chain. Same rationale as record_admin_audit:
1298        // the ring is the immediate user surface; the chain is
1299        // durability backup. Mark chain_pending on the just-pushed
1300        // ring entry when the chain append fails so chain consumers
1301        // can distinguish "gap" from "haven't replicated yet."
1302        self.log_ring.push_back(record.clone());
1303        while self.log_ring.len() > super::logs::DEFAULT_MAX_LOG_RING_RECORDS {
1304            self.log_ring.pop_front();
1305        }
1306        if let Err(err) = self.log_appender.append(&record) {
1307            tracing::warn!(
1308                target: "meshos",
1309                seq = record.seq,
1310                error = %err,
1311                "log-chain append failed — ring entry marked chain_pending",
1312            );
1313            if let Some(last) = self.log_ring.back_mut() {
1314                last.chain_pending = true;
1315            }
1316        }
1317    }
1318
1319    /// Route a verified [`super::event::AdminEvent::KillMigration`]
1320    /// to the installed
1321    /// [`super::migration_aborter::MigrationAborter`]. No-op for
1322    /// every other admin variant — the match guards the lookup so
1323    /// callers can invoke this unconditionally after fold without
1324    /// re-pattern-matching. Errors are logged + swallowed; a
1325    /// dispatcher hiccup must never wedge the loop.
1326    fn dispatch_kill_migration_if_applicable(&self, admin: &super::event::AdminEvent) {
1327        let super::event::AdminEvent::KillMigration { migration } = admin else {
1328            return;
1329        };
1330        // No-op aborter installed with a verifier wired — this
1331        // is "production-partial" config: the chain commit
1332        // landed but the migration runs to completion because
1333        // the aborter is a no-op. Surface as a FailureRecord
1334        // so operators reading subscribe_failures see it.
1335        if self.migration_aborter.is_no_op() && self.admin_verifier.is_some() {
1336            self.record_runtime_failure(
1337                format!("kill-migration:{}", *migration),
1338                "no-op migration aborter installed while admin verifier is wired — \
1339                 chain commit landed but KillMigration is a no-op on this node"
1340                    .to_string(),
1341            );
1342            return;
1343        }
1344        if let Err(err) = self.migration_aborter.abort(*migration) {
1345            // Dispatch failure: log AND push to the failure
1346            // ring so the Deck SDK's subscribe_failures stream
1347            // surfaces it. A previous slice swallowed this
1348            // failure to a tracing::warn! only.
1349            tracing::warn!(
1350                target: "meshos",
1351                migration = migration,
1352                error = %err,
1353                "migration-abort dispatcher failed — chain commit landed but migration may continue",
1354            );
1355            self.record_runtime_failure(
1356                format!("kill-migration:{}", *migration),
1357                format!("migration-abort dispatcher error: {err}"),
1358            );
1359        }
1360    }
1361
1362    /// Push a runtime-side failure onto the executor's failure
1363    /// ring + chain. Used by the loop for failures that don't
1364    /// originate from an action dispatch — e.g. migration-abort
1365    /// dispatcher errors after a `KillMigration` commit. Falls
1366    /// back to a `tracing::warn!` when the executor handles
1367    /// aren't wired (in-process tests that don't run the
1368    /// executor task).
1369    fn record_runtime_failure(&self, source: String, reason: String) {
1370        let recorded_at_ms = std::time::SystemTime::now()
1371            .duration_since(std::time::UNIX_EPOCH)
1372            .map(|d| d.as_millis() as u64)
1373            .unwrap_or(0);
1374        let seq = match self.executor_failure_seq.as_ref() {
1375            Some(s) => s.fetch_add(1, std::sync::atomic::Ordering::SeqCst) + 1,
1376            None => {
1377                tracing::warn!(
1378                    target: "meshos",
1379                    source = %source,
1380                    reason = %reason,
1381                    "runtime-side failure surfaced but executor handles aren't wired",
1382                );
1383                return;
1384            }
1385        };
1386        let record = FailureRecord {
1387            seq,
1388            source,
1389            reason,
1390            recorded_at_ms,
1391        };
1392        if let Some(appender) = self.executor_failure_appender.as_ref() {
1393            if let Err(err) = appender.append(&record) {
1394                tracing::warn!(
1395                    target: "meshos",
1396                    seq = record.seq,
1397                    error = %err,
1398                    "failure-chain append failed — record kept on in-memory ring only",
1399                );
1400            }
1401        }
1402        if let Some(ring) = self.executor_failures.as_ref() {
1403            let mut g = ring.write();
1404            if g.len() >= super::snapshot::RECENT_FAILURES_CAPACITY {
1405                g.pop_front();
1406            }
1407            g.push_back(record);
1408        }
1409    }
1410
1411    /// Detect `local_maintenance` discriminant transitions and
1412    /// fan-out the corresponding [`MeshOsControl`] through the
1413    /// installed sink. Idempotent: re-entering the same
1414    /// discriminant emits nothing. Forward arcs only — the fold
1415    /// rejects late-arriving backward arcs upstream.
1416    fn emit_maintenance_transitions(&mut self) {
1417        let Some(sink) = self.control_sink.as_ref() else {
1418            self.last_local_maintenance =
1419                MaintenanceDiscriminant::from_state(&self.actual.local_maintenance);
1420            return;
1421        };
1422        let current = MaintenanceDiscriminant::from_state(&self.actual.local_maintenance);
1423        if current == self.last_local_maintenance {
1424            return;
1425        }
1426        // Anchor the fall-back deadline on the actual state's
1427        // last_tick so loop-replay produces identical deadlines.
1428        // Falling back to wall-clock would break the
1429        // replay-convergence contract the fold side already
1430        // pins on `last_tick`.
1431        let anchor = self
1432            .actual
1433            .last_tick
1434            .unwrap_or_else(std::time::Instant::now);
1435        let event = match (self.last_local_maintenance, current) {
1436            (_, MaintenanceDiscriminant::EnteringMaintenance) => {
1437                // Operator opened a drain. Use the configured
1438                // deadline if the admin event carried one; else
1439                // fall back to the maintenance-config default.
1440                let deadline = match &self.actual.local_maintenance {
1441                    MaintenanceState::EnteringMaintenance {
1442                        deadline: Some(d), ..
1443                    } => *d,
1444                    _ => anchor + self.config.maintenance.default_drain_deadline,
1445                };
1446                Some(MeshOsControl::DrainStart { deadline })
1447            }
1448            (
1449                MaintenanceDiscriminant::EnteringMaintenance,
1450                MaintenanceDiscriminant::Maintenance | MaintenanceDiscriminant::DrainFailed,
1451            ) => Some(MeshOsControl::DrainFinish),
1452            _ => None,
1453        };
1454        self.last_local_maintenance = current;
1455        if let Some(event) = event {
1456            sink.emit(event);
1457        }
1458    }
1459
1460    async fn run_reconcile(&mut self) {
1461        // Anchor every per-tick timestamp on `last_tick` (set by
1462        // `apply(Tick)` immediately before this call) so two
1463        // replays of the same event stream produce identical
1464        // `last_rebalance`, `applied_backoffs`, and
1465        // `PendingAction.emitted_at` values. Bootstrap fallback to
1466        // `Instant::now()` only when no Tick has fired yet.
1467        let now = self
1468            .actual
1469            .last_tick
1470            .unwrap_or_else(std::time::Instant::now);
1471        let scorer = self.scheduler.current();
1472        let actions = reconcile(
1473            &self.actual,
1474            &self.desired,
1475            self.config.this_node,
1476            &self.config.locality,
1477            &self.config.maintenance,
1478            &self.config.scheduler,
1479            scorer.as_deref(),
1480        );
1481        // Record cooldowns for any RequestEviction we emit so
1482        // the same chain doesn't flap on the next tick; track
1483        // `ApplyBackoff` emissions so reconcile suppresses
1484        // re-emit while the daemon stays in the same backoff
1485        // window.
1486        for action in &actions {
1487            match action {
1488                super::action::MeshOsAction::RequestEviction { chain, .. } => {
1489                    self.actual.last_rebalance.insert(*chain, now);
1490                }
1491                super::action::MeshOsAction::ApplyBackoff { daemon, until } => {
1492                    self.actual.applied_backoffs.insert(daemon.clone(), *until);
1493                }
1494                _ => {}
1495            }
1496        }
1497        // Drain queued force-evictions: reconcile consumed them
1498        // (whether or not this node is the leader for each
1499        // chain). Clearing here unconditionally keeps non-leader
1500        // observers from accumulating stale entries.
1501        if !self.actual.forced_evictions.is_empty() {
1502            self.actual.forced_evictions.clear();
1503        }
1504        // Same pattern for force-cutovers — every node drains
1505        // unconditionally so non-leader observers don't
1506        // accumulate stale entries.
1507        if !self.actual.forced_placements.is_empty() {
1508            self.actual.forced_placements.clear();
1509        }
1510        self.reconcile_count += 1;
1511        let mut dropped_this_tick: u64 = 0;
1512        let mut first_dropped_kind: Option<&'static str> = None;
1513        for action in actions {
1514            let pending = PendingAction {
1515                id: self.action_ids.next(),
1516                action,
1517                emitted_at: now,
1518            };
1519            // Drop on backpressure rather than block reconcile —
1520            // the executor's job is to apply admit(); reconcile
1521            // staying responsive is the higher-order property.
1522            // Count + log drops so the silent-loss path is
1523            // observable. recent_emissions only gets the action
1524            // on try_send success — pre-fix every action landed in
1525            // recent_emissions (and surfaced as `recently_emitted`
1526            // on the snapshot) even when the executor's queue was
1527            // full and the action would never run, double-counting
1528            // the drop in dropped_actions while painting a misleading
1529            // "emitted" picture in the snapshot.
1530            let pending_clone = pending.clone();
1531            match self.actions_tx.try_send(pending) {
1532                Ok(()) => self.recent_emissions.push(pending_clone),
1533                Err(mpsc::error::TrySendError::Full(rejected)) => {
1534                    dropped_this_tick += 1;
1535                    if first_dropped_kind.is_none() {
1536                        first_dropped_kind =
1537                            Some(super::snapshot::action_kind_str(&rejected.action));
1538                    }
1539                }
1540                Err(mpsc::error::TrySendError::Closed(_)) => {
1541                    // Executor task is gone — count as dropped and
1542                    // skip recent_emissions; subsequent reconciles
1543                    // will keep counting until the loop tears down.
1544                    dropped_this_tick += 1;
1545                }
1546            }
1547        }
1548        if dropped_this_tick > 0 {
1549            self.dropped_actions
1550                .fetch_add(dropped_this_tick, Ordering::Relaxed);
1551            tracing::warn!(
1552                target: "meshos",
1553                dropped = dropped_this_tick,
1554                first_kind = first_dropped_kind.unwrap_or("?"),
1555                queue_capacity = self.config.action_queue_capacity,
1556                "reconcile output dropped — action queue full",
1557            );
1558        }
1559        self.publish_snapshot();
1560        // Bound the in-loop pending mirror so a backed-up
1561        // executor doesn't let snapshot pending grow unbounded.
1562        // Action queue capacity is the natural bound.
1563        if self.recent_emissions.len() > self.config.action_queue_capacity {
1564            let overflow = self.recent_emissions.len() - self.config.action_queue_capacity;
1565            self.recent_emissions.drain(..overflow);
1566        }
1567    }
1568
1569    fn publish_snapshot(&self) {
1570        // Read the executor's failures ring under a short read
1571        // lock and copy it into the snapshot. The lock is held
1572        // only across the clone to keep the executor's write
1573        // path (record_failure) responsive.
1574        let failures: Vec<FailureRecord> = match self.executor_failures.as_ref() {
1575            Some(ring) => ring.read().iter().cloned().collect(),
1576            None => Vec::new(),
1577        };
1578        let in_flight_migrations = self.migration_snapshot_source.list();
1579        let mut snap = MeshOsSnapshot::from_state(
1580            &self.actual,
1581            &self.desired,
1582            &self.recent_emissions,
1583            &failures,
1584            in_flight_migrations,
1585            &self.admin_audit_ring,
1586            &self.log_ring,
1587            self.config.this_node,
1588        );
1589        snap.runtime_epoch_id = self.runtime_epoch_id;
1590        self.snapshot.store(Arc::new(snap));
1591    }
1592}
1593
/// Test-only [`MeshOsConfig`]: node id 1, a 10 ms `tick_interval` so
/// reconcile passes fire quickly, 64-slot event and action queues, and
/// default backpressure / locality / maintenance / scheduler settings.
/// Crate-private; individual tests override fields with struct-update
/// syntax.
#[cfg(test)]
pub(crate) fn fast_test_config() -> MeshOsConfig {
    use std::time::Duration;

    let tick_interval = Duration::from_millis(10);
    MeshOsConfig {
        this_node: 1,
        tick_interval,
        event_queue_capacity: 64,
        action_queue_capacity: 64,
        scheduler: Default::default(),
        maintenance: Default::default(),
        locality: Default::default(),
        backpressure: Default::default(),
    }
}
1610
1611#[cfg(test)]
1612mod tests {
1613    #![allow(
1614        clippy::disallowed_methods,
1615        reason = "test code legitimately uses std::sync::{Mutex,RwLock} for SUT setup; tests have no real poison concern"
1616    )]
1617    use std::time::Duration as StdDuration;
1618
1619    use super::super::event::{
1620        ChainId, LocalReplicaIntent, LocalReplicaIntentUpdate, MeshOsEvent, ReplicaUpdate,
1621    };
1622    use super::*;
1623
1624    #[tokio::test]
1625    async fn loop_exits_cleanly_when_all_handles_drop() {
1626        let MeshOsLoopParts {
1627            mesh_loop: loop_,
1628            handle,
1629            mut actions_rx,
1630            reader: _,
1631        } = MeshOsLoop::new(fast_test_config());
1632        let task = tokio::spawn(loop_.run());
1633        drop(handle);
1634        // Loop should drain quickly. `run` returns the
1635        // reconcile count.
1636        let count = tokio::time::timeout(StdDuration::from_secs(1), task)
1637            .await
1638            .expect("loop did not exit after all handles dropped")
1639            .expect("join");
1640        // Zero or more ticks may have fired before we dropped;
1641        // assert we got at least zero (compiles + ran).
1642        let _ = count;
1643        // The action queue must be empty under Phase A.
1644        assert!(actions_rx.try_recv().is_err());
1645    }
1646
1647    #[tokio::test]
1648    async fn loop_exits_on_shutdown_event() {
1649        let MeshOsLoopParts {
1650            mesh_loop: loop_,
1651            handle,
1652            actions_rx: _,
1653            reader: _,
1654        } = MeshOsLoop::new(fast_test_config());
1655        let task = tokio::spawn(loop_.run());
1656        handle.publish(MeshOsEvent::Shutdown).await.unwrap();
1657        let count = tokio::time::timeout(StdDuration::from_secs(1), task)
1658            .await
1659            .expect("loop did not exit after Shutdown")
1660            .expect("join");
1661        let _ = count;
1662    }
1663
1664    #[tokio::test]
1665    async fn publish_timeout_returns_queue_full_when_loop_is_wedged() {
1666        // Regression for I11: `publish` parks indefinitely on a
1667        // wedged loop. `publish_timeout` surfaces
1668        // QueueFull after the configured window so sources don't
1669        // stall.
1670        let cfg = MeshOsConfig {
1671            event_queue_capacity: 1,
1672            ..fast_test_config()
1673        };
1674        let MeshOsLoopParts {
1675            mesh_loop: _loop_,
1676            handle,
1677            actions_rx: _actions_rx,
1678            reader: _reader,
1679        } = MeshOsLoop::new(cfg);
1680        // Don't spawn the loop — handle is alive but the
1681        // receiver is parked inside `_loop_`. First publish
1682        // fills the capacity-1 channel; second blocks
1683        // indefinitely. publish_timeout surfaces QueueFull.
1684        handle
1685            .publish(MeshOsEvent::Tick)
1686            .await
1687            .expect("first send fits in the single-slot channel");
1688        let started = std::time::Instant::now();
1689        let err = handle
1690            .publish_timeout(MeshOsEvent::Tick, StdDuration::from_millis(50))
1691            .await
1692            .expect_err("second send should time out");
1693        assert!(matches!(err, MeshOsHandleError::QueueFull));
1694        assert!(
1695            started.elapsed() < StdDuration::from_millis(500),
1696            "publish_timeout must honor its budget",
1697        );
1698    }
1699
1700    #[tokio::test]
1701    async fn panicking_probe_does_not_kill_the_loop() {
1702        // Regression for I6: a panicking probe used to unwind
1703        // the loop task. The catch_unwind wrapper in
1704        // `poll_probes` now logs + skips the bad sample so
1705        // reconcile keeps running.
1706        struct PanickyProbe;
1707        impl super::super::probes::LocalityProbe for PanickyProbe {
1708            fn rtt_samples(&self) -> Vec<(u64, std::time::Duration)> {
1709                panic!("boom from probe");
1710            }
1711        }
1712        let registry = ProbeRegistry::new();
1713        registry.add_locality_probe(Arc::new(PanickyProbe));
1714        let cfg = MeshOsConfig {
1715            tick_interval: StdDuration::from_millis(10),
1716            ..fast_test_config()
1717        };
1718        let MeshOsLoopParts {
1719            mesh_loop: loop_,
1720            handle,
1721            actions_rx: _actions_rx,
1722            reader: _reader,
1723        } = MeshOsLoop::new(cfg);
1724        let loop_ = loop_.with_probe_registry(registry);
1725        let task = tokio::spawn(loop_.run());
1726        // Let several ticks fire — each one will invoke the
1727        // panicking probe.
1728        tokio::time::sleep(StdDuration::from_millis(60)).await;
1729        handle.publish(MeshOsEvent::Shutdown).await.unwrap();
1730        let count = tokio::time::timeout(StdDuration::from_secs(2), task)
1731            .await
1732            .expect("loop should still exit cleanly")
1733            .expect("loop task survived probe panics");
1734        assert!(
1735            count >= 1,
1736            "loop should have completed at least one reconcile pass despite probe panics",
1737        );
1738    }
1739
1740    #[tokio::test]
1741    async fn inventory_gc_drops_peers_no_probe_samples_anymore() {
1742        // poll_probes used to insert per-peer inventory samples
1743        // and never remove them. A peer that departed from
1744        // proximity would leak into actual.inventory forever,
1745        // surfacing in every snapshot until the process
1746        // restarted. Pin the per-tick GC: an inventory probe
1747        // whose sample set shrinks across ticks should leave
1748        // actual.inventory pruned to match.
1749        use std::sync::atomic::{AtomicUsize, Ordering as AOrdering};
1750        use std::sync::Mutex;
1751        struct ShrinkingProbe {
1752            tick: AtomicUsize,
1753            // Mutex so the trait method can mutate freely
1754            // without &mut self.
1755            samples_per_tick: Mutex<Vec<Vec<(u64, super::super::probes::PeerInventory)>>>,
1756        }
1757        impl super::super::probes::InventoryProbe for ShrinkingProbe {
1758            fn inventory_samples(&self) -> Vec<(u64, super::super::probes::PeerInventory)> {
1759                let t = self.tick.fetch_add(1, AOrdering::Relaxed);
1760                let guard = self.samples_per_tick.lock().unwrap();
1761                // Past the end of the scripted set, stick on the
1762                // final entry so the steady state stays observed.
1763                let idx = t.min(guard.len().saturating_sub(1));
1764                guard.get(idx).cloned().unwrap_or_default()
1765            }
1766        }
1767        let inv_with_cpu = |cpu: f64| super::super::probes::PeerInventory {
1768            cpu_load_1m: Some(cpu),
1769            ..Default::default()
1770        };
1771        let probe = Arc::new(ShrinkingProbe {
1772            tick: AtomicUsize::new(0),
1773            samples_per_tick: Mutex::new(vec![
1774                vec![
1775                    (0xA, inv_with_cpu(0.1)),
1776                    (0xB, inv_with_cpu(0.2)),
1777                    (0xC, inv_with_cpu(0.3)),
1778                ],
1779                vec![(0xB, inv_with_cpu(0.2)), (0xC, inv_with_cpu(0.3))],
1780                vec![(0xB, inv_with_cpu(0.2)), (0xC, inv_with_cpu(0.3))],
1781            ]),
1782        });
1783        let registry = ProbeRegistry::new();
1784        registry.add_inventory_probe(probe);
1785        let cfg = MeshOsConfig {
1786            tick_interval: StdDuration::from_millis(10),
1787            ..fast_test_config()
1788        };
1789        let MeshOsLoopParts {
1790            mesh_loop: loop_,
1791            handle,
1792            actions_rx: _actions_rx,
1793            reader,
1794        } = MeshOsLoop::new(cfg);
1795        let loop_ = loop_.with_probe_registry(registry);
1796        let task = tokio::spawn(loop_.run());
1797        // Wait long enough for at least two ticks to fire so
1798        // the second sample set (without peer 0xA) lands.
1799        tokio::time::sleep(StdDuration::from_millis(80)).await;
1800        let snap = reader.read();
1801        let inv_peers: std::collections::BTreeSet<u64> = snap
1802            .peers
1803            .iter()
1804            .filter(|(_, p)| p.cpu_load_1m.is_some())
1805            .map(|(id, _)| *id)
1806            .collect();
1807        handle.publish(MeshOsEvent::Shutdown).await.unwrap();
1808        let _ = tokio::time::timeout(StdDuration::from_secs(2), task).await;
1809        assert!(
1810            !inv_peers.contains(&0xA),
1811            "peer 0xA stopped being sampled but still appears in the inventory: {inv_peers:?}",
1812        );
1813        assert!(inv_peers.contains(&0xB), "peer 0xB should still be sampled");
1814        assert!(inv_peers.contains(&0xC), "peer 0xC should still be sampled");
1815    }
1816
1817    #[tokio::test]
1818    async fn inventory_gc_does_not_wipe_peers_when_one_of_two_probes_panics() {
1819        // Multi-probe partial-panic case: probe A reports peer
1820        // 0xA, probe B reports peer 0xB. On tick 2, probe B
1821        // panics; the GC pass must not drop 0xB just because A
1822        // didn't sample it. Without the `all_probes_succeeded`
1823        // guard, `peers_seen_inventory = {0xA}` and `retain`
1824        // would wipe 0xB even though no probe authoritatively
1825        // observed its departure.
1826        use std::sync::atomic::{AtomicUsize, Ordering as AOrdering};
1827        struct SteadyProbe {
1828            peer: u64,
1829            cpu: f64,
1830        }
1831        impl super::super::probes::InventoryProbe for SteadyProbe {
1832            fn inventory_samples(&self) -> Vec<(u64, super::super::probes::PeerInventory)> {
1833                vec![(
1834                    self.peer,
1835                    super::super::probes::PeerInventory {
1836                        cpu_load_1m: Some(self.cpu),
1837                        ..Default::default()
1838                    },
1839                )]
1840            }
1841        }
1842        struct SometimesPanickingProbe {
1843            tick: AtomicUsize,
1844            peer: u64,
1845            cpu: f64,
1846        }
1847        impl super::super::probes::InventoryProbe for SometimesPanickingProbe {
1848            fn inventory_samples(&self) -> Vec<(u64, super::super::probes::PeerInventory)> {
1849                let t = self.tick.fetch_add(1, AOrdering::Relaxed);
1850                if t == 0 {
1851                    vec![(
1852                        self.peer,
1853                        super::super::probes::PeerInventory {
1854                            cpu_load_1m: Some(self.cpu),
1855                            ..Default::default()
1856                        },
1857                    )]
1858                } else {
1859                    panic!("probe B panicked on tick {t}");
1860                }
1861            }
1862        }
1863        let registry = ProbeRegistry::new();
1864        registry.add_inventory_probe(Arc::new(SteadyProbe {
1865            peer: 0xA,
1866            cpu: 0.1,
1867        }));
1868        registry.add_inventory_probe(Arc::new(SometimesPanickingProbe {
1869            tick: AtomicUsize::new(0),
1870            peer: 0xB,
1871            cpu: 0.2,
1872        }));
1873        let cfg = MeshOsConfig {
1874            tick_interval: StdDuration::from_millis(10),
1875            ..fast_test_config()
1876        };
1877        let MeshOsLoopParts {
1878            mesh_loop: loop_,
1879            handle,
1880            actions_rx: _actions_rx,
1881            reader,
1882        } = MeshOsLoop::new(cfg);
1883        let loop_ = loop_.with_probe_registry(registry);
1884        let task = tokio::spawn(loop_.run());
1885        // Wait for the first tick (both probes succeed) and
1886        // several subsequent ticks (probe B panics).
1887        tokio::time::sleep(StdDuration::from_millis(80)).await;
1888        let snap = reader.read();
1889        let inv_peers: std::collections::BTreeSet<u64> = snap
1890            .peers
1891            .iter()
1892            .filter(|(_, p)| p.cpu_load_1m.is_some())
1893            .map(|(id, _)| *id)
1894            .collect();
1895        handle.publish(MeshOsEvent::Shutdown).await.unwrap();
1896        let _ = tokio::time::timeout(StdDuration::from_secs(2), task).await;
1897        assert!(
1898            inv_peers.contains(&0xA),
1899            "probe A's peer should still be in inventory: {inv_peers:?}",
1900        );
1901        assert!(
1902            inv_peers.contains(&0xB),
1903            "probe B's peer must survive probe B's panic — partial-panic ticks are not authoritative for the panicking probe's peers: {inv_peers:?}",
1904        );
1905    }
1906
1907    #[tokio::test]
1908    async fn inventory_gc_does_not_wipe_on_a_transient_empty_probe() {
1909        // The InventoryProbe trait permits an `Ok(vec![])`
1910        // return ("no peers this tick" — transient procfs
1911        // unavailability, cold start, etc.). Without the
1912        // `any_probe_saw_samples` guard the GC pass would
1913        // treat the empty return as "no peers authoritatively
1914        // exist anymore" and wipe every previously-seen peer's
1915        // inventory.
1916        use std::sync::atomic::{AtomicUsize, Ordering as AOrdering};
1917        struct FlakyProbe {
1918            tick: AtomicUsize,
1919        }
1920        impl super::super::probes::InventoryProbe for FlakyProbe {
1921            fn inventory_samples(&self) -> Vec<(u64, super::super::probes::PeerInventory)> {
1922                let t = self.tick.fetch_add(1, AOrdering::Relaxed);
1923                // Tick 0: populate. Subsequent ticks: empty
1924                // (simulate the resource probe losing its
1925                // procfs handle for a few cycles).
1926                if t == 0 {
1927                    vec![
1928                        (
1929                            0xA,
1930                            super::super::probes::PeerInventory {
1931                                cpu_load_1m: Some(0.1),
1932                                ..Default::default()
1933                            },
1934                        ),
1935                        (
1936                            0xB,
1937                            super::super::probes::PeerInventory {
1938                                cpu_load_1m: Some(0.2),
1939                                ..Default::default()
1940                            },
1941                        ),
1942                    ]
1943                } else {
1944                    vec![]
1945                }
1946            }
1947        }
1948        let registry = ProbeRegistry::new();
1949        registry.add_inventory_probe(Arc::new(FlakyProbe {
1950            tick: AtomicUsize::new(0),
1951        }));
1952        let cfg = MeshOsConfig {
1953            tick_interval: StdDuration::from_millis(10),
1954            ..fast_test_config()
1955        };
1956        let MeshOsLoopParts {
1957            mesh_loop: loop_,
1958            handle,
1959            actions_rx: _actions_rx,
1960            reader,
1961        } = MeshOsLoop::new(cfg);
1962        let loop_ = loop_.with_probe_registry(registry);
1963        let task = tokio::spawn(loop_.run());
1964        // Wait long enough for the populating tick + several
1965        // empty ticks.
1966        tokio::time::sleep(StdDuration::from_millis(80)).await;
1967        let snap = reader.read();
1968        let inv_peers: std::collections::BTreeSet<u64> = snap
1969            .peers
1970            .iter()
1971            .filter(|(_, p)| p.cpu_load_1m.is_some())
1972            .map(|(id, _)| *id)
1973            .collect();
1974        handle.publish(MeshOsEvent::Shutdown).await.unwrap();
1975        let _ = tokio::time::timeout(StdDuration::from_secs(2), task).await;
1976        assert!(
1977            inv_peers.contains(&0xA),
1978            "transient empty probe return must not wipe inventory: {inv_peers:?}",
1979        );
1980        assert!(
1981            inv_peers.contains(&0xB),
1982            "transient empty probe return must not wipe inventory: {inv_peers:?}",
1983        );
1984    }
1985
1986    #[tokio::test]
1987    async fn snapshot_reader_does_not_stall_under_concurrent_reads() {
1988        // With ArcSwap, publish is a pointer store and read is
1989        // a pointer load + Arc clone; concurrent readers cannot
1990        // stall the publisher. Smoke-test by spawning many
1991        // readers polling the snapshot in a tight loop while
1992        // the loop ticks repeatedly.
1993        let cfg = MeshOsConfig {
1994            tick_interval: StdDuration::from_millis(5),
1995            ..fast_test_config()
1996        };
1997        let MeshOsLoopParts {
1998            mesh_loop: loop_,
1999            handle,
2000            actions_rx: _actions_rx,
2001            reader,
2002        } = MeshOsLoop::new(cfg);
2003        let task = tokio::spawn(loop_.run());
2004
2005        let stop = Arc::new(std::sync::atomic::AtomicBool::new(false));
2006        let mut readers = Vec::new();
2007        for _ in 0..8 {
2008            let reader = reader.clone();
2009            let stop = Arc::clone(&stop);
2010            readers.push(tokio::spawn(async move {
2011                let mut count = 0u64;
2012                while !stop.load(Ordering::Relaxed) {
2013                    let _snap = reader.read();
2014                    count += 1;
2015                    tokio::task::yield_now().await;
2016                }
2017                count
2018            }));
2019        }
2020        // Let the loop fire ~10 ticks.
2021        tokio::time::sleep(StdDuration::from_millis(60)).await;
2022        stop.store(true, Ordering::Relaxed);
2023        handle.publish(MeshOsEvent::Shutdown).await.unwrap();
2024        let _loop_count = task.await.expect("loop join");
2025        let mut total = 0u64;
2026        for r in readers {
2027            total += r.await.expect("reader join");
2028        }
2029        assert!(
2030            total > 0,
2031            "readers should have made progress while the loop published",
2032        );
2033    }
2034
2035    #[tokio::test]
2036    async fn dropped_actions_counter_increments_when_action_queue_is_full() {
2037        // Regression for C4: reconcile output silently dropped
2038        // when the action mpsc is at capacity. Make the queue
2039        // tiny, hold the receiver without draining, project a
2040        // reconcile pass that emits multiple actions, assert the
2041        // counter caught the drop.
2042        let cfg = MeshOsConfig {
2043            action_queue_capacity: 1,
2044            tick_interval: StdDuration::from_millis(10),
2045            ..fast_test_config()
2046        };
2047        let this_node = cfg.this_node;
2048        let MeshOsLoopParts {
2049            mesh_loop: loop_,
2050            handle,
2051            actions_rx: _actions_rx,
2052            reader: _reader,
2053        } = MeshOsLoop::new(cfg);
2054        let counter = loop_.dropped_actions_counter();
2055        let task = tokio::spawn(loop_.run());
2056        // Put `this_node` as a holder of five chains.
2057        for chain in 1..=5u64 {
2058            handle
2059                .publish(MeshOsEvent::ReplicaUpdate(ReplicaUpdate::Added {
2060                    chain: chain as ChainId,
2061                    holder: this_node,
2062                }))
2063                .await
2064                .unwrap();
2065        }
2066        // Project local Drop intent → reconcile emits one
2067        // DropReplica per chain on the next tick.
2068        for chain in 1..=5u64 {
2069            handle
2070                .publish(MeshOsEvent::LocalReplicaIntent(LocalReplicaIntentUpdate {
2071                    chain: chain as ChainId,
2072                    intent: LocalReplicaIntent::Drop,
2073                }))
2074                .await
2075                .unwrap();
2076        }
2077        // Let several ticks fire.
2078        tokio::time::sleep(StdDuration::from_millis(80)).await;
2079        handle.publish(MeshOsEvent::Shutdown).await.unwrap();
2080        let _ = task.await;
2081        let dropped = counter.load(Ordering::Relaxed);
2082        assert!(
2083            dropped >= 1,
2084            "expected at least one dropped reconcile action with \
2085             action_queue_capacity = 1; got {dropped}",
2086        );
2087    }
2088
2089    #[tokio::test]
2090    async fn ticks_drive_reconcile_passes() {
2091        let cfg = MeshOsConfig {
2092            tick_interval: StdDuration::from_millis(20),
2093            ..fast_test_config()
2094        };
2095        let MeshOsLoopParts {
2096            mesh_loop: loop_,
2097            handle,
2098            actions_rx: _,
2099            reader: _,
2100        } = MeshOsLoop::new(cfg);
2101        let task = tokio::spawn(loop_.run());
2102        // Let several ticks fire.
2103        tokio::time::sleep(StdDuration::from_millis(120)).await;
2104        handle.publish(MeshOsEvent::Shutdown).await.unwrap();
2105        let count = task.await.expect("join");
2106        // At a 20 ms tick over a 120 ms window we expect roughly
2107        // 4–7 reconcile passes; require at least 2 so a slow CI
2108        // host doesn't flake.
2109        assert!(
2110            count >= 2,
2111            "expected at least 2 reconcile passes, got {count}"
2112        );
2113    }
2114
2115    #[tokio::test]
2116    async fn locality_probe_samples_fold_into_actual_rtt_via_snapshot() {
2117        // A LocalityProbe is polled on every Tick; its samples
2118        // land in MeshOsState::rtt; the snapshot's peers map
2119        // surfaces them.
2120        struct FixedProbe(Vec<(u64, std::time::Duration)>);
2121        impl super::super::probes::LocalityProbe for FixedProbe {
2122            fn rtt_samples(&self) -> Vec<(u64, std::time::Duration)> {
2123                self.0.clone()
2124            }
2125        }
2126
2127        let registry = ProbeRegistry::new();
2128        registry.add_locality_probe(std::sync::Arc::new(FixedProbe(vec![
2129            (10, std::time::Duration::from_millis(33)),
2130            (11, std::time::Duration::from_millis(150)),
2131        ])));
2132        let MeshOsLoopParts {
2133            mesh_loop: loop_,
2134            handle,
2135            actions_rx: _,
2136            reader,
2137        } = MeshOsLoop::new(fast_test_config());
2138        let loop_ = loop_.with_probe_registry(registry);
2139        let task = tokio::spawn(loop_.run());
2140
2141        tokio::time::sleep(StdDuration::from_millis(80)).await;
2142        let snap = reader.read();
2143        // Peer 10 surfaces with 33 ms.
2144        let p10 = snap.peers.get(&10).expect("peer 10 in snapshot");
2145        assert_eq!(p10.rtt_ms, Some(33));
2146        let p11 = snap.peers.get(&11).expect("peer 11 in snapshot");
2147        assert_eq!(p11.rtt_ms, Some(150));
2148
2149        handle.publish(MeshOsEvent::Shutdown).await.unwrap();
2150        let _ = task.await;
2151    }
2152
2153    #[tokio::test]
2154    async fn health_probe_samples_fold_into_actual_health_via_snapshot() {
2155        struct FixedProbe(Vec<(u64, super::super::event::NodeHealth)>);
2156        impl super::super::probes::HealthProbe for FixedProbe {
2157            fn health_samples(&self) -> Vec<(u64, super::super::event::NodeHealth)> {
2158                self.0.clone()
2159            }
2160        }
2161
2162        let registry = ProbeRegistry::new();
2163        registry.add_health_probe(std::sync::Arc::new(FixedProbe(vec![(
2164            5,
2165            super::super::event::NodeHealth::Unreachable,
2166        )])));
2167        let MeshOsLoopParts {
2168            mesh_loop: loop_,
2169            handle,
2170            actions_rx: _,
2171            reader,
2172        } = MeshOsLoop::new(fast_test_config());
2173        let loop_ = loop_.with_probe_registry(registry);
2174        let task = tokio::spawn(loop_.run());
2175
2176        tokio::time::sleep(StdDuration::from_millis(80)).await;
2177        let snap = reader.read();
2178        let p5 = snap.peers.get(&5).expect("peer 5 in snapshot");
2179        // Wire form differs from the enum form but the
2180        // discriminator matches.
2181        assert!(matches!(
2182            p5.health,
2183            Some(super::super::snapshot::PeerHealthSnapshot::Unreachable)
2184        ));
2185
2186        handle.publish(MeshOsEvent::Shutdown).await.unwrap();
2187        let _ = task.await;
2188    }
2189
2190    #[tokio::test]
2191    async fn probe_registry_attached_post_construction_is_polled_on_next_tick() {
2192        // The runtime / production pattern: construct the loop,
2193        // pass a shared registry through `with_probe_registry`,
2194        // THEN install probes on the registry. The next Tick
2195        // picks them up because both ends share the same Arc.
2196        struct FixedProbe;
2197        impl super::super::probes::LocalityProbe for FixedProbe {
2198            fn rtt_samples(&self) -> Vec<(u64, std::time::Duration)> {
2199                vec![(99, std::time::Duration::from_millis(7))]
2200            }
2201        }
2202
2203        let registry = ProbeRegistry::new();
2204        let MeshOsLoopParts {
2205            mesh_loop: loop_,
2206            handle,
2207            actions_rx: _,
2208            reader,
2209        } = MeshOsLoop::new(fast_test_config());
2210        let loop_ = loop_.with_probe_registry(registry.clone());
2211        let task = tokio::spawn(loop_.run());
2212
2213        // Add the probe AFTER spawning the loop — the shared
2214        // Arc means the loop sees it on the next Tick.
2215        registry.add_locality_probe(std::sync::Arc::new(FixedProbe));
2216        tokio::time::sleep(StdDuration::from_millis(80)).await;
2217
2218        let snap = reader.read();
2219        let p99 = snap.peers.get(&99).expect("peer 99 in snapshot");
2220        assert_eq!(p99.rtt_ms, Some(7));
2221
2222        handle.publish(MeshOsEvent::Shutdown).await.unwrap();
2223        let _ = task.await;
2224    }
2225
2226    #[tokio::test]
2227    async fn snapshot_reader_returns_updated_state_after_each_tick() {
2228        // The reader should reflect the most recent
2229        // post-reconcile snapshot. Fire some events that change
2230        // state, let ticks fire, sample the reader.
2231        let MeshOsLoopParts {
2232            mesh_loop: loop_,
2233            handle,
2234            actions_rx: _,
2235            reader,
2236        } = MeshOsLoop::new(fast_test_config());
2237        let task = tokio::spawn(loop_.run());
2238
2239        // Add a replica observation.
2240        handle
2241            .publish(MeshOsEvent::ReplicaUpdate(ReplicaUpdate::Added {
2242                chain: 0xC0FFEE,
2243                holder: 7,
2244            }))
2245            .await
2246            .unwrap();
2247        // Give ticks time to fire + reconcile + publish.
2248        tokio::time::sleep(StdDuration::from_millis(100)).await;
2249
2250        let snap = reader.read();
2251        let entry = snap
2252            .replicas
2253            .get(&0xC0FFEE)
2254            .expect("snapshot should carry the replica");
2255        assert_eq!(entry.holders, vec![7]);
2256
2257        handle.publish(MeshOsEvent::Shutdown).await.unwrap();
2258        let _ = task.await;
2259    }
2260
2261    #[tokio::test]
2262    async fn snapshot_reader_is_cloneable_and_sees_same_state() {
2263        let MeshOsLoopParts {
2264            mesh_loop: loop_,
2265            handle,
2266            actions_rx: _,
2267            reader: reader_a,
2268        } = MeshOsLoop::new(fast_test_config());
2269        let reader_b = reader_a.clone();
2270        let task = tokio::spawn(loop_.run());
2271
2272        handle
2273            .publish(MeshOsEvent::ReplicaUpdate(ReplicaUpdate::Added {
2274                chain: 1,
2275                holder: 1,
2276            }))
2277            .await
2278            .unwrap();
2279        tokio::time::sleep(StdDuration::from_millis(100)).await;
2280
2281        let snap_a = reader_a.read();
2282        let snap_b = reader_b.read();
2283        assert_eq!(snap_a, snap_b);
2284
2285        handle.publish(MeshOsEvent::Shutdown).await.unwrap();
2286        let _ = task.await;
2287    }
2288
2289    #[tokio::test]
2290    async fn loop_drains_event_burst_without_panicking() {
2291        // Smoke test: the loop accepts a burst of arbitrary
2292        // events and exits cleanly when shutdown is published.
2293        // The fold-side ordering property is asserted directly
2294        // on `MeshOsState::apply` in `state::tests` — that
2295        // covers the substantive ordering guarantee without
2296        // having to crack open the consumed-loop's state.
2297        let MeshOsLoopParts {
2298            mesh_loop: loop_,
2299            handle,
2300            actions_rx: _,
2301            reader: _,
2302        } = MeshOsLoop::new(fast_test_config());
2303
2304        let chain: ChainId = 0xC0FFEE;
2305        let probe = tokio::spawn(async move {
2306            handle
2307                .publish(MeshOsEvent::ReplicaUpdate(ReplicaUpdate::Added {
2308                    chain,
2309                    holder: 11,
2310                }))
2311                .await
2312                .unwrap();
2313            handle
2314                .publish(MeshOsEvent::ReplicaUpdate(ReplicaUpdate::Added {
2315                    chain,
2316                    holder: 12,
2317                }))
2318                .await
2319                .unwrap();
2320            handle
2321                .publish(MeshOsEvent::ReplicaUpdate(ReplicaUpdate::Removed {
2322                    chain,
2323                    holder: 11,
2324                }))
2325                .await
2326                .unwrap();
2327            handle.publish(MeshOsEvent::Shutdown).await.unwrap();
2328        });
2329
2330        let task = tokio::spawn(loop_.run());
2331        probe.await.expect("publisher panicked");
2332        let _count = tokio::time::timeout(StdDuration::from_millis(200), task)
2333            .await
2334            .expect("loop did not exit")
2335            .expect("join");
2336    }
2337
2338    #[test]
2339    fn loop_construction_returns_handle_and_actions_receiver() {
2340        // Compile + type-check: `new` returns the triple, the
2341        // handle is cloneable, the actions receiver is the
2342        // documented type.
2343        let MeshOsLoopParts {
2344            mesh_loop: _loop_,
2345            handle,
2346            actions_rx: _actions_rx,
2347            reader: _,
2348        } = MeshOsLoop::new(MeshOsConfig::default());
2349        let _clone = handle.clone();
2350    }
2351
2352    #[tokio::test]
2353    async fn try_publish_surfaces_queue_full_under_saturation() {
2354        // Capacity-1 channel, loop not yet running — second
2355        // try_publish should hit QueueFull rather than block.
2356        let cfg = MeshOsConfig {
2357            event_queue_capacity: 1,
2358            ..fast_test_config()
2359        };
2360        let MeshOsLoopParts {
2361            mesh_loop: loop_,
2362            handle,
2363            actions_rx: _,
2364            reader: _,
2365        } = MeshOsLoop::new(cfg);
2366        handle.try_publish(MeshOsEvent::Tick).unwrap();
2367        match handle.try_publish(MeshOsEvent::Tick) {
2368            Err(MeshOsHandleError::QueueFull) => {}
2369            other => panic!("expected QueueFull, got {other:?}"),
2370        }
2371        drop(handle);
2372        // Drain so the loop exits.
2373        let _ = loop_.run().await;
2374    }
2375
2376    #[tokio::test]
2377    async fn shutdown_event_short_circuits_pending_events_after_it() {
2378        // Per the loop contract: Shutdown breaks the loop the
2379        // moment it's dequeued. Events queued behind Shutdown
2380        // must NOT mutate state — the post-Shutdown
2381        // ReplicaUpdate below should leave the snapshot's
2382        // replica fold empty. Use a long tick interval so the
2383        // reconcile arm doesn't race the assertion.
2384        let cfg = MeshOsConfig {
2385            tick_interval: StdDuration::from_secs(60),
2386            ..fast_test_config()
2387        };
2388        let MeshOsLoopParts {
2389            mesh_loop: loop_,
2390            handle,
2391            actions_rx: _,
2392            reader,
2393        } = MeshOsLoop::new(cfg);
2394        handle.publish(MeshOsEvent::Shutdown).await.unwrap();
2395        // Post-Shutdown event: enqueued behind Shutdown. The
2396        // loop must NOT apply it before exiting.
2397        handle
2398            .publish(MeshOsEvent::ReplicaUpdate(ReplicaUpdate::Added {
2399                chain: 1,
2400                holder: 1,
2401            }))
2402            .await
2403            .unwrap();
2404        let count = tokio::time::timeout(StdDuration::from_secs(1), loop_.run())
2405            .await
2406            .expect("loop did not exit on Shutdown");
2407        // Reconcile never fired (Shutdown short-circuits the
2408        // loop before any tick).
2409        assert_eq!(count, 0, "Shutdown must break before reconcile fires");
2410        // The post-Shutdown ReplicaUpdate must NOT have applied —
2411        // the snapshot's replica fold stays empty.
2412        let snap = reader.read();
2413        assert!(
2414            snap.replicas.is_empty(),
2415            "post-Shutdown ReplicaUpdate must not enter the fold; saw {} entries",
2416            snap.replicas.len(),
2417        );
2418    }
2419
2420    #[tokio::test]
2421    async fn reconcile_emitted_at_anchored_on_last_tick_for_replay_determinism() {
2422        // Reconcile must stamp every PendingAction.emitted_at on
2423        // the same Instant the Tick fold wrote into
2424        // actual.last_tick — not on a fresh Instant::now(). The
2425        // snapshot derives `age_ms` from
2426        // `now.saturating_duration_since(emitted_at)` against the
2427        // same anchor, so an action emitted in the latest tick
2428        // renders with age_ms == 0.
2429        //
2430        // We use a long tick interval and shut down right after
2431        // the first tick fires so the read sees actions whose
2432        // emitted_at matches the last_tick the snapshot is built
2433        // against.
2434        use super::super::event::AdminEvent;
2435        let cfg = MeshOsConfig {
2436            tick_interval: StdDuration::from_millis(80),
2437            ..fast_test_config()
2438        };
2439        let MeshOsLoopParts {
2440            mesh_loop: loop_,
2441            handle,
2442            // Keep the receiver alive so the reconciler's try_send
2443            // succeeds and the action lands in recent_emissions —
2444            // pre-fix recent_emissions populated regardless of
2445            // try_send outcome (an actions_tx.Closed would still
2446            // mark the action as "emitted" in the snapshot).
2447            actions_rx: _actions_rx,
2448            reader,
2449        } = MeshOsLoop::new(cfg);
2450        let task = tokio::spawn(loop_.run());
2451        // Drive an EnterMaintenance — reconcile emits a
2452        // CommitMaintenanceTransition that lands in the
2453        // snapshot's `pending` mirror.
2454        handle
2455            .publish(MeshOsEvent::AdminEvent(AdminEvent::EnterMaintenance {
2456                node: 1,
2457                drain_for: None,
2458            }))
2459            .await
2460            .unwrap();
2461        // Wait long enough for ONE tick to fire (80 ms), then
2462        // shutdown before a second tick.
2463        tokio::time::sleep(StdDuration::from_millis(100)).await;
2464        handle.publish(MeshOsEvent::Shutdown).await.unwrap();
2465        let _ = task.await;
2466        let snap = reader.read();
2467        assert!(
2468            !snap.recently_emitted.is_empty(),
2469            "expected at least one recently-emitted action; saw none",
2470        );
2471        // The latest tick anchors the snapshot's now. Actions
2472        // emitted in that tick render with age_ms == 0 only when
2473        // reconcile uses last_tick for emitted_at.
2474        let zero_age_count = snap
2475            .recently_emitted
2476            .iter()
2477            .filter(|p| p.age_ms == 0)
2478            .count();
2479        assert!(
2480            zero_age_count >= 1,
2481            "expected at least one action emitted in the snapshot's tick to render \
2482             age_ms == 0 (anchored on last_tick); recently_emitted = {:?}",
2483            snap.recently_emitted,
2484        );
2485    }
2486
2487    #[tokio::test]
2488    async fn log_chain_appender_receives_every_published_log_line() {
2489        use super::super::log_chain::BufferingLogChainAppender;
2490        use super::super::logs::{LogLevel, LogLine};
2491        let appender = Arc::new(BufferingLogChainAppender::default());
2492        let MeshOsLoopParts {
2493            mesh_loop: loop_,
2494            handle,
2495            actions_rx: _,
2496            reader,
2497        } = MeshOsLoop::new(fast_test_config());
2498        let loop_ = loop_.with_log_appender(
2499            appender.clone() as Arc<dyn super::super::log_chain::LogChainAppender>
2500        );
2501        let task = tokio::spawn(loop_.run());
2502
2503        for (i, level) in [LogLevel::Info, LogLevel::Warn, LogLevel::Error]
2504            .into_iter()
2505            .enumerate()
2506        {
2507            handle
2508                .publish(MeshOsEvent::LogLine(LogLine {
2509                    level,
2510                    daemon_id: Some(7),
2511                    message: format!("msg {i}"),
2512                }))
2513                .await
2514                .unwrap();
2515        }
2516        tokio::time::sleep(StdDuration::from_millis(60)).await;
2517
2518        let captured = appender.captured();
2519        assert_eq!(captured.len(), 3, "appender should see three records");
2520        let snap = reader.read();
2521        assert_eq!(snap.log_ring.len(), 3, "ring should hold three records");
2522        // Appender + ring see the SAME records (seq + content match
2523        // for each).
2524        for (i, captured_record) in captured.iter().enumerate().take(3) {
2525            assert_eq!(snap.log_ring[i].seq, captured_record.seq);
2526            assert_eq!(snap.log_ring[i].message, captured_record.message);
2527        }
2528
2529        drop(handle);
2530        let _ = tokio::time::timeout(StdDuration::from_secs(1), task).await;
2531    }
2532
2533    #[tokio::test]
2534    async fn admin_audit_chain_appender_receives_every_recorded_commit() {
2535        // Wire a buffering appender into the loop; drive an
2536        // admin event through; the appender + the snapshot
2537        // ring should each carry one record with the same seq.
2538        use super::super::audit_chain::BufferingAdminAuditChainAppender;
2539        use super::super::event::AdminEvent;
2540        let appender = Arc::new(BufferingAdminAuditChainAppender::default());
2541        let MeshOsLoopParts {
2542            mesh_loop: loop_,
2543            handle,
2544            actions_rx: _,
2545            reader,
2546        } = MeshOsLoop::new(fast_test_config());
2547        let loop_ = loop_.with_admin_audit_appender(
2548            appender.clone() as Arc<dyn super::super::audit_chain::AdminAuditChainAppender>
2549        );
2550        let task = tokio::spawn(loop_.run());
2551
2552        // Publish an unsigned admin event — fold records it on
2553        // the ring with Unverified outcome AND fires the
2554        // appender.
2555        handle
2556            .publish(MeshOsEvent::AdminEvent(AdminEvent::Cordon { node: 42 }))
2557            .await
2558            .unwrap();
2559        tokio::time::sleep(StdDuration::from_millis(60)).await;
2560
2561        let captured = appender.captured();
2562        assert_eq!(captured.len(), 1, "appender should see one record");
2563        assert_eq!(captured[0].event, AdminEvent::Cordon { node: 42 });
2564
2565        let snap = reader.read();
2566        assert_eq!(snap.admin_audit.len(), 1, "ring should hold one record");
2567        // The appender + ring see the SAME record (seq + content match).
2568        assert_eq!(snap.admin_audit[0].seq, captured[0].seq);
2569        // Successful append → ring entry has chain_pending == false.
2570        assert!(
2571            !snap.admin_audit[0].chain_pending,
2572            "ring entry must NOT be marked chain_pending when the appender succeeded"
2573        );
2574
2575        // Tidy.
2576        drop(handle);
2577        let _ = tokio::time::timeout(StdDuration::from_secs(1), task).await;
2578    }
2579
2580    /// Pin that ring-first + mark-chain_pending kicks in when the
2581    /// audit chain appender returns an error: ring still records
2582    /// the entry but flags it so chain consumers can distinguish
2583    /// "permanent gap" from "haven't replicated yet."
2584    #[tokio::test]
2585    async fn audit_ring_flags_chain_pending_on_appender_failure() {
2586        use super::super::audit_chain::{AdminAuditAppendError, AdminAuditChainAppender};
2587        use super::super::event::AdminEvent;
2588        use super::super::ice::AdminAuditRecord;
2589
2590        struct FailingAuditAppender;
2591        impl AdminAuditChainAppender for FailingAuditAppender {
2592            fn append(&self, _record: &AdminAuditRecord) -> Result<(), AdminAuditAppendError> {
2593                Err(AdminAuditAppendError {
2594                    reason: "test-injected appender failure".into(),
2595                })
2596            }
2597        }
2598
2599        let MeshOsLoopParts {
2600            mesh_loop: loop_,
2601            handle,
2602            actions_rx: _,
2603            reader,
2604        } = MeshOsLoop::new(fast_test_config());
2605        let loop_ = loop_.with_admin_audit_appender(Arc::new(FailingAuditAppender));
2606        let task = tokio::spawn(loop_.run());
2607
2608        handle
2609            .publish(MeshOsEvent::AdminEvent(AdminEvent::Cordon { node: 99 }))
2610            .await
2611            .unwrap();
2612        tokio::time::sleep(StdDuration::from_millis(60)).await;
2613
2614        let snap = reader.read();
2615        assert_eq!(
2616            snap.admin_audit.len(),
2617            1,
2618            "ring must still record the entry"
2619        );
2620        assert!(
2621            snap.admin_audit[0].chain_pending,
2622            "ring entry must be flagged chain_pending after the appender returned Err"
2623        );
2624
2625        drop(handle);
2626        let _ = tokio::time::timeout(StdDuration::from_secs(1), task).await;
2627    }
2628
2629    #[tokio::test]
2630    async fn kill_migration_dispatches_to_installed_aborter() {
2631        use super::super::event::AdminEvent;
2632        use super::super::migration_aborter::{BufferingMigrationAborter, MigrationAborter};
2633        let aborter = Arc::new(BufferingMigrationAborter::default());
2634        let MeshOsLoopParts {
2635            mesh_loop: loop_,
2636            handle,
2637            actions_rx: _,
2638            reader: _,
2639        } = MeshOsLoop::new(fast_test_config());
2640        let loop_ = loop_.with_migration_aborter(aborter.clone() as Arc<dyn MigrationAborter>);
2641        let task = tokio::spawn(loop_.run());
2642
2643        handle
2644            .publish(MeshOsEvent::AdminEvent(AdminEvent::KillMigration {
2645                migration: 0xCAFE,
2646            }))
2647            .await
2648            .unwrap();
2649        tokio::time::sleep(StdDuration::from_millis(60)).await;
2650
2651        assert_eq!(aborter.captured(), vec![0xCAFE]);
2652
2653        drop(handle);
2654        let _ = tokio::time::timeout(StdDuration::from_secs(1), task).await;
2655    }
2656
2657    #[tokio::test]
2658    async fn non_kill_admin_events_do_not_invoke_aborter() {
2659        use super::super::event::AdminEvent;
2660        use super::super::migration_aborter::{BufferingMigrationAborter, MigrationAborter};
2661        let aborter = Arc::new(BufferingMigrationAborter::default());
2662        let MeshOsLoopParts {
2663            mesh_loop: loop_,
2664            handle,
2665            actions_rx: _,
2666            reader: _,
2667        } = MeshOsLoop::new(fast_test_config());
2668        let loop_ = loop_.with_migration_aborter(aborter.clone() as Arc<dyn MigrationAborter>);
2669        let task = tokio::spawn(loop_.run());
2670
2671        handle
2672            .publish(MeshOsEvent::AdminEvent(AdminEvent::Cordon { node: 42 }))
2673            .await
2674            .unwrap();
2675        tokio::time::sleep(StdDuration::from_millis(60)).await;
2676
2677        assert!(aborter.captured().is_empty());
2678
2679        drop(handle);
2680        let _ = tokio::time::timeout(StdDuration::from_secs(1), task).await;
2681    }
2682
2683    /// `clear_inventory_probes` must drop every installed
2684    /// inventory probe (and only those) so callers swapping
2685    /// probe sources mid-flight can detach the previous set
2686    /// before installing replacements. Without this, a stale
2687    /// probe left in the append-only registry can stomp the
2688    /// new one's samples via last-writer-wins per peer.
2689    #[test]
2690    fn clear_inventory_probes_drops_installed_probes_only() {
2691        struct DummyInventoryProbe;
2692        impl super::super::probes::InventoryProbe for DummyInventoryProbe {
2693            fn inventory_samples(&self) -> Vec<(u64, super::super::probes::PeerInventory)> {
2694                vec![]
2695            }
2696        }
2697        struct DummyLocalityProbe;
2698        impl super::super::probes::LocalityProbe for DummyLocalityProbe {
2699            fn rtt_samples(&self) -> Vec<(u64, StdDuration)> {
2700                vec![]
2701            }
2702        }
2703        struct DummyHealthProbe;
2704        impl super::super::probes::HealthProbe for DummyHealthProbe {
2705            fn health_samples(&self) -> Vec<(u64, super::super::event::NodeHealth)> {
2706                vec![]
2707            }
2708        }
2709
2710        let reg = ProbeRegistry::new();
2711        reg.add_locality_probe(Arc::new(DummyLocalityProbe));
2712        reg.add_health_probe(Arc::new(DummyHealthProbe));
2713        reg.add_inventory_probe(Arc::new(DummyInventoryProbe));
2714        reg.add_inventory_probe(Arc::new(DummyInventoryProbe));
2715        assert_eq!(reg.probe_counts(), (1, 1, 2));
2716
2717        reg.clear_inventory_probes();
2718        assert_eq!(
2719            reg.probe_counts(),
2720            (1, 1, 0),
2721            "inventory cleared; locality + health untouched"
2722        );
2723
2724        // The other clear paths are symmetric — verify they
2725        // also touch only their own list.
2726        reg.clear_locality_probes();
2727        assert_eq!(reg.probe_counts(), (0, 1, 0));
2728        reg.clear_health_probes();
2729        assert_eq!(reg.probe_counts(), (0, 0, 0));
2730
2731        // Post-clear, re-install must succeed and count
2732        // independently — the underlying Vec wasn't replaced
2733        // with something half-broken.
2734        reg.add_inventory_probe(Arc::new(DummyInventoryProbe));
2735        assert_eq!(reg.probe_counts(), (0, 0, 1));
2736    }
2737}