net/adapter/net/behavior/meshos/event_loop.rs
1//! [`MeshOsLoop`] — the canonical event loop. Locked decision #1:
2//! one stream → one reconcile → consistent actions. Locked
3//! decision #4: reconcile emits, the action executor drains.
4//!
5//! Phase A wires the plumbing. The loop owns an mpsc receiver
6//! that consumes [`super::event::MeshOsEvent`]s from arbitrary
7//! sources, folds each event into [`super::state::MeshOsState`]
8//! (and routes desired-state input into
9//! [`super::state::DesiredState`]), runs
10//! [`super::reconcile::reconcile`] at most once per
11//! [`super::event::MeshOsEvent::Tick`], and pushes any emitted
12//! actions through an mpsc sender that the action executor will
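//! drain. In the Phase A baseline reconcile was a no-op, so the
//! executor saw an empty queue under steady state; later phases
//! (e.g. Phase D-1 rebalancing via the scheduler registry) give
//! reconcile real work.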
15//!
16//! Sources fan-in via converters owned by their subsystems —
17//! none ship in Phase A. Tests drive events directly through the
18//! source channel to exercise the ordering contract.
19
20use std::collections::VecDeque;
21use std::sync::atomic::{AtomicU64, Ordering};
22use std::sync::Arc;
23
24use arc_swap::ArcSwap;
25use parking_lot::RwLock;
26use tokio::sync::mpsc;
27use tokio::time::{interval_at, Instant as TokioInstant, MissedTickBehavior};
28
29use super::action::{AllocateActionId, PendingAction};
30use super::config::MeshOsConfig;
31use super::control::{ControlSink, MeshOsControl};
32use super::event::MeshOsEvent;
33use super::maintenance::MaintenanceState;
34use super::probes::{HealthProbe, LocalityProbe};
35use super::reconcile::reconcile;
36use super::scheduler::SchedulerRegistry;
37use super::snapshot::{FailureRecord, MeshOsSnapshot};
38use super::state::{DesiredState, MeshOsState};
39
40/// Per-node MeshOS instance. Owns the actual + desired state
41/// folds, the event-source channel, and the action-executor
42/// channel. Cloneable handles (`MeshOsHandle`) hand out
43/// `mpsc::Sender<MeshOsEvent>` clones for sources to publish on;
44/// `MeshOsLoop::run` is the long-lived task.
45pub struct MeshOsLoop {
46 config: Arc<MeshOsConfig>,
47
48 /// Inbound event stream. The loop owns the receiver; sender
49 /// clones live on every [`MeshOsHandle`]. When the last
50 /// handle drops, `recv()` returns `None` and the loop exits.
51 events_rx: mpsc::Receiver<MeshOsEvent>,
52
53 /// Outbound action queue. The action executor task drains
54 /// this; Phase A drains and drops (no real subsystem
55 /// dispatch yet).
56 actions_tx: mpsc::Sender<PendingAction>,
57
58 /// Action id allocator.
59 action_ids: AllocateActionId,
60
61 /// Folded substrate state.
62 actual: MeshOsState,
63 /// Folded desired state (placement intent + future
64 /// daemon-intent feeds).
65 desired: DesiredState,
66
67 /// Most recent post-reconcile snapshot. Updated on every
68 /// Tick after the reconcile pass; readable through
69 /// [`MeshOsSnapshotReader::read`] from any other task /
70 /// thread. `ArcSwap` keeps the publish path one atomic
71 /// pointer store and the read path one atomic load + Arc
72 /// clone — no lock contention with reconcile.
73 snapshot: Arc<ArcSwap<MeshOsSnapshot>>,
74
75 /// Ring of the most-recently-emitted actions. The snapshot's
76 /// `pending` field renders this as "what reconcile recently
77 /// produced." Bounded by `action_queue_capacity`; older
78 /// entries drop FIFO when the cap is exceeded. The executor
79 /// does NOT signal completion back, so this is *not* a true
80 /// in-flight list — drained / completed actions stay in the
81 /// ring until they age out.
82 recent_emissions: Vec<PendingAction>,
83
84 /// Pull-via-tick probes the loop polls on each Tick. Shared
85 /// with the [`ProbeRegistry`] so consumers can install
86 /// probes after `MeshOsLoop::new` (the runtime in particular
87 /// spawns the loop immediately and attaches probes via the
88 /// registry).
89 probes: ProbeRegistryInner,
90
91 /// Phase D-1 scheduler registry — single-slot pluggable
92 /// [`super::scheduler::PlacementScorer`]. Shared via Arc;
93 /// install via [`SchedulerRegistry::install`] before or
94 /// after `run()`.
95 scheduler: SchedulerRegistry,
96
97 /// X-13 recovery registry — pluggable per-tick recovery
98 /// handlers for groups whose slots were marked unhealthy
99 /// after a placement failure. The tick handler calls
100 /// `try_run_all` after `poll_probes` so the very next
101 /// reconcile pass sees the recovered slots. Empty by default;
102 /// SDK consumers register handlers via
103 /// `MeshOsRuntime::recovery_registry()`.
104 recovery_registry: crate::adapter::net::compute::RecoveryRegistry,
105
106 /// Reconcile-pass counter — used by tests / diagnostics to
107 /// confirm reconcile fired exactly once per Tick.
108 reconcile_count: u64,
109
110 /// Monotonic counter the loop stamps onto every
111 /// `AdminAuditRecord` it emits. Strictly increasing across
112 /// the runtime's lifetime; the Deck SDK's audit-tail
113 /// stream depends on this for dedup across snapshot polls.
114 admin_audit_seq: u64,
115
116 /// Monotonic counter the loop stamps onto every
117 /// `LogRecord` it emits. Same pattern as
118 /// `admin_audit_seq`; the Deck SDK's log-tail stream
119 /// dedups against this.
120 log_seq: u64,
121 /// Boot-time identifier this loop stamps on every
122 /// published snapshot via
123 /// [`MeshOsSnapshot::runtime_epoch_id`]. SDK consumers
124 /// dedup'ing via `seq` values pair each watermark with this
125 /// value — when the snapshot's epoch flips, they reset.
126 runtime_epoch_id: u64,
127 /// Ring buffer of admin-commit outcomes — every admin
128 /// commit the loop observes (signed ICE bundle or unsigned
129 /// `MeshOsEvent::AdminEvent(...)`) lands here, regardless
130 /// of whether a verifier accepted, rejected, or skipped
131 /// it. Bounded to
132 /// [`super::ice::DEFAULT_MAX_ADMIN_AUDIT_RECORDS`]; the
133 /// loop drops the oldest entry FIFO when the cap is
134 /// exceeded. Lives on the loop rather than `MeshOsState`
135 /// because it's an append-only output buffer, not fold
136 /// state — `state.rs` had explicit dead arms for
137 /// `SignedIceCommit` / `SignedAdminCommit` precisely
138 /// because they don't fold.
139 admin_audit_ring: VecDeque<super::ice::AdminAuditRecord>,
140 /// Ring buffer of log records — every
141 /// `MeshOsEvent::LogLine` daemons or source converters
142 /// publish lands here. Bounded to
143 /// [`super::logs::DEFAULT_MAX_LOG_RING_RECORDS`]; older
144 /// entries drop FIFO. Lives on the loop rather than
145 /// `MeshOsState` because log lines don't fold.
146 log_ring: VecDeque<super::logs::LogRecord>,
147
148 /// Actions reconcile emitted that the action-queue
149 /// `try_send` rejected because the executor was at
150 /// `action_queue_capacity`. Cloneable counter — the runtime
151 /// surfaces it through [`MeshOsRuntime::executor_stats`] for
152 /// operator visibility.
153 dropped_actions: Arc<AtomicU64>,
154 /// Shared reference to the executor's recent-failures ring.
155 /// The executor task writes; the loop reads on every
156 /// `publish_snapshot` and copies the records into the
157 /// snapshot's `recent_failures` field. Optional so a loop
158 /// constructed without an executor (e.g. unit tests) still
159 /// publishes (with an empty ring).
160 executor_failures: Option<Arc<RwLock<VecDeque<FailureRecord>>>>,
161 /// Shared failure-seq counter — same counter the executor
162 /// uses. The loop stamps its own runtime-side failures
163 /// (e.g. migration-abort dispatcher errors) with this so
164 /// SDK consumers' dedup gate sees a single monotonic
165 /// sequence across both writers.
166 executor_failure_seq: Option<Arc<AtomicU64>>,
167 /// Shared failure-chain appender — same instance the
168 /// executor uses. The loop dual-writes runtime-side
169 /// failures to the durable chain via this so chain
170 /// history covers loop-side failures too.
171 executor_failure_appender: Option<Arc<dyn super::failure_chain::FailureChainAppender>>,
172 /// Optional control-event sink. When attached, the loop
173 /// translates this-node `local_maintenance` transitions and
174 /// future backpressure flips into [`MeshOsControl`] events
175 /// and forwards them through the sink. The SDK installs one
176 /// that fans events out to per-daemon channels via its
177 /// router; substrate-internal uses can ignore.
178 control_sink: Option<Arc<dyn ControlSink>>,
179 /// Discriminant of the most recent `local_maintenance` value
180 /// the loop observed. Updated on every `apply()` call so
181 /// state transitions caused by either an admin event or a
182 /// `MaintenanceTransitionObserved` (chain replay) surface
183 /// uniformly.
184 last_local_maintenance: MaintenanceDiscriminant,
185 /// Optional ICE-commit verifier. When installed, every
186 /// `MeshOsEvent::SignedIceCommit` is run through this gate
187 /// before the inner `AdminEvent` folds into state. When
188 /// absent, signed commits fold their inner event without
189 /// verification — useful for in-process tests where the
190 /// SDK side already gates. The substrate slice that ships
191 /// the operator-policy chain wires a registry in by default.
192 admin_verifier: Option<Arc<super::ice::AdminVerifier>>,
193
194 /// Admin audit chain appender. Production deployments
195 /// wire a `TypedRedexFile<AdminAuditRecord>` here so
196 /// security review can replay every admin commit across
197 /// the cluster's lifetime. Default is
198 /// `NoOpAdminAuditChainAppender` — the in-memory ring on
199 /// `MeshOsState.admin_audit` is the only readable surface
200 /// when no chain is wired.
201 admin_audit_appender: Arc<dyn super::audit_chain::AdminAuditChainAppender>,
202
203 /// Log chain appender. Production deployments wire a
204 /// `TypedRedexFile<LogRecord>` here so the per-daemon log
205 /// view extends to cluster-lifetime replay. Default is
206 /// `NoOpLogChainAppender` — only the in-memory ring on
207 /// `MeshOsState.log_ring` is observable when no chain is
208 /// wired.
209 log_appender: Arc<dyn super::log_chain::LogChainAppender>,
210
211 /// Migration-abort dispatcher. The loop calls this after
212 /// folding a verified
213 /// [`super::event::AdminEvent::KillMigration`]. Production
214 /// deployments wire the
215 /// [`super::migration_aborter::OrchestratorMigrationAborter`]
216 /// adapter so the local `MigrationOrchestrator` actually
217 /// aborts in-flight migrations; tests + bootstrap use the
218 /// `NoOpMigrationAborter` default and the commit remains
219 /// audit-only.
220 migration_aborter: Arc<dyn super::migration_aborter::MigrationAborter>,
221
222 /// Migration-snapshot source. The loop calls this on every
223 /// snapshot publish and embeds the result in the
224 /// snapshot's `in_flight_migrations` field, so the ICE
225 /// simulator can enumerate which daemon a `KillMigration`
226 /// target would affect. Production deployments wire the
227 /// [`super::migration_snapshot_source::OrchestratorMigrationSnapshotSource`]
228 /// adapter; bootstrap + tests use the no-op default and
229 /// the snapshot's `in_flight_migrations` reads empty.
230 migration_snapshot_source: Arc<dyn super::migration_snapshot_source::MigrationSnapshotSource>,
231}
232
233/// Cheap-to-compare snapshot of [`MaintenanceState`]'s variant
234/// (no embedded `Instant` or `String`). Used by the loop to
235/// detect transitions without holding the full state value.
236#[derive(Clone, Copy, Debug, Default, Eq, PartialEq)]
237enum MaintenanceDiscriminant {
238 #[default]
239 Active,
240 EnteringMaintenance,
241 Maintenance,
242 ExitingMaintenance,
243 DrainFailed,
244 Recovery,
245}
246
247impl MaintenanceDiscriminant {
248 fn from_state(state: &MaintenanceState) -> Self {
249 match state {
250 MaintenanceState::Active => Self::Active,
251 MaintenanceState::EnteringMaintenance { .. } => Self::EnteringMaintenance,
252 MaintenanceState::Maintenance { .. } => Self::Maintenance,
253 MaintenanceState::ExitingMaintenance { .. } => Self::ExitingMaintenance,
254 MaintenanceState::DrainFailed { .. } => Self::DrainFailed,
255 MaintenanceState::Recovery { .. } => Self::Recovery,
256 }
257 }
258}
259
260/// Inner shared cell — both probe lists behind one
261/// `Arc<RwLock>` so the runtime + loop see the same set AND
262/// [`ProbeRegistry::probe_counts`] is an atomic snapshot. The
263/// install path is rare; the per-tick poll already clones the
264/// lists out under the read lock, so the single-lock pattern
265/// costs nothing on the hot path.
266#[derive(Default)]
267struct ProbeListsInner {
268 locality: Vec<Arc<dyn LocalityProbe>>,
269 health: Vec<Arc<dyn HealthProbe>>,
270 inventory: Vec<Arc<dyn super::probes::InventoryProbe>>,
271}
272
273#[derive(Clone, Default)]
274struct ProbeRegistryInner {
275 lists: Arc<parking_lot::RwLock<ProbeListsInner>>,
276}
277
278/// External handle for attaching probes to the loop after
279/// `MeshOsLoop::new` has consumed the loop (e.g. the runtime
280/// spawned it as a tokio task). Clone-shared with the loop.
281#[derive(Clone, Default)]
282pub struct ProbeRegistry {
283 inner: ProbeRegistryInner,
284}
285
286impl ProbeRegistry {
287 /// Construct an empty registry. The loop reads through this
288 /// instance after [`MeshOsLoop::with_probe_registry`]
289 /// installs it.
290 pub fn new() -> Self {
291 Self::default()
292 }
293
294 /// Install a [`LocalityProbe`]. Probes are polled by the
295 /// loop in registration order, once per Tick.
296 pub fn add_locality_probe(&self, probe: Arc<dyn LocalityProbe>) {
297 self.inner.lists.write().locality.push(probe);
298 }
299
300 /// Install a [`HealthProbe`]. Probes are polled by the loop
301 /// in registration order, once per Tick.
302 pub fn add_health_probe(&self, probe: Arc<dyn HealthProbe>) {
303 self.inner.lists.write().health.push(probe);
304 }
305
306 /// Install an [`super::probes::InventoryProbe`]. Probes are
307 /// polled by the loop in registration order, once per Tick;
308 /// partial samples merge into per-peer state (later probes
309 /// overwrite earlier on the same peer + axis).
310 pub fn add_inventory_probe(&self, probe: Arc<dyn super::probes::InventoryProbe>) {
311 self.inner.lists.write().inventory.push(probe);
312 }
313
314 /// Atomic snapshot of installed probe counts —
315 /// `(locality, health, inventory)`. One read lock covers
316 /// all three lists, so the triple is consistent even if a
317 /// concurrent installer fires between them.
318 pub fn probe_counts(&self) -> (usize, usize, usize) {
319 let guard = self.inner.lists.read();
320 (
321 guard.locality.len(),
322 guard.health.len(),
323 guard.inventory.len(),
324 )
325 }
326
327 /// Drop every installed [`LocalityProbe`]. Long-running
328 /// runtimes that swap probe sources (test harnesses,
329 /// hot-config reloaders) would otherwise accumulate dead
330 /// probes that keep firing every Tick; this lets a caller
331 /// detach the old set before installing replacements.
332 pub fn clear_locality_probes(&self) {
333 self.inner.lists.write().locality.clear();
334 }
335
336 /// Drop every installed [`HealthProbe`]. Same rationale as
337 /// [`Self::clear_locality_probes`].
338 pub fn clear_health_probes(&self) {
339 self.inner.lists.write().health.clear();
340 }
341
342 /// Drop every installed [`super::probes::InventoryProbe`].
343 /// Same rationale as [`Self::clear_locality_probes`]. Last-
344 /// writer-wins per peer means a stale probe left installed
345 /// can stomp a live replacement's samples, so callers
346 /// swapping inventory sources should clear first.
347 pub fn clear_inventory_probes(&self) {
348 self.inner.lists.write().inventory.clear();
349 }
350}
351
352/// Read-only handle to the loop's most recent snapshot.
353/// Construction returns one of these from
354/// [`MeshOsLoop::new`]; Deck / Phase F integrations clone the
355/// handle (cheap — one Arc clone) and call
356/// [`MeshOsSnapshotReader::read`] to sample the current view
357/// without entering the loop's event stream.
358#[derive(Clone, Debug)]
359pub struct MeshOsSnapshotReader {
360 snapshot: Arc<ArcSwap<MeshOsSnapshot>>,
361}
362
363impl MeshOsSnapshotReader {
364 /// Sample the most recent post-reconcile snapshot. One
365 /// atomic load + one `Arc` clone — no lock acquisition, so
366 /// reads cannot stall the loop's publish path.
367 pub fn read(&self) -> MeshOsSnapshot {
368 (**self.snapshot.load()).clone()
369 }
370
371 /// Borrow the latest snapshot through an `Arc`. Avoids the
372 /// per-call deep clone when the caller only needs a few
373 /// fields. The returned guard pins the snapshot until
374 /// dropped — keep the borrow short.
375 pub fn load(&self) -> arc_swap::Guard<Arc<MeshOsSnapshot>> {
376 self.snapshot.load()
377 }
378}
379
380/// Cloneable handle for publishing events into the loop. Cheap
381/// to clone (just clones the `mpsc::Sender`). Drop the last
382/// handle to signal end-of-events; the loop will exit after
383/// draining its current backlog.
384#[derive(Clone, Debug)]
385pub struct MeshOsHandle {
386 events: mpsc::Sender<MeshOsEvent>,
387}
388
389impl MeshOsHandle {
390 /// Publish an event into the loop's stream. Async — backs
391 /// pressure when the source channel is at
392 /// `event_queue_capacity`. Sources that need a fire-and-
393 /// forget path can `try_send` directly on the underlying
394 /// sender via `into_sender`.
395 ///
396 /// **Note on wedge risk:** if the loop is stuck (probe
397 /// holding a sync lock, dispatcher saturated, etc.) this
398 /// future never resolves. Long-lived sources that must
399 /// remain responsive should prefer
400 /// [`Self::publish_timeout`] or [`Self::try_publish`].
401 pub async fn publish(&self, event: MeshOsEvent) -> Result<(), MeshOsHandleError> {
402 self.events
403 .send(event)
404 .await
405 .map_err(|_| MeshOsHandleError::LoopClosed)
406 }
407
408 /// Publish an event with a bounded wait. Returns
409 /// [`MeshOsHandleError::QueueFull`] if the source channel
410 /// stayed at capacity for the entire `timeout` window.
411 /// Sources that can't afford to park indefinitely on a
412 /// wedged loop should call this instead of [`Self::publish`].
413 pub async fn publish_timeout(
414 &self,
415 event: MeshOsEvent,
416 timeout: std::time::Duration,
417 ) -> Result<(), MeshOsHandleError> {
418 match tokio::time::timeout(timeout, self.events.send(event)).await {
419 Ok(Ok(())) => Ok(()),
420 Ok(Err(_)) => Err(MeshOsHandleError::LoopClosed),
421 Err(_) => Err(MeshOsHandleError::QueueFull),
422 }
423 }
424
425 /// Try to publish without awaiting. Returns
426 /// `MeshOsHandleError::QueueFull` when the source channel is
427 /// at capacity.
428 pub fn try_publish(&self, event: MeshOsEvent) -> Result<(), MeshOsHandleError> {
429 self.events.try_send(event).map_err(|e| match e {
430 mpsc::error::TrySendError::Closed(_) => MeshOsHandleError::LoopClosed,
431 mpsc::error::TrySendError::Full(_) => MeshOsHandleError::QueueFull,
432 })
433 }
434
435 /// Hand out the underlying sender for sources that need to
436 /// manage their own backpressure / batching.
437 pub fn into_sender(self) -> mpsc::Sender<MeshOsEvent> {
438 self.events
439 }
440}
441
/// Errors a publisher can see from [`MeshOsHandle::publish`],
/// [`MeshOsHandle::publish_timeout`], and
/// [`MeshOsHandle::try_publish`]. The loop never retries on the
/// caller's behalf — retrying, dropping, or pushing back is the
/// caller's call.
#[derive(PartialEq, Eq, Debug)]
#[non_exhaustive]
pub enum MeshOsHandleError {
    /// The loop is gone; this event was not delivered, and later
    /// publishes will fail the same way.
    LoopClosed,
    /// The source channel is full (`event_queue_capacity`). The
    /// caller decides: back off and retry, drop the event, or push
    /// back on its own source.
    QueueFull,
}
456
457/// The result of [`MeshOsLoop::new`]. Held together so callers
458/// destructure the pieces they need; future fields (metrics,
459/// chain handles) can be added without breaking the
460/// constructor signature.
461pub struct MeshOsLoopParts {
462 /// The loop itself — consume by spawning [`MeshOsLoop::run`].
463 pub mesh_loop: MeshOsLoop,
464 /// Publish handle; clone for each source converter.
465 pub handle: MeshOsHandle,
466 /// Action-queue receiver, fed into [`super::executor::ActionExecutor::new`].
467 pub actions_rx: mpsc::Receiver<PendingAction>,
468 /// Snapshot reader; clone for each Deck / consumer.
469 pub reader: MeshOsSnapshotReader,
470}
471
472impl MeshOsLoop {
473 /// Construct a loop bound to the given config. Returns the
474 /// loop + its publish handle + the action-queue receiver +
475 /// the snapshot reader, bundled in [`MeshOsLoopParts`] so
476 /// future additions don't break the constructor signature.
477 #[allow(clippy::new_ret_no_self)]
478 pub fn new(config: MeshOsConfig) -> MeshOsLoopParts {
479 let config = Arc::new(config);
480 let (events_tx, events_rx) = mpsc::channel(config.event_queue_capacity);
481 let (actions_tx, actions_rx) = mpsc::channel(config.action_queue_capacity);
482 let handle = MeshOsHandle { events: events_tx };
483 // Stamp a unique runtime epoch id at construction.
484 // 64-bit random per-runtime stamp. Pre-fix this was
485 // `SystemTime::now().as_nanos() ^ static_counter.fetch_add(1)`,
486 // but the static counter resets to 1 each process start —
487 // two processes booting in the same nanosecond (CI parallel,
488 // VM resume) XOR identical `(epoch, counter)` and produced
489 // identical runtime_epoch_ids. The SDK's watermark-reset
490 // gate (snapshot's `runtime_epoch_id` vs last-seen) was then
491 // defeated: post-restart admin_audit_seq / log_seq /
492 // failure_seq start back at 1 and pass the consumer's dedup
493 // gate as "already seen," silently filtering valid post-
494 // restart audit records. A `getrandom::fill` u64 has a
495 // 2⁻⁶⁴ collision probability across all process restarts
496 // in the fleet. The fallback path on a getrandom failure
497 // preserves the prior (epoch ^ counter) shape so the SDK
498 // gate still gets a non-zero stamp under the (extremely
499 // rare) getrandom failure mode rather than panicking
500 // through the substrate's loop construction.
501 let runtime_epoch_id: u64 = {
502 let mut buf = [0u8; 8];
503 if getrandom::fill(&mut buf).is_ok() {
504 u64::from_le_bytes(buf)
505 } else {
506 // Fallback only on `getrandom` failure (vanishingly
507 // rare: seccomp-jailed environments without the
508 // syscall). Mix the process ID into the (epoch ^
509 // counter) shape so two same-nanosecond boots in
510 // a CI parallel matrix still produce distinct
511 // ids — the pre-fix shape collided here exactly.
512 // `std::process::id()` is a u32 on every supported
513 // platform; rotated into the upper bits so it
514 // doesn't trivially XOR-cancel against the counter.
515 static RUNTIME_EPOCH_COUNTER: AtomicU64 = AtomicU64::new(1);
516 let nanos = std::time::SystemTime::now()
517 .duration_since(std::time::UNIX_EPOCH)
518 .map(|d| d.as_nanos() as u64)
519 .unwrap_or(0);
520 let pid = (std::process::id() as u64).rotate_left(32);
521 nanos ^ RUNTIME_EPOCH_COUNTER.fetch_add(1, Ordering::SeqCst) ^ pid
522 }
523 };
524 let initial_snapshot = MeshOsSnapshot {
525 runtime_epoch_id,
526 ..Default::default()
527 };
528 let snapshot = Arc::new(ArcSwap::from_pointee(initial_snapshot));
529 let reader = MeshOsSnapshotReader {
530 snapshot: Arc::clone(&snapshot),
531 };
532 let me = Self {
533 config,
534 events_rx,
535 actions_tx,
536 action_ids: AllocateActionId::new(),
537 actual: MeshOsState::default(),
538 desired: DesiredState::default(),
539 snapshot,
540 recent_emissions: Vec::new(),
541 probes: ProbeRegistryInner::default(),
542 scheduler: SchedulerRegistry::new(),
543 recovery_registry: crate::adapter::net::compute::RecoveryRegistry::new(),
544 reconcile_count: 0,
545 admin_audit_seq: 0,
546 log_seq: 0,
547 runtime_epoch_id,
548 admin_audit_ring: VecDeque::with_capacity(super::ice::DEFAULT_MAX_ADMIN_AUDIT_RECORDS),
549 log_ring: VecDeque::with_capacity(super::logs::DEFAULT_MAX_LOG_RING_RECORDS),
550 dropped_actions: Arc::new(AtomicU64::new(0)),
551 executor_failures: None,
552 executor_failure_seq: None,
553 executor_failure_appender: None,
554 control_sink: None,
555 last_local_maintenance: MaintenanceDiscriminant::Active,
556 admin_verifier: None,
557 admin_audit_appender: super::audit_chain::no_op_arc(),
558 log_appender: super::log_chain::no_op_arc(),
559 migration_aborter: super::migration_aborter::no_op_arc(),
560 migration_snapshot_source: super::migration_snapshot_source::no_op_arc(),
561 };
562 MeshOsLoopParts {
563 mesh_loop: me,
564 handle,
565 actions_rx,
566 reader,
567 }
568 }
569
570 /// Clone the dropped-action counter. The runtime uses this
571 /// to surface the count through `ExecutorStatsSnapshot`;
572 /// tests can also assert against it directly.
573 pub fn dropped_actions_counter(&self) -> Arc<AtomicU64> {
574 Arc::clone(&self.dropped_actions)
575 }
576
577 /// Attach a probe registry. The loop polls each registered
578 /// probe on every Tick, before reconcile. The registry is
579 /// shareable + cloneable, so callers retain it to add probes
580 /// after `MeshOsLoop::new` returns (the loop has been moved
581 /// into the spawned task at that point).
582 pub fn with_probe_registry(mut self, registry: ProbeRegistry) -> Self {
583 self.probes = registry.inner;
584 self
585 }
586
587 /// Attach a scheduler registry. The reconcile pass reads
588 /// the registered scorer to drive Phase D-1 rebalancing.
589 /// Cloneable + shareable like the probe registry.
590 pub fn with_scheduler_registry(mut self, registry: SchedulerRegistry) -> Self {
591 self.scheduler = registry;
592 self
593 }
594
595 /// Attach a recovery registry. SDK consumers register one
596 /// `RecoveryHandler` per group whose slots can be
597 /// re-placed; the tick handler runs them all once per Tick
598 /// (after `poll_probes`, before `run_reconcile`) so the
599 /// reconcile pass sees the recovered slot states.
600 pub fn with_recovery_registry(
601 mut self,
602 registry: crate::adapter::net::compute::RecoveryRegistry,
603 ) -> Self {
604 self.recovery_registry = registry;
605 self
606 }
607
608 /// Borrow the recovery registry — SDK consumers `register`
609 /// new handlers post-build. The `MeshOsRuntime` accessor
610 /// surfaces this so callers don't have to retain a clone.
611 pub fn recovery_registry(&self) -> &crate::adapter::net::compute::RecoveryRegistry {
612 &self.recovery_registry
613 }
614
615 /// Attach the executor's recent-failures ring. The loop reads
616 /// it on every `publish_snapshot` so the snapshot's
617 /// `recent_failures` field reflects executor-side dispatch
618 /// failures (the `MeshOsSnapshotFold` chain-record path is
619 /// not the only failure surface). The runtime calls this
620 /// after `ActionExecutor::new` so both halves of the pair
621 /// share the same ring.
622 pub fn with_executor_failures(
623 mut self,
624 failures: Arc<RwLock<VecDeque<FailureRecord>>>,
625 ) -> Self {
626 self.executor_failures = Some(failures);
627 self
628 }
629
630 /// Attach the executor's failure-seq counter + chain
631 /// appender so the loop can record its own runtime-side
632 /// failures (e.g. migration-abort dispatcher errors) with
633 /// the same monotonic sequence + durable chain dual-write
634 /// the executor uses. Pair this with
635 /// [`Self::with_executor_failures`]; together the trio
636 /// makes the loop's internal `record_runtime_failure`
637 /// helper a complete dual write into the snapshot ring +
638 /// the chain.
639 pub fn with_executor_failure_writer(
640 mut self,
641 seq: Arc<AtomicU64>,
642 appender: Arc<dyn super::failure_chain::FailureChainAppender>,
643 ) -> Self {
644 self.executor_failure_seq = Some(seq);
645 self.executor_failure_appender = Some(appender);
646 self
647 }
648
649 /// Attach a [`ControlSink`]. When set, the loop translates
650 /// this-node maintenance state transitions into
651 /// [`MeshOsControl`] events and forwards them through the
652 /// sink. The SDK installs a sink that routes events to
653 /// per-daemon control channels via its router; substrate
654 /// code that doesn't need the SDK surface can leave this
655 /// unset.
656 pub fn with_control_sink(mut self, sink: Arc<dyn ControlSink>) -> Self {
657 self.control_sink = Some(sink);
658 self
659 }
660
661 /// Attach an [`super::ice::AdminVerifier`]. When set, every
662 /// [`MeshOsEvent::SignedIceCommit`] is gated on signature
663 /// verification + the cluster's signature threshold before
664 /// folding the inner [`super::event::AdminEvent`]. Verified
665 /// commits fold normally; rejected commits drop + emit a
666 /// failure record so operators see the rejection in the
667 /// snapshot's `recent_failures` ring (substrate slice that
668 /// wires the failure pipe lands alongside the SDK surface
669 /// upgrade).
670 pub fn with_admin_verifier(mut self, verifier: Arc<super::ice::AdminVerifier>) -> Self {
671 self.admin_verifier = Some(verifier);
672 self
673 }
674
675 /// Attach a [`super::audit_chain::AdminAuditChainAppender`].
676 /// The loop's `record_admin_audit` path dual-writes every
677 /// admin commit to both the in-memory ring (snapshot
678 /// readable) and this appender (chain-backed history).
679 /// Without an explicit appender the loop uses the no-op
680 /// default; only the in-memory ring is observable.
681 pub fn with_admin_audit_appender(
682 mut self,
683 appender: Arc<dyn super::audit_chain::AdminAuditChainAppender>,
684 ) -> Self {
685 self.admin_audit_appender = appender;
686 self
687 }
688
689 /// Attach a [`super::log_chain::LogChainAppender`]. The
690 /// loop's `record_log_line` path dual-writes every log
691 /// line to both the in-memory ring (snapshot readable)
692 /// and this appender (chain-backed history). Without an
693 /// explicit appender the loop uses the no-op default.
694 pub fn with_log_appender(
695 mut self,
696 appender: Arc<dyn super::log_chain::LogChainAppender>,
697 ) -> Self {
698 self.log_appender = appender;
699 self
700 }
701
702 /// Attach a [`super::migration_aborter::MigrationAborter`].
703 /// The loop calls this after folding a verified
704 /// [`super::event::AdminEvent::KillMigration`]; production
705 /// deployments wire the
706 /// [`super::migration_aborter::OrchestratorMigrationAborter`]
707 /// adapter so the cluster's local `MigrationOrchestrator`
708 /// actually aborts in-flight migrations. Without an
709 /// explicit aborter the commit lands on the audit chain
710 /// but the migration runs to completion.
711 pub fn with_migration_aborter(
712 mut self,
713 aborter: Arc<dyn super::migration_aborter::MigrationAborter>,
714 ) -> Self {
715 self.migration_aborter = aborter;
716 self
717 }
718
719 /// Attach a
720 /// [`super::migration_snapshot_source::MigrationSnapshotSource`].
721 /// The loop reads this on every snapshot publish and
722 /// embeds the result in the snapshot's
723 /// `in_flight_migrations` field — the ICE simulator
724 /// reads it to enumerate which daemon a `KillMigration`
725 /// target would affect.
726 pub fn with_migration_snapshot_source(
727 mut self,
728 source: Arc<dyn super::migration_snapshot_source::MigrationSnapshotSource>,
729 ) -> Self {
730 self.migration_snapshot_source = source;
731 self
732 }
733
734 /// Drive the loop until either:
735 /// 1. all `MeshOsHandle` clones drop and the source channel
736 /// empties (graceful end-of-events), or
737 /// 2. a `MeshOsEvent::Shutdown` is dequeued.
738 ///
739 /// Returns the final `reconcile_count` — used by tests; in
740 /// production it's diagnostic-only.
741 pub async fn run(mut self) -> u64 {
742 tracing::debug!(
743 target: "meshos",
744 this_node = self.config.this_node,
745 tick_interval_ms = self.config.tick_interval.as_millis() as u64,
746 event_queue_capacity = self.config.event_queue_capacity,
747 action_queue_capacity = self.config.action_queue_capacity,
748 "MeshOsLoop starting",
749 );
750 // Tick timer — fires every `tick_interval`. Configured
751 // to skip missed ticks rather than burst, since the
752 // reconcile pass is the slow-and-steady cadence the plan
753 // locks in.
754 let mut tick = interval_at(
755 TokioInstant::now() + self.config.tick_interval,
756 self.config.tick_interval,
757 );
758 // `Delay` over `Skip`: under load the reconcile cadence
759 // drifts but no tick is silently dropped, so the loop
760 // never falls behind on probe + reconcile passes that
761 // would surface stale state. `Burst` would amplify a
762 // backlog, which is what the locked-decision-1 "one
763 // pass per tick" guarantee is designed to prevent.
764 tick.set_missed_tick_behavior(MissedTickBehavior::Delay);
765
766 // Bound how many events get drained between ticks so a
767 // heavy reconcile + 100 ms tick can't starve `events_rx`
768 // under the `biased;` priority below. The pre-fix
769 // `biased; tick.tick() ⇒ events_rx.recv()` ordering
770 // correctly defends event-burst-starves-tick (the original
771 // bug), but in the other direction a long `run_reconcile`
772 // that yields mid-await re-enters the select and tick
773 // wins again — events sit in the queue until the queue
774 // saturates the sender or the system idles. Drain up to
775 // `EVENTS_PER_TICK` events in a non-blocking pass after
776 // each tick so both directions make bounded progress.
777 const EVENTS_PER_TICK: usize = 32;
778 let mut shutdown = false;
779 loop {
780 tokio::select! {
781 // `biased` polls arms in source order, so a
782 // saturated `events_rx` cannot starve the reconcile
783 // tick. The previous (random) order let a sustained
784 // event burst defer reconcile / poll_probes / gc_freeze
785 // for the duration of the burst — `local_maintenance`
786 // went stale, `applied_backoffs` stuck, and
787 // `freeze_until` never GC'd because gc_freeze only
788 // runs on Tick.
789 biased;
790 _ = tick.tick() => {
791 // The Tick event drives reconcile; we route
792 // it through the same `apply` path so the
793 // `last_tick` fold field updates uniformly.
794 self.apply(&MeshOsEvent::Tick);
795 // Pull-via-tick probes — folded BEFORE
796 // reconcile so the reconcile pass sees the
797 // latest samples in this tick window.
798 self.poll_probes();
799 // X-13: run the recovery handlers between
800 // probe samples and reconcile. Probes refresh
801 // the per-peer healthy view, then recovery
802 // re-places any unhealthy slots against the
803 // current healthy node pool, then reconcile
804 // sees the post-recovery state. Empty
805 // registry → no-op; handlers that report
806 // recovered slots are logged at debug for
807 // operator visibility.
808 let recovered = self.recovery_registry.try_run_all();
809 if !recovered.is_empty() {
810 tracing::debug!(
811 target: "meshos",
812 slots = ?recovered,
813 "X-13: recovery registry placed unhealthy slots"
814 );
815 }
816 self.run_reconcile().await;
817
818 // Drain a bounded batch of events from the
819 // queue before the next tick can re-fire. This
820 // is the inverse-direction starvation guard
821 // for the `biased;` above. `try_recv` is non-
822 // blocking, so an empty queue costs one Err
823 // each iteration and the loop falls through to
824 // the next select.
825 for _ in 0..EVENTS_PER_TICK {
826 match self.events_rx.try_recv() {
827 Ok(event) => {
828 if matches!(event, MeshOsEvent::Shutdown) {
829 tracing::debug!(
830 target: "meshos",
831 reconcile_count = self.reconcile_count,
832 "MeshOsLoop exiting — Shutdown drained post-tick",
833 );
834 shutdown = true;
835 break;
836 }
837 self.apply(&event);
838 }
839 Err(mpsc::error::TryRecvError::Empty) => break,
840 Err(mpsc::error::TryRecvError::Disconnected) => {
841 tracing::debug!(
842 target: "meshos",
843 reconcile_count = self.reconcile_count,
844 "MeshOsLoop exiting — all handles dropped (drained post-tick)",
845 );
846 shutdown = true;
847 break;
848 }
849 }
850 }
851 if shutdown {
852 break;
853 }
854 }
855 event = self.events_rx.recv() => {
856 let Some(event) = event else {
857 tracing::debug!(
858 target: "meshos",
859 reconcile_count = self.reconcile_count,
860 "MeshOsLoop exiting — all handles dropped",
861 );
862 break;
863 };
864 if matches!(event, MeshOsEvent::Shutdown) {
865 tracing::debug!(
866 target: "meshos",
867 reconcile_count = self.reconcile_count,
868 "MeshOsLoop exiting — Shutdown event received",
869 );
870 break;
871 }
872 self.apply(&event);
873 }
874 }
875 }
876
877 self.reconcile_count
878 }
879
880 /// Poll every registered locality / health probe and fold
881 /// the samples into the actual-state view. Idempotent — the
882 /// folds overwrite per-peer entries, so a re-poll within
883 /// the same tick produces the same state.
884 ///
885 /// Each probe call runs inside `catch_unwind` so a panicking
886 /// probe doesn't unwind the loop task. Probes are
887 /// third-party-installed by definition (the substrate hands
888 /// out the `ProbeRegistry`), so trust-but-isolate is the
889 /// right posture.
890 fn poll_probes(&mut self) {
891 // Clone all probe lists out under a single read lock so
892 // a concurrent install between the locality / health /
893 // inventory passes doesn't see a half-applied registry.
894 let (locality, health, inventory) = {
895 let guard = self.probes.lists.read();
896 (
897 guard.locality.clone(),
898 guard.health.clone(),
899 guard.inventory.clone(),
900 )
901 };
902 for probe in &locality {
903 let probe = Arc::clone(probe);
904 let result =
905 std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| probe.rtt_samples()));
906 match result {
907 Ok(samples) => {
908 for (peer, rtt) in samples {
909 self.actual.rtt.insert(peer, rtt);
910 }
911 }
912 Err(_) => {
913 tracing::error!(
914 target: "meshos",
915 "locality probe panicked — sample skipped this tick",
916 );
917 }
918 }
919 }
920 for probe in &health {
921 let probe = Arc::clone(probe);
922 let result =
923 std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| probe.health_samples()));
924 match result {
925 Ok(samples) => {
926 for (peer, hc) in samples {
927 self.actual.node_health.insert(peer, hc);
928 }
929 }
930 Err(_) => {
931 tracing::error!(
932 target: "meshos",
933 "health probe panicked — sample skipped this tick",
934 );
935 }
936 }
937 }
938 // Track inventory samples seen this tick so the GC pass
939 // below drops entries for peers no probe sees anymore.
940 // The map is probe-exclusive (no `MeshOsEvent` populates
941 // it), so it's safe to authoritatively prune from the
942 // probe pass. The rtt + node_health maps are also fed by
943 // `MeshOsEvent::{RttSample, NodeHealth}` event folds, so
944 // their per-tick prune would erase event-driven samples;
945 // those maps grow with proximity churn but the inventory
946 // leak the review flagged was per-peer multi-field and
947 // strictly worse.
948 let mut peers_seen_inventory: std::collections::HashSet<super::event::NodeId> =
949 std::collections::HashSet::new();
950 let mut all_probes_succeeded = true;
951 let mut any_probe_saw_samples = false;
952 for probe in &inventory {
953 let probe = Arc::clone(probe);
954 let result = std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| {
955 probe.inventory_samples()
956 }));
957 match result {
958 Ok(samples) => {
959 if !samples.is_empty() {
960 any_probe_saw_samples = true;
961 }
962 for (peer, inv) in samples {
963 peers_seen_inventory.insert(peer);
964 self.actual.inventory.insert(peer, inv);
965 }
966 }
967 Err(_) => {
968 all_probes_succeeded = false;
969 tracing::error!(
970 target: "meshos",
971 "inventory probe panicked — sample skipped this tick",
972 );
973 }
974 }
975 }
976 // Two-axis guard for the GC pass:
977 //
978 // * `all_probes_succeeded` — a partial-panic pass
979 // can leave `peers_seen_inventory` short of peers
980 // that the panicking probe owned authoritatively
981 // (e.g. two inventory probes with disjoint
982 // coverage); pruning then drops the panicking
983 // probe's peers despite no probe knowing they
984 // departed. Wait for the next clean tick instead.
985 //
986 // * `any_probe_saw_samples` — the trait contract
987 // allows a probe to return `Ok(vec![])` for a
988 // transient empty-this-tick (procfs unavailable,
989 // no peers known yet at startup); pruning on an
990 // all-empty pass would wipe every previously-seen
991 // peer's inventory. The first tick at startup
992 // legitimately has no peers, so this also
993 // prevents a cold-start wipe.
994 if all_probes_succeeded && any_probe_saw_samples {
995 self.actual
996 .inventory
997 .retain(|peer, _| peers_seen_inventory.contains(peer));
998 }
999 }
1000
1001 fn apply(&mut self, event: &MeshOsEvent) {
1002 // ICE-signed commits get their own gate: verify the
1003 // bundle before folding the inner AdminEvent. The
1004 // outcome (accepted, rejected, unverified) lands on the
1005 // admin_audit ring regardless of verification result so
1006 // security review can replay every attempt.
1007 if let MeshOsEvent::SignedIceCommit {
1008 proposal,
1009 signatures,
1010 issued_at_ms,
1011 blast_hash,
1012 } = event
1013 {
1014 let outcome = match self.admin_verifier.as_ref() {
1015 Some(verifier) => {
1016 let now_ms = super::ice::now_ms_since_unix_epoch();
1017 match verifier.verify_commit(
1018 proposal,
1019 signatures,
1020 *issued_at_ms,
1021 blast_hash,
1022 now_ms,
1023 ) {
1024 Ok(()) => super::ice::VerificationOutcome::Accepted,
1025 Err(err) => super::ice::VerificationOutcome::Rejected {
1026 kind: err.kind().to_string(),
1027 message: err.to_string(),
1028 },
1029 }
1030 }
1031 None => super::ice::VerificationOutcome::Unverified,
1032 };
1033 let admin_event = proposal.to_admin_event();
1034 let operator_ids: Vec<u64> = signatures.iter().map(|s| s.operator_id).collect();
1035 self.record_admin_audit(&admin_event, operator_ids, outcome.clone());
1036 if let super::ice::VerificationOutcome::Rejected { kind, message } = &outcome {
1037 tracing::warn!(
1038 target: "meshos",
1039 kind = %kind,
1040 error = %message,
1041 "rejected SignedIceCommit — signature verification failed",
1042 );
1043 return;
1044 }
1045 // Verification passed (or no verifier installed —
1046 // tests / dev mode). Fold as if the inner AdminEvent
1047 // arrived directly. Dispatch fires AFTER actual.apply
1048 // so the dispatcher observes post-fold state — the
1049 // ordering invariant the loop preserves for every
1050 // chain-committed admin event.
1051 self.desired
1052 .apply_admin(&admin_event, self.config.this_node);
1053 let unwrapped = MeshOsEvent::AdminEvent(admin_event.clone());
1054 self.actual.apply(&unwrapped, self.config.this_node);
1055 self.dispatch_kill_migration_if_applicable(&admin_event);
1056 self.emit_maintenance_transitions();
1057 return;
1058 }
1059 // Single-signature signed-admin commits mirror the ICE
1060 // path: verify, audit, fold on success. The signature
1061 // covers `admin_event_signing_payload(event)` so the
1062 // SDK and substrate agree on the byte sequence.
1063 if let MeshOsEvent::SignedAdminCommit {
1064 event: admin_event,
1065 signature,
1066 issued_at_ms,
1067 } = event
1068 {
1069 // Freeze gate: ordinary admin commits that arrive
1070 // during an in-effect cluster freeze land on the
1071 // audit ring as Rejected with kind
1072 // "freeze_in_effect" and the inner event drops. ICE
1073 // commits (the multi-op SignedIceCommit path) bypass
1074 // by design — operators must be able to thaw the
1075 // cluster mid-freeze.
1076 let now = self
1077 .actual
1078 .last_tick
1079 .unwrap_or_else(std::time::Instant::now);
1080 if self.actual.is_frozen(now) && !admin_event.is_ice() {
1081 self.record_admin_audit(
1082 admin_event,
1083 vec![signature.operator_id],
1084 super::ice::VerificationOutcome::Rejected {
1085 kind: "freeze_in_effect".to_string(),
1086 message: "ordinary admin commits are gated during a cluster freeze; \
1087 thaw via the ICE surface to unblock"
1088 .to_string(),
1089 },
1090 );
1091 tracing::warn!(
1092 target: "meshos",
1093 kind = "freeze_in_effect",
1094 "rejected SignedAdminCommit — cluster is frozen and the event is non-ICE",
1095 );
1096 return;
1097 }
1098 let outcome = match self.admin_verifier.as_ref() {
1099 Some(verifier) => {
1100 let now_ms = super::ice::now_ms_since_unix_epoch();
1101 match verifier.verify_admin_commit(
1102 admin_event,
1103 signature,
1104 *issued_at_ms,
1105 now_ms,
1106 ) {
1107 Ok(()) => super::ice::VerificationOutcome::Accepted,
1108 Err(err) => super::ice::VerificationOutcome::Rejected {
1109 kind: err.kind().to_string(),
1110 message: err.to_string(),
1111 },
1112 }
1113 }
1114 None => super::ice::VerificationOutcome::Unverified,
1115 };
1116 self.record_admin_audit(admin_event, vec![signature.operator_id], outcome.clone());
1117 if let super::ice::VerificationOutcome::Rejected { kind, message } = &outcome {
1118 tracing::warn!(
1119 target: "meshos",
1120 kind = %kind,
1121 error = %message,
1122 "rejected SignedAdminCommit — signature verification failed",
1123 );
1124 return;
1125 }
1126 self.desired.apply_admin(admin_event, self.config.this_node);
1127 let unwrapped = MeshOsEvent::AdminEvent(admin_event.clone());
1128 self.actual.apply(&unwrapped, self.config.this_node);
1129 self.dispatch_kill_migration_if_applicable(admin_event);
1130 self.emit_maintenance_transitions();
1131 return;
1132 }
1133 // Log lines bypass the actual/desired fold entirely;
1134 // the loop stamps + pushes onto the log ring directly.
1135 if let MeshOsEvent::LogLine(line) = event {
1136 self.record_log_line(line);
1137 return;
1138 }
1139 match event {
1140 MeshOsEvent::PlacementIntent(intent) => self.desired.apply(intent),
1141 MeshOsEvent::DaemonIntentUpdate(update) => self.desired.apply_daemon_intent(update),
1142 MeshOsEvent::LocalReplicaIntent(update) => {
1143 self.desired.apply_local_replica_intent(update)
1144 }
1145 MeshOsEvent::AdminEvent(admin) => {
1146 // Freeze gate, same as the SignedAdminCommit
1147 // path: ordinary admin events drop during a
1148 // cluster freeze. ICE events bypass.
1149 let now = self
1150 .actual
1151 .last_tick
1152 .unwrap_or_else(std::time::Instant::now);
1153 if self.actual.is_frozen(now) && !admin.is_ice() {
1154 self.record_admin_audit(
1155 admin,
1156 Vec::new(),
1157 super::ice::VerificationOutcome::Rejected {
1158 kind: "freeze_in_effect".to_string(),
1159 message: "ordinary admin commits are gated during a cluster freeze; \
1160 thaw via the ICE surface to unblock"
1161 .to_string(),
1162 },
1163 );
1164 tracing::warn!(
1165 target: "meshos",
1166 kind = "freeze_in_effect",
1167 "rejected unsigned AdminEvent — cluster is frozen and the event is non-ICE",
1168 );
1169 return;
1170 }
1171 // Unsigned admin commits also land on the audit
1172 // ring with Unverified outcome — so audit
1173 // consumers see every admin attempt the cluster
1174 // observed, not just signed ICE bundles.
1175 self.record_admin_audit(
1176 admin,
1177 Vec::new(),
1178 super::ice::VerificationOutcome::Unverified,
1179 );
1180 self.desired.apply_admin(admin, self.config.this_node);
1181 }
1182 _ => {}
1183 }
1184 self.actual.apply(event, self.config.this_node);
1185 // KillMigration / future dispatchers fire AFTER
1186 // actual.apply so they observe post-fold state. Lifted
1187 // out of the AdminEvent arm above for that ordering.
1188 if let MeshOsEvent::AdminEvent(admin) = event {
1189 self.dispatch_kill_migration_if_applicable(admin);
1190 }
1191 self.emit_maintenance_transitions();
1192 }
1193
1194 /// Record an admin-commit outcome on the state's audit
1195 /// ring. Bounded by
1196 /// [`super::ice::DEFAULT_MAX_ADMIN_AUDIT_RECORDS`]; drops
1197 /// oldest FIFO when the cap is exceeded.
1198 #[expect(
1199 clippy::expect_used,
1200 reason = "u64 admin_audit_seq overflow takes ~58 million years at 100 µs/event; the documented loud-panic strategy is preferable to silent saturation that collapses every subsequent record into a single ring entry"
1201 )]
1202 fn record_admin_audit(
1203 &mut self,
1204 event: &super::event::AdminEvent,
1205 operator_ids: Vec<u64>,
1206 outcome: super::ice::VerificationOutcome,
1207 ) {
1208 use std::time::{SystemTime, UNIX_EPOCH};
1209 let committed_at_ms = SystemTime::now()
1210 .duration_since(UNIX_EPOCH)
1211 .map(|d| d.as_millis() as u64)
1212 .unwrap_or(0);
1213 // Stamp a monotonic per-runtime seq starting at 1 so a
1214 // `0` seq downstream reads as "unset." Panic on overflow:
1215 // the SDK dedup gate keys on `seq`, so a saturating add
1216 // would pin every record past `u64::MAX` at the cap and
1217 // silently collapse them all into a single ring entry —
1218 // permanent audit-loss disguised as routine. `checked_add`
1219 // is the loud signal an operator wants: at 100 µs/event
1220 // we'd reach the overflow ~58 million years in, so the
1221 // panic surfaces only a true runaway bug, not a real
1222 // production condition.
1223 self.admin_audit_seq = self
1224 .admin_audit_seq
1225 .checked_add(1)
1226 .expect("admin_audit_seq overflowed u64 — runaway producer or counter corruption");
1227 let record = super::ice::AdminAuditRecord {
1228 seq: self.admin_audit_seq,
1229 committed_at_ms,
1230 event: event.clone(),
1231 operator_ids,
1232 outcome,
1233 chain_pending: false,
1234 };
1235 // Ring-first then chain. The ring is the immediate
1236 // user-visible surface that the Deck SDK polls; the chain
1237 // is durability backup for cluster-lifetime replay. If the
1238 // chain append fails, mark the ring entry's chain_pending
1239 // flag so chain consumers (replaying after restart) can
1240 // distinguish "entry never landed" from "entry hasn't
1241 // reached me yet." Pre-fix the chain attempt ran BEFORE
1242 // the ring push and the ring entry never recorded the
1243 // chain-side outcome — chain consumers saw a gap with no
1244 // way to know it was permanent.
1245 self.admin_audit_ring.push_back(record.clone());
1246 while self.admin_audit_ring.len() > super::ice::DEFAULT_MAX_ADMIN_AUDIT_RECORDS {
1247 self.admin_audit_ring.pop_front();
1248 }
1249 if let Err(err) = self.admin_audit_appender.append(&record) {
1250 tracing::warn!(
1251 target: "meshos",
1252 seq = record.seq,
1253 error = %err,
1254 "admin-audit-chain append failed — ring entry marked chain_pending",
1255 );
1256 // Locate the just-pushed entry (back of the ring; cap
1257 // protects against an empty back() but we just pushed)
1258 // and flip its chain_pending flag so the ring surface
1259 // reports the gap.
1260 if let Some(last) = self.admin_audit_ring.back_mut() {
1261 last.chain_pending = true;
1262 }
1263 }
1264 }
1265
1266 /// Stamp + push a `LogLine` onto the per-node log ring.
1267 /// Bounded by [`super::logs::DEFAULT_MAX_LOG_RING_RECORDS`];
1268 /// drops oldest FIFO when the cap is exceeded.
1269 #[expect(
1270 clippy::expect_used,
1271 reason = "u64 log_seq overflow is astronomically distant; the documented loud-panic strategy is preferable to silent saturation"
1272 )]
1273 fn record_log_line(&mut self, line: &super::logs::LogLine) {
1274 use std::time::{SystemTime, UNIX_EPOCH};
1275 let ts_ms = SystemTime::now()
1276 .duration_since(UNIX_EPOCH)
1277 .map(|d| d.as_millis() as u64)
1278 .unwrap_or(0);
1279 // Checked add mirrors admin_audit_seq above: panic on the
1280 // (astronomical) u64::MAX boundary rather than silently
1281 // collapse every subsequent record into the saturating
1282 // cap, which the SDK dedup gate would treat as a single
1283 // duplicate.
1284 self.log_seq = self
1285 .log_seq
1286 .checked_add(1)
1287 .expect("log_seq overflowed u64 — runaway producer or counter corruption");
1288 let record = super::logs::LogRecord {
1289 seq: self.log_seq,
1290 ts_ms,
1291 level: line.level,
1292 daemon_id: line.daemon_id,
1293 node_id: Some(self.config.this_node),
1294 message: line.message.clone(),
1295 chain_pending: false,
1296 };
1297 // Ring-first then chain. Same rationale as record_admin_audit:
1298 // the ring is the immediate user surface; the chain is
1299 // durability backup. Mark chain_pending on the just-pushed
1300 // ring entry when the chain append fails so chain consumers
1301 // can distinguish "gap" from "haven't replicated yet."
1302 self.log_ring.push_back(record.clone());
1303 while self.log_ring.len() > super::logs::DEFAULT_MAX_LOG_RING_RECORDS {
1304 self.log_ring.pop_front();
1305 }
1306 if let Err(err) = self.log_appender.append(&record) {
1307 tracing::warn!(
1308 target: "meshos",
1309 seq = record.seq,
1310 error = %err,
1311 "log-chain append failed — ring entry marked chain_pending",
1312 );
1313 if let Some(last) = self.log_ring.back_mut() {
1314 last.chain_pending = true;
1315 }
1316 }
1317 }
1318
1319 /// Route a verified [`super::event::AdminEvent::KillMigration`]
1320 /// to the installed
1321 /// [`super::migration_aborter::MigrationAborter`]. No-op for
1322 /// every other admin variant — the match guards the lookup so
1323 /// callers can invoke this unconditionally after fold without
1324 /// re-pattern-matching. Errors are logged + swallowed; a
1325 /// dispatcher hiccup must never wedge the loop.
1326 fn dispatch_kill_migration_if_applicable(&self, admin: &super::event::AdminEvent) {
1327 let super::event::AdminEvent::KillMigration { migration } = admin else {
1328 return;
1329 };
1330 // No-op aborter installed with a verifier wired — this
1331 // is "production-partial" config: the chain commit
1332 // landed but the migration runs to completion because
1333 // the aborter is a no-op. Surface as a FailureRecord
1334 // so operators reading subscribe_failures see it.
1335 if self.migration_aborter.is_no_op() && self.admin_verifier.is_some() {
1336 self.record_runtime_failure(
1337 format!("kill-migration:{}", *migration),
1338 "no-op migration aborter installed while admin verifier is wired — \
1339 chain commit landed but KillMigration is a no-op on this node"
1340 .to_string(),
1341 );
1342 return;
1343 }
1344 if let Err(err) = self.migration_aborter.abort(*migration) {
1345 // Dispatch failure: log AND push to the failure
1346 // ring so the Deck SDK's subscribe_failures stream
1347 // surfaces it. A previous slice swallowed this
1348 // failure to a tracing::warn! only.
1349 tracing::warn!(
1350 target: "meshos",
1351 migration = migration,
1352 error = %err,
1353 "migration-abort dispatcher failed — chain commit landed but migration may continue",
1354 );
1355 self.record_runtime_failure(
1356 format!("kill-migration:{}", *migration),
1357 format!("migration-abort dispatcher error: {err}"),
1358 );
1359 }
1360 }
1361
1362 /// Push a runtime-side failure onto the executor's failure
1363 /// ring + chain. Used by the loop for failures that don't
1364 /// originate from an action dispatch — e.g. migration-abort
1365 /// dispatcher errors after a `KillMigration` commit. Falls
1366 /// back to a `tracing::warn!` when the executor handles
1367 /// aren't wired (in-process tests that don't run the
1368 /// executor task).
1369 fn record_runtime_failure(&self, source: String, reason: String) {
1370 let recorded_at_ms = std::time::SystemTime::now()
1371 .duration_since(std::time::UNIX_EPOCH)
1372 .map(|d| d.as_millis() as u64)
1373 .unwrap_or(0);
1374 let seq = match self.executor_failure_seq.as_ref() {
1375 Some(s) => s.fetch_add(1, std::sync::atomic::Ordering::SeqCst) + 1,
1376 None => {
1377 tracing::warn!(
1378 target: "meshos",
1379 source = %source,
1380 reason = %reason,
1381 "runtime-side failure surfaced but executor handles aren't wired",
1382 );
1383 return;
1384 }
1385 };
1386 let record = FailureRecord {
1387 seq,
1388 source,
1389 reason,
1390 recorded_at_ms,
1391 };
1392 if let Some(appender) = self.executor_failure_appender.as_ref() {
1393 if let Err(err) = appender.append(&record) {
1394 tracing::warn!(
1395 target: "meshos",
1396 seq = record.seq,
1397 error = %err,
1398 "failure-chain append failed — record kept on in-memory ring only",
1399 );
1400 }
1401 }
1402 if let Some(ring) = self.executor_failures.as_ref() {
1403 let mut g = ring.write();
1404 if g.len() >= super::snapshot::RECENT_FAILURES_CAPACITY {
1405 g.pop_front();
1406 }
1407 g.push_back(record);
1408 }
1409 }
1410
1411 /// Detect `local_maintenance` discriminant transitions and
1412 /// fan-out the corresponding [`MeshOsControl`] through the
1413 /// installed sink. Idempotent: re-entering the same
1414 /// discriminant emits nothing. Forward arcs only — the fold
1415 /// rejects late-arriving backward arcs upstream.
1416 fn emit_maintenance_transitions(&mut self) {
1417 let Some(sink) = self.control_sink.as_ref() else {
1418 self.last_local_maintenance =
1419 MaintenanceDiscriminant::from_state(&self.actual.local_maintenance);
1420 return;
1421 };
1422 let current = MaintenanceDiscriminant::from_state(&self.actual.local_maintenance);
1423 if current == self.last_local_maintenance {
1424 return;
1425 }
1426 // Anchor the fall-back deadline on the actual state's
1427 // last_tick so loop-replay produces identical deadlines.
1428 // Falling back to wall-clock would break the
1429 // replay-convergence contract the fold side already
1430 // pins on `last_tick`.
1431 let anchor = self
1432 .actual
1433 .last_tick
1434 .unwrap_or_else(std::time::Instant::now);
1435 let event = match (self.last_local_maintenance, current) {
1436 (_, MaintenanceDiscriminant::EnteringMaintenance) => {
1437 // Operator opened a drain. Use the configured
1438 // deadline if the admin event carried one; else
1439 // fall back to the maintenance-config default.
1440 let deadline = match &self.actual.local_maintenance {
1441 MaintenanceState::EnteringMaintenance {
1442 deadline: Some(d), ..
1443 } => *d,
1444 _ => anchor + self.config.maintenance.default_drain_deadline,
1445 };
1446 Some(MeshOsControl::DrainStart { deadline })
1447 }
1448 (
1449 MaintenanceDiscriminant::EnteringMaintenance,
1450 MaintenanceDiscriminant::Maintenance | MaintenanceDiscriminant::DrainFailed,
1451 ) => Some(MeshOsControl::DrainFinish),
1452 _ => None,
1453 };
1454 self.last_local_maintenance = current;
1455 if let Some(event) = event {
1456 sink.emit(event);
1457 }
1458 }
1459
1460 async fn run_reconcile(&mut self) {
1461 // Anchor every per-tick timestamp on `last_tick` (set by
1462 // `apply(Tick)` immediately before this call) so two
1463 // replays of the same event stream produce identical
1464 // `last_rebalance`, `applied_backoffs`, and
1465 // `PendingAction.emitted_at` values. Bootstrap fallback to
1466 // `Instant::now()` only when no Tick has fired yet.
1467 let now = self
1468 .actual
1469 .last_tick
1470 .unwrap_or_else(std::time::Instant::now);
1471 let scorer = self.scheduler.current();
1472 let actions = reconcile(
1473 &self.actual,
1474 &self.desired,
1475 self.config.this_node,
1476 &self.config.locality,
1477 &self.config.maintenance,
1478 &self.config.scheduler,
1479 scorer.as_deref(),
1480 );
1481 // Record cooldowns for any RequestEviction we emit so
1482 // the same chain doesn't flap on the next tick; track
1483 // `ApplyBackoff` emissions so reconcile suppresses
1484 // re-emit while the daemon stays in the same backoff
1485 // window.
1486 for action in &actions {
1487 match action {
1488 super::action::MeshOsAction::RequestEviction { chain, .. } => {
1489 self.actual.last_rebalance.insert(*chain, now);
1490 }
1491 super::action::MeshOsAction::ApplyBackoff { daemon, until } => {
1492 self.actual.applied_backoffs.insert(daemon.clone(), *until);
1493 }
1494 _ => {}
1495 }
1496 }
1497 // Drain queued force-evictions: reconcile consumed them
1498 // (whether or not this node is the leader for each
1499 // chain). Clearing here unconditionally keeps non-leader
1500 // observers from accumulating stale entries.
1501 if !self.actual.forced_evictions.is_empty() {
1502 self.actual.forced_evictions.clear();
1503 }
1504 // Same pattern for force-cutovers — every node drains
1505 // unconditionally so non-leader observers don't
1506 // accumulate stale entries.
1507 if !self.actual.forced_placements.is_empty() {
1508 self.actual.forced_placements.clear();
1509 }
1510 self.reconcile_count += 1;
1511 let mut dropped_this_tick: u64 = 0;
1512 let mut first_dropped_kind: Option<&'static str> = None;
1513 for action in actions {
1514 let pending = PendingAction {
1515 id: self.action_ids.next(),
1516 action,
1517 emitted_at: now,
1518 };
1519 // Drop on backpressure rather than block reconcile —
1520 // the executor's job is to apply admit(); reconcile
1521 // staying responsive is the higher-order property.
1522 // Count + log drops so the silent-loss path is
1523 // observable. recent_emissions only gets the action
1524 // on try_send success — pre-fix every action landed in
1525 // recent_emissions (and surfaced as `recently_emitted`
1526 // on the snapshot) even when the executor's queue was
1527 // full and the action would never run, double-counting
1528 // the drop in dropped_actions while painting a misleading
1529 // "emitted" picture in the snapshot.
1530 let pending_clone = pending.clone();
1531 match self.actions_tx.try_send(pending) {
1532 Ok(()) => self.recent_emissions.push(pending_clone),
1533 Err(mpsc::error::TrySendError::Full(rejected)) => {
1534 dropped_this_tick += 1;
1535 if first_dropped_kind.is_none() {
1536 first_dropped_kind =
1537 Some(super::snapshot::action_kind_str(&rejected.action));
1538 }
1539 }
1540 Err(mpsc::error::TrySendError::Closed(_)) => {
1541 // Executor task is gone — count as dropped and
1542 // skip recent_emissions; subsequent reconciles
1543 // will keep counting until the loop tears down.
1544 dropped_this_tick += 1;
1545 }
1546 }
1547 }
1548 if dropped_this_tick > 0 {
1549 self.dropped_actions
1550 .fetch_add(dropped_this_tick, Ordering::Relaxed);
1551 tracing::warn!(
1552 target: "meshos",
1553 dropped = dropped_this_tick,
1554 first_kind = first_dropped_kind.unwrap_or("?"),
1555 queue_capacity = self.config.action_queue_capacity,
1556 "reconcile output dropped — action queue full",
1557 );
1558 }
1559 self.publish_snapshot();
1560 // Bound the in-loop pending mirror so a backed-up
1561 // executor doesn't let snapshot pending grow unbounded.
1562 // Action queue capacity is the natural bound.
1563 if self.recent_emissions.len() > self.config.action_queue_capacity {
1564 let overflow = self.recent_emissions.len() - self.config.action_queue_capacity;
1565 self.recent_emissions.drain(..overflow);
1566 }
1567 }
1568
1569 fn publish_snapshot(&self) {
1570 // Read the executor's failures ring under a short read
1571 // lock and copy it into the snapshot. The lock is held
1572 // only across the clone to keep the executor's write
1573 // path (record_failure) responsive.
1574 let failures: Vec<FailureRecord> = match self.executor_failures.as_ref() {
1575 Some(ring) => ring.read().iter().cloned().collect(),
1576 None => Vec::new(),
1577 };
1578 let in_flight_migrations = self.migration_snapshot_source.list();
1579 let mut snap = MeshOsSnapshot::from_state(
1580 &self.actual,
1581 &self.desired,
1582 &self.recent_emissions,
1583 &failures,
1584 in_flight_migrations,
1585 &self.admin_audit_ring,
1586 &self.log_ring,
1587 self.config.this_node,
1588 );
1589 snap.runtime_epoch_id = self.runtime_epoch_id;
1590 self.snapshot.store(Arc::new(snap));
1591 }
1592}
1593
/// Test-only [`MeshOsConfig`] with a 10 ms `tick_interval`, so
/// reconcile passes fire quickly under test.
///
/// Node id is 1 and both the event and action queues hold 64 entries.
/// Backpressure, locality, maintenance and scheduler settings use
/// their `Default`. Visible only within the crate.
#[cfg(test)]
pub(crate) fn fast_test_config() -> MeshOsConfig {
    let tick_interval = std::time::Duration::from_millis(10);
    MeshOsConfig {
        this_node: 1,
        tick_interval,
        backpressure: Default::default(),
        locality: Default::default(),
        maintenance: Default::default(),
        scheduler: Default::default(),
        event_queue_capacity: 64,
        action_queue_capacity: 64,
    }
}
1610
1611#[cfg(test)]
1612mod tests {
1613 #![allow(
1614 clippy::disallowed_methods,
1615 reason = "test code legitimately uses std::sync::{Mutex,RwLock} for SUT setup; tests have no real poison concern"
1616 )]
1617 use std::time::Duration as StdDuration;
1618
1619 use super::super::event::{
1620 ChainId, LocalReplicaIntent, LocalReplicaIntentUpdate, MeshOsEvent, ReplicaUpdate,
1621 };
1622 use super::*;
1623
1624 #[tokio::test]
1625 async fn loop_exits_cleanly_when_all_handles_drop() {
1626 let MeshOsLoopParts {
1627 mesh_loop: loop_,
1628 handle,
1629 mut actions_rx,
1630 reader: _,
1631 } = MeshOsLoop::new(fast_test_config());
1632 let task = tokio::spawn(loop_.run());
1633 drop(handle);
1634 // Loop should drain quickly. `run` returns the
1635 // reconcile count.
1636 let count = tokio::time::timeout(StdDuration::from_secs(1), task)
1637 .await
1638 .expect("loop did not exit after all handles dropped")
1639 .expect("join");
1640 // Zero or more ticks may have fired before we dropped;
1641 // assert we got at least zero (compiles + ran).
1642 let _ = count;
1643 // The action queue must be empty under Phase A.
1644 assert!(actions_rx.try_recv().is_err());
1645 }
1646
1647 #[tokio::test]
1648 async fn loop_exits_on_shutdown_event() {
1649 let MeshOsLoopParts {
1650 mesh_loop: loop_,
1651 handle,
1652 actions_rx: _,
1653 reader: _,
1654 } = MeshOsLoop::new(fast_test_config());
1655 let task = tokio::spawn(loop_.run());
1656 handle.publish(MeshOsEvent::Shutdown).await.unwrap();
1657 let count = tokio::time::timeout(StdDuration::from_secs(1), task)
1658 .await
1659 .expect("loop did not exit after Shutdown")
1660 .expect("join");
1661 let _ = count;
1662 }
1663
1664 #[tokio::test]
1665 async fn publish_timeout_returns_queue_full_when_loop_is_wedged() {
1666 // Regression for I11: `publish` parks indefinitely on a
1667 // wedged loop. `publish_timeout` surfaces
1668 // QueueFull after the configured window so sources don't
1669 // stall.
1670 let cfg = MeshOsConfig {
1671 event_queue_capacity: 1,
1672 ..fast_test_config()
1673 };
1674 let MeshOsLoopParts {
1675 mesh_loop: _loop_,
1676 handle,
1677 actions_rx: _actions_rx,
1678 reader: _reader,
1679 } = MeshOsLoop::new(cfg);
1680 // Don't spawn the loop — handle is alive but the
1681 // receiver is parked inside `_loop_`. First publish
1682 // fills the capacity-1 channel; second blocks
1683 // indefinitely. publish_timeout surfaces QueueFull.
1684 handle
1685 .publish(MeshOsEvent::Tick)
1686 .await
1687 .expect("first send fits in the single-slot channel");
1688 let started = std::time::Instant::now();
1689 let err = handle
1690 .publish_timeout(MeshOsEvent::Tick, StdDuration::from_millis(50))
1691 .await
1692 .expect_err("second send should time out");
1693 assert!(matches!(err, MeshOsHandleError::QueueFull));
1694 assert!(
1695 started.elapsed() < StdDuration::from_millis(500),
1696 "publish_timeout must honor its budget",
1697 );
1698 }
1699
1700 #[tokio::test]
1701 async fn panicking_probe_does_not_kill_the_loop() {
1702 // Regression for I6: a panicking probe used to unwind
1703 // the loop task. The catch_unwind wrapper in
1704 // `poll_probes` now logs + skips the bad sample so
1705 // reconcile keeps running.
1706 struct PanickyProbe;
1707 impl super::super::probes::LocalityProbe for PanickyProbe {
1708 fn rtt_samples(&self) -> Vec<(u64, std::time::Duration)> {
1709 panic!("boom from probe");
1710 }
1711 }
1712 let registry = ProbeRegistry::new();
1713 registry.add_locality_probe(Arc::new(PanickyProbe));
1714 let cfg = MeshOsConfig {
1715 tick_interval: StdDuration::from_millis(10),
1716 ..fast_test_config()
1717 };
1718 let MeshOsLoopParts {
1719 mesh_loop: loop_,
1720 handle,
1721 actions_rx: _actions_rx,
1722 reader: _reader,
1723 } = MeshOsLoop::new(cfg);
1724 let loop_ = loop_.with_probe_registry(registry);
1725 let task = tokio::spawn(loop_.run());
1726 // Let several ticks fire — each one will invoke the
1727 // panicking probe.
1728 tokio::time::sleep(StdDuration::from_millis(60)).await;
1729 handle.publish(MeshOsEvent::Shutdown).await.unwrap();
1730 let count = tokio::time::timeout(StdDuration::from_secs(2), task)
1731 .await
1732 .expect("loop should still exit cleanly")
1733 .expect("loop task survived probe panics");
1734 assert!(
1735 count >= 1,
1736 "loop should have completed at least one reconcile pass despite probe panics",
1737 );
1738 }
1739
1740 #[tokio::test]
1741 async fn inventory_gc_drops_peers_no_probe_samples_anymore() {
1742 // poll_probes used to insert per-peer inventory samples
1743 // and never remove them. A peer that departed from
1744 // proximity would leak into actual.inventory forever,
1745 // surfacing in every snapshot until the process
1746 // restarted. Pin the per-tick GC: an inventory probe
1747 // whose sample set shrinks across ticks should leave
1748 // actual.inventory pruned to match.
1749 use std::sync::atomic::{AtomicUsize, Ordering as AOrdering};
1750 use std::sync::Mutex;
1751 struct ShrinkingProbe {
1752 tick: AtomicUsize,
1753 // Mutex so the trait method can mutate freely
1754 // without &mut self.
1755 samples_per_tick: Mutex<Vec<Vec<(u64, super::super::probes::PeerInventory)>>>,
1756 }
1757 impl super::super::probes::InventoryProbe for ShrinkingProbe {
1758 fn inventory_samples(&self) -> Vec<(u64, super::super::probes::PeerInventory)> {
1759 let t = self.tick.fetch_add(1, AOrdering::Relaxed);
1760 let guard = self.samples_per_tick.lock().unwrap();
1761 // Past the end of the scripted set, stick on the
1762 // final entry so the steady state stays observed.
1763 let idx = t.min(guard.len().saturating_sub(1));
1764 guard.get(idx).cloned().unwrap_or_default()
1765 }
1766 }
1767 let inv_with_cpu = |cpu: f64| super::super::probes::PeerInventory {
1768 cpu_load_1m: Some(cpu),
1769 ..Default::default()
1770 };
1771 let probe = Arc::new(ShrinkingProbe {
1772 tick: AtomicUsize::new(0),
1773 samples_per_tick: Mutex::new(vec![
1774 vec![
1775 (0xA, inv_with_cpu(0.1)),
1776 (0xB, inv_with_cpu(0.2)),
1777 (0xC, inv_with_cpu(0.3)),
1778 ],
1779 vec![(0xB, inv_with_cpu(0.2)), (0xC, inv_with_cpu(0.3))],
1780 vec![(0xB, inv_with_cpu(0.2)), (0xC, inv_with_cpu(0.3))],
1781 ]),
1782 });
1783 let registry = ProbeRegistry::new();
1784 registry.add_inventory_probe(probe);
1785 let cfg = MeshOsConfig {
1786 tick_interval: StdDuration::from_millis(10),
1787 ..fast_test_config()
1788 };
1789 let MeshOsLoopParts {
1790 mesh_loop: loop_,
1791 handle,
1792 actions_rx: _actions_rx,
1793 reader,
1794 } = MeshOsLoop::new(cfg);
1795 let loop_ = loop_.with_probe_registry(registry);
1796 let task = tokio::spawn(loop_.run());
1797 // Wait long enough for at least two ticks to fire so
1798 // the second sample set (without peer 0xA) lands.
1799 tokio::time::sleep(StdDuration::from_millis(80)).await;
1800 let snap = reader.read();
1801 let inv_peers: std::collections::BTreeSet<u64> = snap
1802 .peers
1803 .iter()
1804 .filter(|(_, p)| p.cpu_load_1m.is_some())
1805 .map(|(id, _)| *id)
1806 .collect();
1807 handle.publish(MeshOsEvent::Shutdown).await.unwrap();
1808 let _ = tokio::time::timeout(StdDuration::from_secs(2), task).await;
1809 assert!(
1810 !inv_peers.contains(&0xA),
1811 "peer 0xA stopped being sampled but still appears in the inventory: {inv_peers:?}",
1812 );
1813 assert!(inv_peers.contains(&0xB), "peer 0xB should still be sampled");
1814 assert!(inv_peers.contains(&0xC), "peer 0xC should still be sampled");
1815 }
1816
1817 #[tokio::test]
1818 async fn inventory_gc_does_not_wipe_peers_when_one_of_two_probes_panics() {
1819 // Multi-probe partial-panic case: probe A reports peer
1820 // 0xA, probe B reports peer 0xB. On tick 2, probe B
1821 // panics; the GC pass must not drop 0xB just because A
1822 // didn't sample it. Without the `all_probes_succeeded`
1823 // guard, `peers_seen_inventory = {0xA}` and `retain`
1824 // would wipe 0xB even though no probe authoritatively
1825 // observed its departure.
1826 use std::sync::atomic::{AtomicUsize, Ordering as AOrdering};
1827 struct SteadyProbe {
1828 peer: u64,
1829 cpu: f64,
1830 }
1831 impl super::super::probes::InventoryProbe for SteadyProbe {
1832 fn inventory_samples(&self) -> Vec<(u64, super::super::probes::PeerInventory)> {
1833 vec![(
1834 self.peer,
1835 super::super::probes::PeerInventory {
1836 cpu_load_1m: Some(self.cpu),
1837 ..Default::default()
1838 },
1839 )]
1840 }
1841 }
1842 struct SometimesPanickingProbe {
1843 tick: AtomicUsize,
1844 peer: u64,
1845 cpu: f64,
1846 }
1847 impl super::super::probes::InventoryProbe for SometimesPanickingProbe {
1848 fn inventory_samples(&self) -> Vec<(u64, super::super::probes::PeerInventory)> {
1849 let t = self.tick.fetch_add(1, AOrdering::Relaxed);
1850 if t == 0 {
1851 vec![(
1852 self.peer,
1853 super::super::probes::PeerInventory {
1854 cpu_load_1m: Some(self.cpu),
1855 ..Default::default()
1856 },
1857 )]
1858 } else {
1859 panic!("probe B panicked on tick {t}");
1860 }
1861 }
1862 }
1863 let registry = ProbeRegistry::new();
1864 registry.add_inventory_probe(Arc::new(SteadyProbe {
1865 peer: 0xA,
1866 cpu: 0.1,
1867 }));
1868 registry.add_inventory_probe(Arc::new(SometimesPanickingProbe {
1869 tick: AtomicUsize::new(0),
1870 peer: 0xB,
1871 cpu: 0.2,
1872 }));
1873 let cfg = MeshOsConfig {
1874 tick_interval: StdDuration::from_millis(10),
1875 ..fast_test_config()
1876 };
1877 let MeshOsLoopParts {
1878 mesh_loop: loop_,
1879 handle,
1880 actions_rx: _actions_rx,
1881 reader,
1882 } = MeshOsLoop::new(cfg);
1883 let loop_ = loop_.with_probe_registry(registry);
1884 let task = tokio::spawn(loop_.run());
1885 // Wait for the first tick (both probes succeed) and
1886 // several subsequent ticks (probe B panics).
1887 tokio::time::sleep(StdDuration::from_millis(80)).await;
1888 let snap = reader.read();
1889 let inv_peers: std::collections::BTreeSet<u64> = snap
1890 .peers
1891 .iter()
1892 .filter(|(_, p)| p.cpu_load_1m.is_some())
1893 .map(|(id, _)| *id)
1894 .collect();
1895 handle.publish(MeshOsEvent::Shutdown).await.unwrap();
1896 let _ = tokio::time::timeout(StdDuration::from_secs(2), task).await;
1897 assert!(
1898 inv_peers.contains(&0xA),
1899 "probe A's peer should still be in inventory: {inv_peers:?}",
1900 );
1901 assert!(
1902 inv_peers.contains(&0xB),
1903 "probe B's peer must survive probe B's panic — partial-panic ticks are not authoritative for the panicking probe's peers: {inv_peers:?}",
1904 );
1905 }
1906
1907 #[tokio::test]
1908 async fn inventory_gc_does_not_wipe_on_a_transient_empty_probe() {
1909 // The InventoryProbe trait permits an `Ok(vec![])`
1910 // return ("no peers this tick" — transient procfs
1911 // unavailability, cold start, etc.). Without the
1912 // `any_probe_saw_samples` guard the GC pass would
1913 // treat the empty return as "no peers authoritatively
1914 // exist anymore" and wipe every previously-seen peer's
1915 // inventory.
1916 use std::sync::atomic::{AtomicUsize, Ordering as AOrdering};
1917 struct FlakyProbe {
1918 tick: AtomicUsize,
1919 }
1920 impl super::super::probes::InventoryProbe for FlakyProbe {
1921 fn inventory_samples(&self) -> Vec<(u64, super::super::probes::PeerInventory)> {
1922 let t = self.tick.fetch_add(1, AOrdering::Relaxed);
1923 // Tick 0: populate. Subsequent ticks: empty
1924 // (simulate the resource probe losing its
1925 // procfs handle for a few cycles).
1926 if t == 0 {
1927 vec![
1928 (
1929 0xA,
1930 super::super::probes::PeerInventory {
1931 cpu_load_1m: Some(0.1),
1932 ..Default::default()
1933 },
1934 ),
1935 (
1936 0xB,
1937 super::super::probes::PeerInventory {
1938 cpu_load_1m: Some(0.2),
1939 ..Default::default()
1940 },
1941 ),
1942 ]
1943 } else {
1944 vec![]
1945 }
1946 }
1947 }
1948 let registry = ProbeRegistry::new();
1949 registry.add_inventory_probe(Arc::new(FlakyProbe {
1950 tick: AtomicUsize::new(0),
1951 }));
1952 let cfg = MeshOsConfig {
1953 tick_interval: StdDuration::from_millis(10),
1954 ..fast_test_config()
1955 };
1956 let MeshOsLoopParts {
1957 mesh_loop: loop_,
1958 handle,
1959 actions_rx: _actions_rx,
1960 reader,
1961 } = MeshOsLoop::new(cfg);
1962 let loop_ = loop_.with_probe_registry(registry);
1963 let task = tokio::spawn(loop_.run());
1964 // Wait long enough for the populating tick + several
1965 // empty ticks.
1966 tokio::time::sleep(StdDuration::from_millis(80)).await;
1967 let snap = reader.read();
1968 let inv_peers: std::collections::BTreeSet<u64> = snap
1969 .peers
1970 .iter()
1971 .filter(|(_, p)| p.cpu_load_1m.is_some())
1972 .map(|(id, _)| *id)
1973 .collect();
1974 handle.publish(MeshOsEvent::Shutdown).await.unwrap();
1975 let _ = tokio::time::timeout(StdDuration::from_secs(2), task).await;
1976 assert!(
1977 inv_peers.contains(&0xA),
1978 "transient empty probe return must not wipe inventory: {inv_peers:?}",
1979 );
1980 assert!(
1981 inv_peers.contains(&0xB),
1982 "transient empty probe return must not wipe inventory: {inv_peers:?}",
1983 );
1984 }
1985
1986 #[tokio::test]
1987 async fn snapshot_reader_does_not_stall_under_concurrent_reads() {
1988 // With ArcSwap, publish is a pointer store and read is
1989 // a pointer load + Arc clone; concurrent readers cannot
1990 // stall the publisher. Smoke-test by spawning many
1991 // readers polling the snapshot in a tight loop while
1992 // the loop ticks repeatedly.
1993 let cfg = MeshOsConfig {
1994 tick_interval: StdDuration::from_millis(5),
1995 ..fast_test_config()
1996 };
1997 let MeshOsLoopParts {
1998 mesh_loop: loop_,
1999 handle,
2000 actions_rx: _actions_rx,
2001 reader,
2002 } = MeshOsLoop::new(cfg);
2003 let task = tokio::spawn(loop_.run());
2004
2005 let stop = Arc::new(std::sync::atomic::AtomicBool::new(false));
2006 let mut readers = Vec::new();
2007 for _ in 0..8 {
2008 let reader = reader.clone();
2009 let stop = Arc::clone(&stop);
2010 readers.push(tokio::spawn(async move {
2011 let mut count = 0u64;
2012 while !stop.load(Ordering::Relaxed) {
2013 let _snap = reader.read();
2014 count += 1;
2015 tokio::task::yield_now().await;
2016 }
2017 count
2018 }));
2019 }
2020 // Let the loop fire ~10 ticks.
2021 tokio::time::sleep(StdDuration::from_millis(60)).await;
2022 stop.store(true, Ordering::Relaxed);
2023 handle.publish(MeshOsEvent::Shutdown).await.unwrap();
2024 let _loop_count = task.await.expect("loop join");
2025 let mut total = 0u64;
2026 for r in readers {
2027 total += r.await.expect("reader join");
2028 }
2029 assert!(
2030 total > 0,
2031 "readers should have made progress while the loop published",
2032 );
2033 }
2034
2035 #[tokio::test]
2036 async fn dropped_actions_counter_increments_when_action_queue_is_full() {
2037 // Regression for C4: reconcile output silently dropped
2038 // when the action mpsc is at capacity. Make the queue
2039 // tiny, hold the receiver without draining, project a
2040 // reconcile pass that emits multiple actions, assert the
2041 // counter caught the drop.
2042 let cfg = MeshOsConfig {
2043 action_queue_capacity: 1,
2044 tick_interval: StdDuration::from_millis(10),
2045 ..fast_test_config()
2046 };
2047 let this_node = cfg.this_node;
2048 let MeshOsLoopParts {
2049 mesh_loop: loop_,
2050 handle,
2051 actions_rx: _actions_rx,
2052 reader: _reader,
2053 } = MeshOsLoop::new(cfg);
2054 let counter = loop_.dropped_actions_counter();
2055 let task = tokio::spawn(loop_.run());
2056 // Put `this_node` as a holder of five chains.
2057 for chain in 1..=5u64 {
2058 handle
2059 .publish(MeshOsEvent::ReplicaUpdate(ReplicaUpdate::Added {
2060 chain: chain as ChainId,
2061 holder: this_node,
2062 }))
2063 .await
2064 .unwrap();
2065 }
2066 // Project local Drop intent → reconcile emits one
2067 // DropReplica per chain on the next tick.
2068 for chain in 1..=5u64 {
2069 handle
2070 .publish(MeshOsEvent::LocalReplicaIntent(LocalReplicaIntentUpdate {
2071 chain: chain as ChainId,
2072 intent: LocalReplicaIntent::Drop,
2073 }))
2074 .await
2075 .unwrap();
2076 }
2077 // Let several ticks fire.
2078 tokio::time::sleep(StdDuration::from_millis(80)).await;
2079 handle.publish(MeshOsEvent::Shutdown).await.unwrap();
2080 let _ = task.await;
2081 let dropped = counter.load(Ordering::Relaxed);
2082 assert!(
2083 dropped >= 1,
2084 "expected at least one dropped reconcile action with \
2085 action_queue_capacity = 1; got {dropped}",
2086 );
2087 }
2088
2089 #[tokio::test]
2090 async fn ticks_drive_reconcile_passes() {
2091 let cfg = MeshOsConfig {
2092 tick_interval: StdDuration::from_millis(20),
2093 ..fast_test_config()
2094 };
2095 let MeshOsLoopParts {
2096 mesh_loop: loop_,
2097 handle,
2098 actions_rx: _,
2099 reader: _,
2100 } = MeshOsLoop::new(cfg);
2101 let task = tokio::spawn(loop_.run());
2102 // Let several ticks fire.
2103 tokio::time::sleep(StdDuration::from_millis(120)).await;
2104 handle.publish(MeshOsEvent::Shutdown).await.unwrap();
2105 let count = task.await.expect("join");
2106 // At a 20 ms tick over a 120 ms window we expect roughly
2107 // 4–7 reconcile passes; require at least 2 so a slow CI
2108 // host doesn't flake.
2109 assert!(
2110 count >= 2,
2111 "expected at least 2 reconcile passes, got {count}"
2112 );
2113 }
2114
2115 #[tokio::test]
2116 async fn locality_probe_samples_fold_into_actual_rtt_via_snapshot() {
2117 // A LocalityProbe is polled on every Tick; its samples
2118 // land in MeshOsState::rtt; the snapshot's peers map
2119 // surfaces them.
2120 struct FixedProbe(Vec<(u64, std::time::Duration)>);
2121 impl super::super::probes::LocalityProbe for FixedProbe {
2122 fn rtt_samples(&self) -> Vec<(u64, std::time::Duration)> {
2123 self.0.clone()
2124 }
2125 }
2126
2127 let registry = ProbeRegistry::new();
2128 registry.add_locality_probe(std::sync::Arc::new(FixedProbe(vec![
2129 (10, std::time::Duration::from_millis(33)),
2130 (11, std::time::Duration::from_millis(150)),
2131 ])));
2132 let MeshOsLoopParts {
2133 mesh_loop: loop_,
2134 handle,
2135 actions_rx: _,
2136 reader,
2137 } = MeshOsLoop::new(fast_test_config());
2138 let loop_ = loop_.with_probe_registry(registry);
2139 let task = tokio::spawn(loop_.run());
2140
2141 tokio::time::sleep(StdDuration::from_millis(80)).await;
2142 let snap = reader.read();
2143 // Peer 10 surfaces with 33 ms.
2144 let p10 = snap.peers.get(&10).expect("peer 10 in snapshot");
2145 assert_eq!(p10.rtt_ms, Some(33));
2146 let p11 = snap.peers.get(&11).expect("peer 11 in snapshot");
2147 assert_eq!(p11.rtt_ms, Some(150));
2148
2149 handle.publish(MeshOsEvent::Shutdown).await.unwrap();
2150 let _ = task.await;
2151 }
2152
2153 #[tokio::test]
2154 async fn health_probe_samples_fold_into_actual_health_via_snapshot() {
2155 struct FixedProbe(Vec<(u64, super::super::event::NodeHealth)>);
2156 impl super::super::probes::HealthProbe for FixedProbe {
2157 fn health_samples(&self) -> Vec<(u64, super::super::event::NodeHealth)> {
2158 self.0.clone()
2159 }
2160 }
2161
2162 let registry = ProbeRegistry::new();
2163 registry.add_health_probe(std::sync::Arc::new(FixedProbe(vec![(
2164 5,
2165 super::super::event::NodeHealth::Unreachable,
2166 )])));
2167 let MeshOsLoopParts {
2168 mesh_loop: loop_,
2169 handle,
2170 actions_rx: _,
2171 reader,
2172 } = MeshOsLoop::new(fast_test_config());
2173 let loop_ = loop_.with_probe_registry(registry);
2174 let task = tokio::spawn(loop_.run());
2175
2176 tokio::time::sleep(StdDuration::from_millis(80)).await;
2177 let snap = reader.read();
2178 let p5 = snap.peers.get(&5).expect("peer 5 in snapshot");
2179 // Wire form differs from the enum form but the
2180 // discriminator matches.
2181 assert!(matches!(
2182 p5.health,
2183 Some(super::super::snapshot::PeerHealthSnapshot::Unreachable)
2184 ));
2185
2186 handle.publish(MeshOsEvent::Shutdown).await.unwrap();
2187 let _ = task.await;
2188 }
2189
2190 #[tokio::test]
2191 async fn probe_registry_attached_post_construction_is_polled_on_next_tick() {
2192 // The runtime / production pattern: construct the loop,
2193 // pass a shared registry through `with_probe_registry`,
2194 // THEN install probes on the registry. The next Tick
2195 // picks them up because both ends share the same Arc.
2196 struct FixedProbe;
2197 impl super::super::probes::LocalityProbe for FixedProbe {
2198 fn rtt_samples(&self) -> Vec<(u64, std::time::Duration)> {
2199 vec![(99, std::time::Duration::from_millis(7))]
2200 }
2201 }
2202
2203 let registry = ProbeRegistry::new();
2204 let MeshOsLoopParts {
2205 mesh_loop: loop_,
2206 handle,
2207 actions_rx: _,
2208 reader,
2209 } = MeshOsLoop::new(fast_test_config());
2210 let loop_ = loop_.with_probe_registry(registry.clone());
2211 let task = tokio::spawn(loop_.run());
2212
2213 // Add the probe AFTER spawning the loop — the shared
2214 // Arc means the loop sees it on the next Tick.
2215 registry.add_locality_probe(std::sync::Arc::new(FixedProbe));
2216 tokio::time::sleep(StdDuration::from_millis(80)).await;
2217
2218 let snap = reader.read();
2219 let p99 = snap.peers.get(&99).expect("peer 99 in snapshot");
2220 assert_eq!(p99.rtt_ms, Some(7));
2221
2222 handle.publish(MeshOsEvent::Shutdown).await.unwrap();
2223 let _ = task.await;
2224 }
2225
2226 #[tokio::test]
2227 async fn snapshot_reader_returns_updated_state_after_each_tick() {
2228 // The reader should reflect the most recent
2229 // post-reconcile snapshot. Fire some events that change
2230 // state, let ticks fire, sample the reader.
2231 let MeshOsLoopParts {
2232 mesh_loop: loop_,
2233 handle,
2234 actions_rx: _,
2235 reader,
2236 } = MeshOsLoop::new(fast_test_config());
2237 let task = tokio::spawn(loop_.run());
2238
2239 // Add a replica observation.
2240 handle
2241 .publish(MeshOsEvent::ReplicaUpdate(ReplicaUpdate::Added {
2242 chain: 0xC0FFEE,
2243 holder: 7,
2244 }))
2245 .await
2246 .unwrap();
2247 // Give ticks time to fire + reconcile + publish.
2248 tokio::time::sleep(StdDuration::from_millis(100)).await;
2249
2250 let snap = reader.read();
2251 let entry = snap
2252 .replicas
2253 .get(&0xC0FFEE)
2254 .expect("snapshot should carry the replica");
2255 assert_eq!(entry.holders, vec![7]);
2256
2257 handle.publish(MeshOsEvent::Shutdown).await.unwrap();
2258 let _ = task.await;
2259 }
2260
2261 #[tokio::test]
2262 async fn snapshot_reader_is_cloneable_and_sees_same_state() {
2263 let MeshOsLoopParts {
2264 mesh_loop: loop_,
2265 handle,
2266 actions_rx: _,
2267 reader: reader_a,
2268 } = MeshOsLoop::new(fast_test_config());
2269 let reader_b = reader_a.clone();
2270 let task = tokio::spawn(loop_.run());
2271
2272 handle
2273 .publish(MeshOsEvent::ReplicaUpdate(ReplicaUpdate::Added {
2274 chain: 1,
2275 holder: 1,
2276 }))
2277 .await
2278 .unwrap();
2279 tokio::time::sleep(StdDuration::from_millis(100)).await;
2280
2281 let snap_a = reader_a.read();
2282 let snap_b = reader_b.read();
2283 assert_eq!(snap_a, snap_b);
2284
2285 handle.publish(MeshOsEvent::Shutdown).await.unwrap();
2286 let _ = task.await;
2287 }
2288
2289 #[tokio::test]
2290 async fn loop_drains_event_burst_without_panicking() {
2291 // Smoke test: the loop accepts a burst of arbitrary
2292 // events and exits cleanly when shutdown is published.
2293 // The fold-side ordering property is asserted directly
2294 // on `MeshOsState::apply` in `state::tests` — that
2295 // covers the substantive ordering guarantee without
2296 // having to crack open the consumed-loop's state.
2297 let MeshOsLoopParts {
2298 mesh_loop: loop_,
2299 handle,
2300 actions_rx: _,
2301 reader: _,
2302 } = MeshOsLoop::new(fast_test_config());
2303
2304 let chain: ChainId = 0xC0FFEE;
2305 let probe = tokio::spawn(async move {
2306 handle
2307 .publish(MeshOsEvent::ReplicaUpdate(ReplicaUpdate::Added {
2308 chain,
2309 holder: 11,
2310 }))
2311 .await
2312 .unwrap();
2313 handle
2314 .publish(MeshOsEvent::ReplicaUpdate(ReplicaUpdate::Added {
2315 chain,
2316 holder: 12,
2317 }))
2318 .await
2319 .unwrap();
2320 handle
2321 .publish(MeshOsEvent::ReplicaUpdate(ReplicaUpdate::Removed {
2322 chain,
2323 holder: 11,
2324 }))
2325 .await
2326 .unwrap();
2327 handle.publish(MeshOsEvent::Shutdown).await.unwrap();
2328 });
2329
2330 let task = tokio::spawn(loop_.run());
2331 probe.await.expect("publisher panicked");
2332 let _count = tokio::time::timeout(StdDuration::from_millis(200), task)
2333 .await
2334 .expect("loop did not exit")
2335 .expect("join");
2336 }
2337
2338 #[test]
2339 fn loop_construction_returns_handle_and_actions_receiver() {
2340 // Compile + type-check: `new` returns the triple, the
2341 // handle is cloneable, the actions receiver is the
2342 // documented type.
2343 let MeshOsLoopParts {
2344 mesh_loop: _loop_,
2345 handle,
2346 actions_rx: _actions_rx,
2347 reader: _,
2348 } = MeshOsLoop::new(MeshOsConfig::default());
2349 let _clone = handle.clone();
2350 }
2351
2352 #[tokio::test]
2353 async fn try_publish_surfaces_queue_full_under_saturation() {
2354 // Capacity-1 channel, loop not yet running — second
2355 // try_publish should hit QueueFull rather than block.
2356 let cfg = MeshOsConfig {
2357 event_queue_capacity: 1,
2358 ..fast_test_config()
2359 };
2360 let MeshOsLoopParts {
2361 mesh_loop: loop_,
2362 handle,
2363 actions_rx: _,
2364 reader: _,
2365 } = MeshOsLoop::new(cfg);
2366 handle.try_publish(MeshOsEvent::Tick).unwrap();
2367 match handle.try_publish(MeshOsEvent::Tick) {
2368 Err(MeshOsHandleError::QueueFull) => {}
2369 other => panic!("expected QueueFull, got {other:?}"),
2370 }
2371 drop(handle);
2372 // Drain so the loop exits.
2373 let _ = loop_.run().await;
2374 }
2375
2376 #[tokio::test]
2377 async fn shutdown_event_short_circuits_pending_events_after_it() {
2378 // Per the loop contract: Shutdown breaks the loop the
2379 // moment it's dequeued. Events queued behind Shutdown
2380 // must NOT mutate state — the post-Shutdown
2381 // ReplicaUpdate below should leave the snapshot's
2382 // replica fold empty. Use a long tick interval so the
2383 // reconcile arm doesn't race the assertion.
2384 let cfg = MeshOsConfig {
2385 tick_interval: StdDuration::from_secs(60),
2386 ..fast_test_config()
2387 };
2388 let MeshOsLoopParts {
2389 mesh_loop: loop_,
2390 handle,
2391 actions_rx: _,
2392 reader,
2393 } = MeshOsLoop::new(cfg);
2394 handle.publish(MeshOsEvent::Shutdown).await.unwrap();
2395 // Post-Shutdown event: enqueued behind Shutdown. The
2396 // loop must NOT apply it before exiting.
2397 handle
2398 .publish(MeshOsEvent::ReplicaUpdate(ReplicaUpdate::Added {
2399 chain: 1,
2400 holder: 1,
2401 }))
2402 .await
2403 .unwrap();
2404 let count = tokio::time::timeout(StdDuration::from_secs(1), loop_.run())
2405 .await
2406 .expect("loop did not exit on Shutdown");
2407 // Reconcile never fired (Shutdown short-circuits the
2408 // loop before any tick).
2409 assert_eq!(count, 0, "Shutdown must break before reconcile fires");
2410 // The post-Shutdown ReplicaUpdate must NOT have applied —
2411 // the snapshot's replica fold stays empty.
2412 let snap = reader.read();
2413 assert!(
2414 snap.replicas.is_empty(),
2415 "post-Shutdown ReplicaUpdate must not enter the fold; saw {} entries",
2416 snap.replicas.len(),
2417 );
2418 }
2419
2420 #[tokio::test]
2421 async fn reconcile_emitted_at_anchored_on_last_tick_for_replay_determinism() {
2422 // Reconcile must stamp every PendingAction.emitted_at on
2423 // the same Instant the Tick fold wrote into
2424 // actual.last_tick — not on a fresh Instant::now(). The
2425 // snapshot derives `age_ms` from
2426 // `now.saturating_duration_since(emitted_at)` against the
2427 // same anchor, so an action emitted in the latest tick
2428 // renders with age_ms == 0.
2429 //
2430 // We use a long tick interval and shut down right after
2431 // the first tick fires so the read sees actions whose
2432 // emitted_at matches the last_tick the snapshot is built
2433 // against.
2434 use super::super::event::AdminEvent;
2435 let cfg = MeshOsConfig {
2436 tick_interval: StdDuration::from_millis(80),
2437 ..fast_test_config()
2438 };
2439 let MeshOsLoopParts {
2440 mesh_loop: loop_,
2441 handle,
2442 // Keep the receiver alive so the reconciler's try_send
2443 // succeeds and the action lands in recent_emissions —
2444 // pre-fix recent_emissions populated regardless of
2445 // try_send outcome (an actions_tx.Closed would still
2446 // mark the action as "emitted" in the snapshot).
2447 actions_rx: _actions_rx,
2448 reader,
2449 } = MeshOsLoop::new(cfg);
2450 let task = tokio::spawn(loop_.run());
2451 // Drive an EnterMaintenance — reconcile emits a
2452 // CommitMaintenanceTransition that lands in the
2453 // snapshot's `pending` mirror.
2454 handle
2455 .publish(MeshOsEvent::AdminEvent(AdminEvent::EnterMaintenance {
2456 node: 1,
2457 drain_for: None,
2458 }))
2459 .await
2460 .unwrap();
2461 // Wait long enough for ONE tick to fire (80 ms), then
2462 // shutdown before a second tick.
2463 tokio::time::sleep(StdDuration::from_millis(100)).await;
2464 handle.publish(MeshOsEvent::Shutdown).await.unwrap();
2465 let _ = task.await;
2466 let snap = reader.read();
2467 assert!(
2468 !snap.recently_emitted.is_empty(),
2469 "expected at least one recently-emitted action; saw none",
2470 );
2471 // The latest tick anchors the snapshot's now. Actions
2472 // emitted in that tick render with age_ms == 0 only when
2473 // reconcile uses last_tick for emitted_at.
2474 let zero_age_count = snap
2475 .recently_emitted
2476 .iter()
2477 .filter(|p| p.age_ms == 0)
2478 .count();
2479 assert!(
2480 zero_age_count >= 1,
2481 "expected at least one action emitted in the snapshot's tick to render \
2482 age_ms == 0 (anchored on last_tick); recently_emitted = {:?}",
2483 snap.recently_emitted,
2484 );
2485 }
2486
2487 #[tokio::test]
2488 async fn log_chain_appender_receives_every_published_log_line() {
2489 use super::super::log_chain::BufferingLogChainAppender;
2490 use super::super::logs::{LogLevel, LogLine};
2491 let appender = Arc::new(BufferingLogChainAppender::default());
2492 let MeshOsLoopParts {
2493 mesh_loop: loop_,
2494 handle,
2495 actions_rx: _,
2496 reader,
2497 } = MeshOsLoop::new(fast_test_config());
2498 let loop_ = loop_.with_log_appender(
2499 appender.clone() as Arc<dyn super::super::log_chain::LogChainAppender>
2500 );
2501 let task = tokio::spawn(loop_.run());
2502
2503 for (i, level) in [LogLevel::Info, LogLevel::Warn, LogLevel::Error]
2504 .into_iter()
2505 .enumerate()
2506 {
2507 handle
2508 .publish(MeshOsEvent::LogLine(LogLine {
2509 level,
2510 daemon_id: Some(7),
2511 message: format!("msg {i}"),
2512 }))
2513 .await
2514 .unwrap();
2515 }
2516 tokio::time::sleep(StdDuration::from_millis(60)).await;
2517
2518 let captured = appender.captured();
2519 assert_eq!(captured.len(), 3, "appender should see three records");
2520 let snap = reader.read();
2521 assert_eq!(snap.log_ring.len(), 3, "ring should hold three records");
2522 // Appender + ring see the SAME records (seq + content match
2523 // for each).
2524 for (i, captured_record) in captured.iter().enumerate().take(3) {
2525 assert_eq!(snap.log_ring[i].seq, captured_record.seq);
2526 assert_eq!(snap.log_ring[i].message, captured_record.message);
2527 }
2528
2529 drop(handle);
2530 let _ = tokio::time::timeout(StdDuration::from_secs(1), task).await;
2531 }
2532
2533 #[tokio::test]
2534 async fn admin_audit_chain_appender_receives_every_recorded_commit() {
2535 // Wire a buffering appender into the loop; drive an
2536 // admin event through; the appender + the snapshot
2537 // ring should each carry one record with the same seq.
2538 use super::super::audit_chain::BufferingAdminAuditChainAppender;
2539 use super::super::event::AdminEvent;
2540 let appender = Arc::new(BufferingAdminAuditChainAppender::default());
2541 let MeshOsLoopParts {
2542 mesh_loop: loop_,
2543 handle,
2544 actions_rx: _,
2545 reader,
2546 } = MeshOsLoop::new(fast_test_config());
2547 let loop_ = loop_.with_admin_audit_appender(
2548 appender.clone() as Arc<dyn super::super::audit_chain::AdminAuditChainAppender>
2549 );
2550 let task = tokio::spawn(loop_.run());
2551
2552 // Publish an unsigned admin event — fold records it on
2553 // the ring with Unverified outcome AND fires the
2554 // appender.
2555 handle
2556 .publish(MeshOsEvent::AdminEvent(AdminEvent::Cordon { node: 42 }))
2557 .await
2558 .unwrap();
2559 tokio::time::sleep(StdDuration::from_millis(60)).await;
2560
2561 let captured = appender.captured();
2562 assert_eq!(captured.len(), 1, "appender should see one record");
2563 assert_eq!(captured[0].event, AdminEvent::Cordon { node: 42 });
2564
2565 let snap = reader.read();
2566 assert_eq!(snap.admin_audit.len(), 1, "ring should hold one record");
2567 // The appender + ring see the SAME record (seq + content match).
2568 assert_eq!(snap.admin_audit[0].seq, captured[0].seq);
2569 // Successful append → ring entry has chain_pending == false.
2570 assert!(
2571 !snap.admin_audit[0].chain_pending,
2572 "ring entry must NOT be marked chain_pending when the appender succeeded"
2573 );
2574
2575 // Tidy.
2576 drop(handle);
2577 let _ = tokio::time::timeout(StdDuration::from_secs(1), task).await;
2578 }
2579
2580 /// Pin that ring-first + mark-chain_pending kicks in when the
2581 /// audit chain appender returns an error: ring still records
2582 /// the entry but flags it so chain consumers can distinguish
2583 /// "permanent gap" from "haven't replicated yet."
2584 #[tokio::test]
2585 async fn audit_ring_flags_chain_pending_on_appender_failure() {
2586 use super::super::audit_chain::{AdminAuditAppendError, AdminAuditChainAppender};
2587 use super::super::event::AdminEvent;
2588 use super::super::ice::AdminAuditRecord;
2589
2590 struct FailingAuditAppender;
2591 impl AdminAuditChainAppender for FailingAuditAppender {
2592 fn append(&self, _record: &AdminAuditRecord) -> Result<(), AdminAuditAppendError> {
2593 Err(AdminAuditAppendError {
2594 reason: "test-injected appender failure".into(),
2595 })
2596 }
2597 }
2598
2599 let MeshOsLoopParts {
2600 mesh_loop: loop_,
2601 handle,
2602 actions_rx: _,
2603 reader,
2604 } = MeshOsLoop::new(fast_test_config());
2605 let loop_ = loop_.with_admin_audit_appender(Arc::new(FailingAuditAppender));
2606 let task = tokio::spawn(loop_.run());
2607
2608 handle
2609 .publish(MeshOsEvent::AdminEvent(AdminEvent::Cordon { node: 99 }))
2610 .await
2611 .unwrap();
2612 tokio::time::sleep(StdDuration::from_millis(60)).await;
2613
2614 let snap = reader.read();
2615 assert_eq!(
2616 snap.admin_audit.len(),
2617 1,
2618 "ring must still record the entry"
2619 );
2620 assert!(
2621 snap.admin_audit[0].chain_pending,
2622 "ring entry must be flagged chain_pending after the appender returned Err"
2623 );
2624
2625 drop(handle);
2626 let _ = tokio::time::timeout(StdDuration::from_secs(1), task).await;
2627 }
2628
2629 #[tokio::test]
2630 async fn kill_migration_dispatches_to_installed_aborter() {
2631 use super::super::event::AdminEvent;
2632 use super::super::migration_aborter::{BufferingMigrationAborter, MigrationAborter};
2633 let aborter = Arc::new(BufferingMigrationAborter::default());
2634 let MeshOsLoopParts {
2635 mesh_loop: loop_,
2636 handle,
2637 actions_rx: _,
2638 reader: _,
2639 } = MeshOsLoop::new(fast_test_config());
2640 let loop_ = loop_.with_migration_aborter(aborter.clone() as Arc<dyn MigrationAborter>);
2641 let task = tokio::spawn(loop_.run());
2642
2643 handle
2644 .publish(MeshOsEvent::AdminEvent(AdminEvent::KillMigration {
2645 migration: 0xCAFE,
2646 }))
2647 .await
2648 .unwrap();
2649 tokio::time::sleep(StdDuration::from_millis(60)).await;
2650
2651 assert_eq!(aborter.captured(), vec![0xCAFE]);
2652
2653 drop(handle);
2654 let _ = tokio::time::timeout(StdDuration::from_secs(1), task).await;
2655 }
2656
2657 #[tokio::test]
2658 async fn non_kill_admin_events_do_not_invoke_aborter() {
2659 use super::super::event::AdminEvent;
2660 use super::super::migration_aborter::{BufferingMigrationAborter, MigrationAborter};
2661 let aborter = Arc::new(BufferingMigrationAborter::default());
2662 let MeshOsLoopParts {
2663 mesh_loop: loop_,
2664 handle,
2665 actions_rx: _,
2666 reader: _,
2667 } = MeshOsLoop::new(fast_test_config());
2668 let loop_ = loop_.with_migration_aborter(aborter.clone() as Arc<dyn MigrationAborter>);
2669 let task = tokio::spawn(loop_.run());
2670
2671 handle
2672 .publish(MeshOsEvent::AdminEvent(AdminEvent::Cordon { node: 42 }))
2673 .await
2674 .unwrap();
2675 tokio::time::sleep(StdDuration::from_millis(60)).await;
2676
2677 assert!(aborter.captured().is_empty());
2678
2679 drop(handle);
2680 let _ = tokio::time::timeout(StdDuration::from_secs(1), task).await;
2681 }
2682
/// Pins that `clear_inventory_probes` detaches every registered
/// inventory probe while leaving the locality and health lists alone.
/// Callers that swap probe sources mid-flight rely on this to detach the
/// old set before installing replacements. Otherwise a stale probe left
/// in the append-only registry could stomp the new one's samples via
/// last-writer-wins per peer.
#[test]
fn clear_inventory_probes_drops_installed_probes_only() {
    use super::super::event::NodeHealth;
    use super::super::probes::{HealthProbe, InventoryProbe, LocalityProbe, PeerInventory};

    // No-op probes: the test only counts registrations, so every sampler
    // reports nothing.
    struct NullInventory;
    struct NullLocality;
    struct NullHealth;

    impl InventoryProbe for NullInventory {
        fn inventory_samples(&self) -> Vec<(u64, PeerInventory)> {
            Vec::new()
        }
    }
    impl LocalityProbe for NullLocality {
        fn rtt_samples(&self) -> Vec<(u64, StdDuration)> {
            Vec::new()
        }
    }
    impl HealthProbe for NullHealth {
        fn health_samples(&self) -> Vec<(u64, NodeHealth)> {
            Vec::new()
        }
    }

    // `probe_counts()` is read as (locality, health, inventory).
    let registry = ProbeRegistry::new();
    registry.add_locality_probe(Arc::new(NullLocality));
    registry.add_health_probe(Arc::new(NullHealth));
    for _ in 0..2 {
        registry.add_inventory_probe(Arc::new(NullInventory));
    }
    assert_eq!(registry.probe_counts(), (1, 1, 2));

    registry.clear_inventory_probes();
    assert_eq!(
        registry.probe_counts(),
        (1, 1, 0),
        "inventory cleared; locality + health untouched"
    );

    // The sibling clear paths must be equally surgical: each empties
    // only its own list.
    registry.clear_locality_probes();
    assert_eq!(registry.probe_counts(), (0, 1, 0));
    registry.clear_health_probes();
    assert_eq!(registry.probe_counts(), (0, 0, 0));

    // Re-installing after a clear still registers and is counted on its
    // own, so clearing left the list usable.
    registry.add_inventory_probe(Arc::new(NullInventory));
    assert_eq!(registry.probe_counts(), (0, 0, 1));
}
2737}