Skip to main content

meerkat_runtime/
ops_lifecycle.rs

1//! In-memory runtime implementation of the shared async-operation lifecycle seam.
2//!
3//! Per-operation canonical lifecycle state lives in the MeerkatMachine DSL
4//! authority (`op_statuses`, `op_terminal_outcomes`, `op_kinds`,
5//! `op_sources`, `op_peer_ready`, `op_progress_counts`, `active_op_count`,
6//! `wait_active`, `wait_operation_ids`). This shell layer owns pure mechanics: watcher
7//! channels, timestamps, peer handles, snapshot assembly, FIFO eviction
8//! bookkeeping, the completion feed buffer, and typed delivery of generated
9//! admission/rejection feedback.
10//!
11//! Per-transition legality ("is `CompleteOp` legal on a `Provisioning` op?")
12//! is NOT owned by the shell — it lives in the DSL's `from_status_valid`
13//! guards on each op-lifecycle transition. The shell's only job on a
14//! `GuardRejected` rejection is to ask the generated rejection resolver for
15//! the public result class.
16
17use std::collections::{BTreeMap, BTreeSet, HashMap, HashSet, VecDeque};
18use std::future::Future;
19use std::sync::atomic::{AtomicU64, Ordering};
20use std::sync::{Arc, RwLock, RwLockReadGuard, RwLockWriteGuard};
21use std::task::{Context, Poll};
22
23use meerkat_core::completion_feed::{
24    CompletionBatch, CompletionEntry, CompletionFeed, CompletionSeq,
25};
26
27#[cfg(target_arch = "wasm32")]
28use crate::tokio;
29use meerkat_core::lifecycle::{RunId, WaitRequestId};
30use meerkat_core::ops_lifecycle::{
31    CompletionCursorConsumer, DEFAULT_MAX_COMPLETED, OperationCompletionWakeClass,
32    OperationCompletionWatch, OperationId, OperationKind, OperationLifecycleAction,
33    OperationLifecycleSnapshot, OperationPeerHandle, OperationProgressUpdate,
34    OperationPublicResultClass, OperationResult, OperationSource, OperationSpec, OperationStatus,
35    OperationTerminalOutcome, OpsLifecycleError, OpsLifecycleRegistry, WaitAllResult,
36    WaitAllSatisfied,
37};
38use meerkat_core::time_compat::{Instant, SystemTime, UNIX_EPOCH};
39use meerkat_core::types::SessionId;
40
41use crate::meerkat_machine::dsl as mm_dsl;
42
43// ---------------------------------------------------------------------------
44// Serde-only persisted canonical state shells
45// ---------------------------------------------------------------------------
46//
47// These structures preserve the persisted fact shape of `PersistedOpsSnapshot`.
48// They are pure serde shells — no methods beyond read-only field accessors,
49// no authority behavior. Optional generated facts that can be explicitly
50// absent still serialize as present `null` so recovery can distinguish
51// generated "none" from a missing persisted fact.
52
53fn deserialize_required_operation_source<'de, D>(
54    deserializer: D,
55) -> Result<Option<OperationSource>, D::Error>
56where
57    D: serde::Deserializer<'de>,
58{
59    <Option<OperationSource> as serde::Deserialize>::deserialize(deserializer)
60}
61
/// Canonical per-operation state as captured in persisted snapshots.
///
/// Pure serde shell: fields are private and mirror the generated authority's
/// per-operation facts at capture time. Field order and names are part of the
/// persisted wire format.
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub struct OperationCanonicalState {
    status: OperationStatus,
    kind: OperationKind,
    // `deserialize_with` makes this key mandatory when reading a snapshot:
    // an explicit `null` deserializes to `None`, a missing key is an error.
    #[serde(deserialize_with = "deserialize_required_operation_source")]
    operation_source: Option<OperationSource>,
    peer_ready: bool,
    progress_count: u32,
    watcher_count: u32,
    // No serde attributes: `None` serializes as `null`.
    terminal_outcome: Option<OperationTerminalOutcome>,
    // Unlike the other optional facts, this one is omitted when `None` and
    // defaults to `None` when absent. NOTE(review): presumably so snapshots
    // written before this field existed still load — confirm.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    completion_sequence: Option<CompletionSeq>,
    terminal_buffered: bool,
}
77
/// Generated-owned public completion feed fact captured from DSL authority.
///
/// One entry per operation that has a public completion feed record.
#[derive(Debug, Clone, PartialEq, serde::Serialize, serde::Deserialize)]
pub struct CompletionFeedCanonicalState {
    /// Completion sequence number assigned to this feed entry.
    seq: CompletionSeq,
    /// Kind of the completed operation.
    kind: OperationKind,
    /// Terminal outcome recorded for the operation.
    terminal_outcome: OperationTerminalOutcome,
}
85
/// Canonical registry-level state as captured in persisted snapshots.
///
/// Pure serde shell; read access is limited to the accessor methods.
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub struct RegistryCanonicalState {
    /// Per-operation canonical facts, keyed by operation id.
    operations: HashMap<OperationId, OperationCanonicalState>,
    /// Generated-owned public completion feed entries, keyed by operation id.
    completion_feed_entries: HashMap<OperationId, CompletionFeedCanonicalState>,
    /// FIFO order of completed operation ids used for bounded eviction.
    completed_order: VecDeque<OperationId>,
    /// Maximum completed operations retained.
    max_completed: usize,
    /// Maximum concurrent non-terminal operations (`None` = unlimited).
    max_concurrent: Option<usize>,
    /// Number of non-terminal operations at capture time.
    active_count: usize,
    /// Correlation id of the pending `wait_all`, if any.
    wait_request_id: Option<WaitRequestId>,
    /// Operation ids covered by the wait-all barrier.
    wait_operation_ids: Vec<OperationId>,
    /// Next completion sequence number to be assigned.
    /// NOTE(review): inferred from the name — confirm against the capture code.
    next_completion_seq: CompletionSeq,
}
99
100impl RegistryCanonicalState {
101    /// Maximum completed operations retained at capture time.
102    pub fn max_completed(&self) -> usize {
103        self.max_completed
104    }
105
106    /// Maximum concurrent non-terminal operations at capture time.
107    pub fn max_concurrent(&self) -> Option<usize> {
108        self.max_concurrent
109    }
110
111    /// Number of operations captured in the snapshot.
112    pub fn operation_count(&self) -> usize {
113        self.operations.len()
114    }
115
116    /// Number of generated-owned public completion feed entries captured.
117    pub fn completion_feed_count(&self) -> usize {
118        self.completion_feed_entries.len()
119    }
120}
121
122// ---------------------------------------------------------------------------
123// Serializable snapshot for persistence
124// ---------------------------------------------------------------------------
125
/// Serializable snapshot of the ops lifecycle registry state.
///
/// Captured on terminal transitions for durable persistence. Contains
/// canonical state, operation specs, persisted completion feed entries, and
/// consumer cursor values. Wire format preserved verbatim from legacy
/// runtime versions for backward compatibility — renaming, removing, or
/// retyping fields would break loading of existing snapshots.
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub struct PersistedOpsSnapshot {
    /// Epoch identity at capture time.
    pub epoch_id: meerkat_core::RuntimeEpochId,
    /// Canonical machine-owned state at capture time.
    pub authority_state: RegistryCanonicalState,
    /// Per-operation specs for shell record reconstruction.
    pub operation_specs: HashMap<OperationId, meerkat_core::ops_lifecycle::OperationSpec>,
    /// Persisted completion feed projection metadata. Canonical feed truth is
    /// captured in `authority_state.completion_feed_entries`.
    pub completion_entries: Vec<CompletionEntry>,
    /// Consumer cursor snapshot at capture time.
    pub cursors: meerkat_core::EpochCursorSnapshot,
}
146
/// A request to durably persist a [`PersistedOpsSnapshot`].
///
/// Pairs the snapshot with the sending half of a bounded synchronous channel
/// (`SyncSender`); the persistence side reports success or failure through
/// [`OpsLifecyclePersistenceRequest::complete`].
#[derive(Debug)]
pub struct OpsLifecyclePersistenceRequest {
    snapshot: PersistedOpsSnapshot,
    // Reply channel back to whoever issued the request.
    result_tx: std::sync::mpsc::SyncSender<Result<(), OpsLifecycleError>>,
}
152
153impl OpsLifecyclePersistenceRequest {
154    pub fn snapshot(&self) -> &PersistedOpsSnapshot {
155        &self.snapshot
156    }
157
158    pub fn complete(self, result: Result<(), OpsLifecycleError>) {
159        let _ = self.result_tx.send(result);
160    }
161}
162
163// ---------------------------------------------------------------------------
164// Concrete completion feed buffer
165// ---------------------------------------------------------------------------
166
/// Shared inner state of the completion feed buffer.
///
/// Protected by the registry's `RwLock<ShellState>` for writes, and by its
/// own `RwLock` for reads by external consumers (agent boundary, idle wake).
#[derive(Debug)]
struct FeedBufferInner {
    /// Retained entries in append order (oldest at the front).
    entries: VecDeque<CompletionEntry>,
    /// Sequence of the most recently pushed entry; 0 before any push.
    watermark: CompletionSeq,
    /// Maximum number of entries kept; older entries are evicted on push.
    max_retained: usize,
}
177
/// Shared completion feed buffer owned by the runtime registry.
///
/// The registry writes entries under its own write lock. External consumers
/// read through the [`RuntimeCompletionFeed`] handle.
#[derive(Debug)]
struct FeedBuffer {
    /// Retained entries, watermark, and capacity.
    inner: RwLock<FeedBufferInner>,
    /// Atomic mirror of watermark for lock-free `watermark()` reads.
    /// Updated after the inner lock is released on each push.
    watermark_atomic: AtomicU64,
    /// Notifies all waiters when new entries are appended.
    notify: tokio::sync::Notify,
}
190
191impl FeedBuffer {
192    fn new(max_retained: usize) -> Self {
193        Self {
194            inner: RwLock::new(FeedBufferInner {
195                entries: VecDeque::new(),
196                watermark: 0,
197                max_retained,
198            }),
199            watermark_atomic: AtomicU64::new(0),
200            notify: tokio::sync::Notify::new(),
201        }
202    }
203
204    fn push(&self, entry: CompletionEntry) {
205        let mut inner = self
206            .inner
207            .write()
208            .unwrap_or_else(std::sync::PoisonError::into_inner);
209        let seq = entry.seq;
210        inner.entries.push_back(entry);
211        inner.watermark = seq;
212
213        // Evict oldest if over capacity.
214        while inner.entries.len() > inner.max_retained {
215            inner.entries.pop_front();
216        }
217
218        drop(inner);
219
220        self.watermark_atomic.store(seq, Ordering::Release);
221        self.notify.notify_waiters();
222    }
223}
224
/// Read-only handle to the runtime completion feed.
///
/// Implements [`CompletionFeed`] for external consumers. Obtained via
/// [`RuntimeOpsLifecycleRegistry::completion_feed()`].
///
/// Cloning is cheap: every clone shares the same buffer through an `Arc`.
#[derive(Debug, Clone)]
pub struct RuntimeCompletionFeed {
    /// Buffer shared with the registry, which appends entries to it.
    buffer: Arc<FeedBuffer>,
}
233
234impl CompletionFeed for RuntimeCompletionFeed {
235    fn watermark(&self) -> CompletionSeq {
236        self.buffer.watermark_atomic.load(Ordering::Acquire)
237    }
238
239    fn list_since(&self, after_seq: CompletionSeq) -> CompletionBatch {
240        let inner = self
241            .buffer
242            .inner
243            .read()
244            .unwrap_or_else(std::sync::PoisonError::into_inner);
245        let entries: Vec<CompletionEntry> = inner
246            .entries
247            .iter()
248            .filter(|e| e.seq > after_seq)
249            .cloned()
250            .collect();
251        let watermark = inner.watermark;
252        CompletionBatch { entries, watermark }
253    }
254
255    fn wait_for_advance(
256        &self,
257        after_seq: CompletionSeq,
258    ) -> std::pin::Pin<Box<dyn Future<Output = CompletionSeq> + Send + '_>> {
259        Box::pin(async move {
260            loop {
261                // Register the waiter BEFORE reading the watermark.
262                // notify_waiters() in push() only wakes already-registered
263                // listeners — if we read first and push() lands between the
264                // read and notified().await, the wake is lost.
265                let notified = self.buffer.notify.notified();
266                let current = self.buffer.watermark_atomic.load(Ordering::Acquire);
267                if current > after_seq {
268                    return current;
269                }
270                notified.await;
271            }
272        })
273    }
274}
275
276// ---------------------------------------------------------------------------
277// Shell-only per-operation record (not part of canonical machine state)
278// ---------------------------------------------------------------------------
279
/// Sending half of a single watcher's completion channel.
///
/// Backed by a one-shot channel, so it delivers at most one terminal outcome.
#[derive(Debug)]
struct OperationCompletionNotifier {
    tx: tokio::sync::oneshot::Sender<OperationTerminalOutcome>,
}
284
285impl OperationCompletionNotifier {
286    fn new(tx: tokio::sync::oneshot::Sender<OperationTerminalOutcome>) -> Self {
287        Self { tx }
288    }
289
290    fn notify_after_generated_terminal(self, outcome: &OperationTerminalOutcome) {
291        let _ = self.tx.send(outcome.clone());
292    }
293}
294
295fn operation_completion_watch_from_receiver(
296    rx: tokio::sync::oneshot::Receiver<OperationTerminalOutcome>,
297) -> OperationCompletionWatch {
298    Box::pin(async move {
299        rx.await
300            .map_err(|_| meerkat_core::ops_lifecycle::OperationCompletionWatchError::ChannelClosed)
301    })
302}
303
304fn resolved_operation_completion_watch(
305    outcome: OperationTerminalOutcome,
306) -> OperationCompletionWatch {
307    Box::pin(async move { Ok(outcome) })
308}
309
/// Shell-owned data for a single operation. Canonical lifecycle state lives in
/// the DSL authority; this struct holds I/O concerns that the DSL has no
/// knowledge of.
#[derive(Debug)]
struct ShellRecord {
    /// Spec the record was created with.
    spec: OperationSpec,
    /// Attached peer handle; `None` when the record is created.
    peer_handle: Option<OperationPeerHandle>,
    /// Private waiter plumbing. Notifiers are drained only from
    /// `finalize_terminal()` after the generated authority has accepted and
    /// stored a terminal outcome.
    watchers: Vec<OperationCompletionNotifier>,
    // Monotonic timestamps for elapsed computation
    created_at: Instant,
    started_at: Option<Instant>,
    completed_at: Option<Instant>,
    // Wall-clock anchor captured at creation for epoch millis; monotonic
    // instants are projected onto wall time relative to `created_at`.
    created_at_wall: SystemTime,
}
328
/// Transport state for the pending `wait_all` future (oneshot channel).
#[derive(Debug)]
struct PendingWaitState {
    /// Correlation id the pending wait is tagged with.
    wait_request_id: WaitRequestId,
    /// One-shot sender that resolves the waiter once the barrier is satisfied.
    sender: tokio::sync::oneshot::Sender<WaitAllSatisfied>,
}
334
/// How a `wait_all` request should proceed after consulting the authority.
///
/// NOTE(review): the consumer is outside this view; variant meanings below are
/// inferred from their names — confirm.
enum WaitAllAuthorityPlan {
    /// The barrier is already satisfied; resolve with this result immediately.
    AlreadySatisfied(WaitAllSatisfied),
    /// The barrier must be activated and resolved later.
    ActivateBarrier,
}
339
/// Whether an operation record seen during recovery is kept or dropped.
///
/// NOTE(review): the consumer is outside this view; semantics inferred from
/// the names — confirm.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum RecoveredOperationRecordDisposition {
    Retain,
    Discard,
}
345
346impl ShellRecord {
347    fn new(spec: OperationSpec) -> Self {
348        Self {
349            spec,
350            peer_handle: None,
351            watchers: Vec::new(),
352            created_at: Instant::now(),
353            started_at: None,
354            completed_at: None,
355            created_at_wall: SystemTime::now(),
356        }
357    }
358
359    fn epoch_millis(wall_anchor: &SystemTime) -> u64 {
360        wall_anchor
361            .duration_since(UNIX_EPOCH)
362            .map(|d| d.as_millis() as u64)
363            .unwrap_or(0)
364    }
365
366    fn epoch_millis_for_instant(&self, instant: Instant) -> u64 {
367        // Compute wall time for a given instant using the wall-clock anchor:
368        // wall_time = created_at_wall + (instant - created_at)
369        let offset = instant.saturating_duration_since(self.created_at);
370        let wall = self.created_at_wall + offset;
371        Self::epoch_millis(&wall)
372    }
373
374    /// Notify all watchers with the given terminal outcome and drain the list.
375    fn notify_watchers(&mut self, outcome: &OperationTerminalOutcome) {
376        for watcher in std::mem::take(&mut self.watchers) {
377            watcher.notify_after_generated_terminal(outcome);
378        }
379    }
380
381    /// Mark the completion timestamp.
382    fn mark_completed(&mut self) {
383        self.completed_at = Some(Instant::now());
384    }
385}
386
387// ---------------------------------------------------------------------------
388// Combined shell state: DSL authority + shell records
389// ---------------------------------------------------------------------------
390
/// Combined registry state: the DSL authority plus shell-owned bookkeeping.
///
/// Canonical per-operation lifecycle facts live only in `dsl`; every other
/// field is shell mechanics (records, transport, eviction order, feed buffer,
/// persistence wiring).
#[derive(Debug)]
struct ShellState {
    /// DSL authority — sole source of truth for per-op canonical state.
    dsl: DslAuthority,
    /// Shell-owned per-operation records (specs, watchers, timestamps, peer handles).
    records: HashMap<OperationId, ShellRecord>,
    /// Pending wait-all coordination (oneshot channel).
    pending_wait: Option<PendingWaitState>,
    /// FIFO ordering of completed operation IDs for bounded eviction.
    completed_order: VecDeque<OperationId>,
    /// Maximum completed operations to retain.
    max_completed: usize,
    /// Maximum concurrent non-terminal operations (None = unlimited).
    max_concurrent: Option<usize>,
    /// Oneshot correlation id for the currently-pending `wait_all` future.
    ///
    /// Barrier membership (`wait_operation_ids`) and activation (`wait_active`)
    /// are DSL-owned. This field is pure transport mechanics — the identity
    /// the oneshot sender is tagged with so `Drop` can correlate cancellation.
    wait_request_id: Option<WaitRequestId>,
    /// Shared feed buffer for completion events.
    feed_buffer: Arc<FeedBuffer>,
    /// Persistence channel for durable snapshot writes (set via `set_persistence_channel`).
    persist_tx: Option<crate::tokio::sync::mpsc::UnboundedSender<OpsLifecyclePersistenceRequest>>,
    /// Epoch ID for persistence snapshots.
    persist_epoch_id: Option<meerkat_core::RuntimeEpochId>,
    /// Shared cursor state for persistence snapshots.
    persist_cursor_state: Option<Arc<meerkat_core::EpochCursorState>>,
}
420
/// Wrapper around the DSL authority that provides `Debug` output.
///
/// The generated `MeerkatMachineAuthority` does not derive `Debug`, but
/// `ShellState` requires it. This wrapper delegates to the inner state's
/// `Debug` impl. The authority is boxed (NOTE(review): presumably to keep
/// `ShellState` small — confirm).
struct DslAuthority(Box<mm_dsl::MeerkatMachineAuthority>);
427
428impl std::fmt::Debug for DslAuthority {
429    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
430        f.debug_struct("DslAuthority")
431            .field("state", self.0.state())
432            .finish()
433    }
434}
435
436/// Create a DSL authority initialized through generated authority. Per-op
437/// transitions guard only on `op_statuses.contains_key(operation_id)`, so the
438/// phase stays in `Idle` permanently (they all `to Idle`).
439fn new_ops_dsl_authority() -> DslAuthority {
440    DslAuthority(Box::new(
441        crate::meerkat_machine::dsl_authority::new_initialized_authority(
442            "ops lifecycle DSL Initialize must be accepted",
443        ),
444    ))
445}
446
447impl ShellState {
448    fn new(max_completed: usize, max_concurrent: Option<usize>) -> Self {
449        tracing::info!("RuntimeOpsLifecycleRegistry::ShellState creating dsl");
450        let dsl = new_ops_dsl_authority();
451        tracing::info!("RuntimeOpsLifecycleRegistry::ShellState created dsl");
452        let feed_capacity = max_completed.saturating_mul(4).max(1024);
453        tracing::info!(
454            feed_capacity,
455            "RuntimeOpsLifecycleRegistry::ShellState creating feed buffer"
456        );
457        let feed_buffer = Arc::new(FeedBuffer::new(feed_capacity));
458        tracing::info!("RuntimeOpsLifecycleRegistry::ShellState created feed buffer");
459        Self {
460            dsl,
461            records: HashMap::new(),
462            pending_wait: None,
463            completed_order: VecDeque::new(),
464            max_completed,
465            max_concurrent,
466            wait_request_id: None,
467            // Feed buffer is larger than max_completed to absorb bursts.
468            // Entries are only evicted by buffer capacity, not by consumer cursor,
469            // so the buffer must be large enough that consumers drain before
470            // the oldest entry is evicted.
471            feed_buffer,
472            persist_tx: None,
473            persist_epoch_id: None,
474            persist_cursor_state: None,
475        }
476    }
477
478    /// Apply a DSL input, mapping transition errors into
479    /// [`OpsLifecycleError::Internal`]. Callers that need to distinguish
480    /// guard rejections (legal-transition violations) from internal desync
481    /// should use [`Self::dsl_apply_raw`] and classify the error themselves;
482    /// this helper is for DSL inputs whose preconditions the caller has
483    /// already fully validated (e.g., `RequestWaitAll`, `SatisfyWaitAll`).
484    fn dsl_apply(
485        &mut self,
486        input: mm_dsl::MeerkatMachineInput,
487        context: &str,
488    ) -> Result<(), OpsLifecycleError> {
489        self.dsl_apply_raw(input).map_err(|err| {
490            OpsLifecycleError::Internal(format!("DSL rejected ops transition ({context}): {err:?}"))
491        })
492    }
493
494    /// Apply a DSL input, returning the raw kernel-level rejection so callers
495    /// can distinguish `GuardRejected` (a legitimate legality violation, e.g.,
496    /// `complete_operation` on a `Provisioning` op) from
497    /// `NoMatchingTransition` (a shell/DSL desync). Op-lifecycle entry points
498    /// feed guard rejections into generated rejection feedback before surfacing
499    /// public result classes.
500    fn dsl_apply_raw(
501        &mut self,
502        input: mm_dsl::MeerkatMachineInput,
503    ) -> Result<(), mm_dsl::MeerkatMachineTransitionError> {
504        mm_dsl::MeerkatMachineMutator::apply(&mut *self.dsl.0, input).map(|_transition| ())
505    }
506
507    fn dsl_apply_with_effects(
508        &mut self,
509        input: mm_dsl::MeerkatMachineInput,
510        context: &str,
511    ) -> Result<Vec<mm_dsl::MeerkatMachineEffect>, OpsLifecycleError> {
512        let transition =
513            mm_dsl::MeerkatMachineMutator::apply(&mut *self.dsl.0, input).map_err(|err| {
514                OpsLifecycleError::Internal(format!(
515                    "DSL rejected ops transition ({context}): {err:?}"
516                ))
517            })?;
518        Ok(transition.into_effects())
519    }
520
521    /// Fail-closed read of a typed terminal payload entry: the payload IS the
522    /// domain [`OperationTerminalOutcome`] (K8b fold — no JSON codec), but the
523    /// shell still refuses to surface a payload whose variant disagrees with
524    /// the recorded discriminant.
525    fn checked_terminal_payload(
526        kind: mm_dsl::OperationTerminalOutcomeKind,
527        payload: &OperationTerminalOutcome,
528        authority: &str,
529        operation_id: &str,
530    ) -> Result<OperationTerminalOutcome, OpsLifecycleError> {
531        if mm_dsl::OperationTerminalOutcomeKind::from(payload) != kind {
532            return Err(OpsLifecycleError::Internal(format!(
533                "{authority} payload variant for {operation_id} does not match terminal outcome discriminant"
534            )));
535        }
536        Ok(payload.clone())
537    }
538
539    /// Read the DSL operation status for `id`, or `None` if not registered.
540    fn status(&self, id: &OperationId) -> Option<OperationStatus> {
541        let id_key = mm_dsl::OperationId::from_domain(id).0;
542        self.dsl
543            .0
544            .state()
545            .op_statuses
546            .get(&id_key)
547            .copied()
548            .map(OperationStatus::from)
549    }
550
551    fn require_status(&self, id: &OperationId) -> Result<OperationStatus, OpsLifecycleError> {
552        self.status(id).ok_or_else(|| {
553            OpsLifecycleError::Internal(format!(
554                "generated op lifecycle authority missing status for {id}"
555            ))
556        })
557    }
558
559    /// Read the DSL operation kind for `id`, or `None` if not registered.
560    fn kind(&self, id: &OperationId) -> Option<OperationKind> {
561        let id_key = mm_dsl::OperationId::from_domain(id).0;
562        self.dsl
563            .0
564            .state()
565            .op_kinds
566            .get(&id_key)
567            .copied()
568            .map(OperationKind::from)
569    }
570
571    fn require_kind(&self, id: &OperationId) -> Result<OperationKind, OpsLifecycleError> {
572        self.kind(id).ok_or_else(|| {
573            OpsLifecycleError::Internal(format!(
574                "generated op lifecycle authority missing kind for {id}"
575            ))
576        })
577    }
578
579    fn operation_source(
580        &self,
581        id: &OperationId,
582    ) -> Result<Option<OperationSource>, OpsLifecycleError> {
583        let id_key = mm_dsl::OperationId::from_domain(id).0;
584        self.dsl
585            .0
586            .state()
587            .op_sources
588            .get(&id_key)
589            .map(|source| {
590                source.to_domain().map_err(|error| {
591                    OpsLifecycleError::Internal(format!(
592                        "generated operation source authority has invalid source for {id}: {error}"
593                    ))
594                })
595            })
596            .transpose()
597    }
598
599    fn child_session_id_from_operation_source(
600        operation_source: Option<&OperationSource>,
601    ) -> Option<SessionId> {
602        match operation_source {
603            Some(OperationSource::SessionChild { session_id }) => Some(session_id.clone()),
604            Some(OperationSource::BackendPeer { .. }) | None => None,
605        }
606    }
607
608    fn align_spec_child_session_id_to_source(
609        spec: &mut OperationSpec,
610        operation_source: Option<&OperationSource>,
611    ) {
612        spec.child_session_id = Self::child_session_id_from_operation_source(operation_source);
613    }
614
615    /// Read the peer-ready flag for `id`.
616    fn peer_ready(&self, id: &OperationId) -> Option<bool> {
617        let id_key = mm_dsl::OperationId::from_domain(id).0;
618        self.dsl.0.state().op_peer_ready.get(&id_key).copied()
619    }
620
621    fn require_peer_ready(&self, id: &OperationId) -> Result<bool, OpsLifecycleError> {
622        self.peer_ready(id).ok_or_else(|| {
623            OpsLifecycleError::Internal(format!(
624                "generated op peer wiring authority missing peer-ready fact for {id}"
625            ))
626        })
627    }
628
629    /// Read the progress counter for `id`.
630    fn progress_count(&self, id: &OperationId) -> Option<u32> {
631        let id_key = mm_dsl::OperationId::from_domain(id).0;
632        self.dsl
633            .0
634            .state()
635            .op_progress_counts
636            .get(&id_key)
637            .map(|v| (*v).min(u32::MAX as u64) as u32)
638    }
639
640    fn require_progress_count(&self, id: &OperationId) -> Result<u32, OpsLifecycleError> {
641        self.progress_count(id).ok_or_else(|| {
642            OpsLifecycleError::Internal(format!(
643                "generated op progress authority missing progress count for {id}"
644            ))
645        })
646    }
647
648    /// Read the terminal outcome for `id` by pairing the DSL's typed
649    /// discriminant with the companion payload JSON. Returns `None` when the
650    /// op has no recorded terminal discriminant.
651    fn terminal_outcome(
652        &self,
653        id: &OperationId,
654    ) -> Result<Option<OperationTerminalOutcome>, OpsLifecycleError> {
655        let id_key = mm_dsl::OperationId::from_domain(id).0;
656        let state = self.dsl.0.state();
657        let status = self.status(id);
658        let terminal = match status {
659            Some(status) => Self::operation_status_is_terminal(id, status)?,
660            None => false,
661        };
662        let kind = state.op_terminal_outcomes.get(&id_key).copied();
663        let Some(kind) = kind else {
664            if terminal {
665                return Err(OpsLifecycleError::Internal(format!(
666                    "generated op terminal authority missing terminal outcome for {id}"
667                )));
668            }
669            return Ok(None);
670        };
671        if !terminal {
672            return Err(OpsLifecycleError::Internal(format!(
673                "generated op terminal authority has terminal outcome for non-terminal {id}"
674            )));
675        }
676        let payload = state.op_terminal_payload.get(&id_key).ok_or_else(|| {
677            OpsLifecycleError::Internal(format!(
678                "generated op terminal authority missing terminal payload for {id}"
679            ))
680        })?;
681        Self::checked_terminal_payload(kind, payload, "generated op terminal authority", &id_key)
682            .map(Some)
683    }
684
685    /// Whether the operation is currently tracked in DSL state.
686    fn contains(&self, id: &OperationId) -> bool {
687        let id_key = mm_dsl::OperationId::from_domain(id).0;
688        self.dsl.0.state().op_statuses.contains_key(&id_key)
689    }
690
691    /// Number of non-terminal operations (derived from DSL state).
692    fn active_count(&self) -> usize {
693        self.dsl.0.state().active_op_count as usize
694    }
695
696    /// Number of operations currently tracked (including terminal).
697    fn operation_count(&self) -> usize {
698        self.dsl.0.state().op_statuses.len()
699    }
700
701    /// Iterate over all tracked operation IDs (DSL keys converted to domain).
702    fn operation_ids(&self) -> Result<Vec<OperationId>, OpsLifecycleError> {
703        let mut ids = BTreeSet::new();
704        let state = self.dsl.0.state();
705        Self::collect_operation_id_keys(&mut ids, "op_statuses", state.op_statuses.keys())?;
706        Self::collect_operation_id_keys(&mut ids, "op_kinds", state.op_kinds.keys())?;
707        Self::collect_operation_id_keys(&mut ids, "op_sources", state.op_sources.keys())?;
708        Self::collect_operation_id_keys(&mut ids, "op_peer_ready", state.op_peer_ready.keys())?;
709        Self::collect_operation_id_keys(
710            &mut ids,
711            "op_progress_counts",
712            state.op_progress_counts.keys(),
713        )?;
714        Self::collect_operation_id_keys(
715            &mut ids,
716            "op_terminal_outcomes",
717            state.op_terminal_outcomes.keys(),
718        )?;
719        Self::collect_operation_id_keys(
720            &mut ids,
721            "op_terminal_payload",
722            state.op_terminal_payload.keys(),
723        )?;
724        Self::collect_operation_id_keys(
725            &mut ids,
726            "op_completion_seq",
727            state.op_completion_seq.keys(),
728        )?;
729        ids.extend(self.records.keys().cloned());
730        Ok(ids.into_iter().collect())
731    }
732
733    fn collect_operation_id_keys<'a, I>(
734        ids: &mut BTreeSet<OperationId>,
735        field: &str,
736        keys: I,
737    ) -> Result<(), OpsLifecycleError>
738    where
739        I: IntoIterator<Item = &'a String>,
740    {
741        for key in keys {
742            let id = serde_json::from_str::<OperationId>(key).map_err(|error| {
743                OpsLifecycleError::Internal(format!(
744                    "generated operation identity authority used invalid operation id key in {field}: {key}: {error}"
745                ))
746            })?;
747            ids.insert(id);
748        }
749        Ok(())
750    }
751
752    fn has_generated_operation_record_fact(&self, id: &OperationId) -> bool {
753        let id_key = mm_dsl::OperationId::from_domain(id).0;
754        let state = self.dsl.0.state();
755        state.op_statuses.contains_key(&id_key)
756            || state.op_kinds.contains_key(&id_key)
757            || state.op_sources.contains_key(&id_key)
758            || state.op_peer_ready.contains_key(&id_key)
759            || state.op_progress_counts.contains_key(&id_key)
760            || state.op_terminal_outcomes.contains_key(&id_key)
761            || state.op_terminal_payload.contains_key(&id_key)
762            || state.op_completion_seq.contains_key(&id_key)
763    }
764
765    /// Read the DSL-minted completion sequence for a terminal operation.
766    fn completion_sequence(&self, id: &OperationId) -> Option<CompletionSeq> {
767        let id_key = mm_dsl::OperationId::from_domain(id).0;
768        self.dsl.0.state().op_completion_seq.get(&id_key).copied()
769    }
770
    /// Decode and cross-validate the generated completion-feed authority.
    ///
    /// Returns one `CompletionFeedCanonicalState` per entry of
    /// `completion_feed_sequences`, keyed by the decoded `OperationId`.
    ///
    /// # Errors
    /// Returns `OpsLifecycleError::Internal` when:
    /// - a companion map (`completion_feed_kinds`,
    ///   `completion_feed_terminal_outcomes`, `completion_feed_terminal_payload`)
    ///   does not have exactly the same key set as `completion_feed_sequences`;
    /// - a feed sequence is absent from `completion_sequence_claims`;
    /// - an id key does not deserialize as an `OperationId`;
    /// - a companion value is missing for a key;
    /// - `checked_terminal_payload` rejects the outcome/payload pair.
    fn completion_feed_authority_entries(
        &self,
    ) -> Result<HashMap<OperationId, CompletionFeedCanonicalState>, OpsLifecycleError> {
        let state = self.dsl.0.state();
        // The sequence map defines the key domain; every companion map must
        // match it exactly (no missing and no extra keys).
        let sequence_keys: BTreeSet<String> =
            state.completion_feed_sequences.keys().cloned().collect();
        let companion_domains: [(&str, BTreeSet<String>); 3] = [
            (
                "completion_feed_kinds",
                state.completion_feed_kinds.keys().cloned().collect(),
            ),
            (
                "completion_feed_terminal_outcomes",
                state
                    .completion_feed_terminal_outcomes
                    .keys()
                    .cloned()
                    .collect(),
            ),
            (
                "completion_feed_terminal_payload",
                state
                    .completion_feed_terminal_payload
                    .keys()
                    .cloned()
                    .collect(),
            ),
        ];
        for (field, keys) in companion_domains {
            if keys != sequence_keys {
                return Err(OpsLifecycleError::Internal(format!(
                    "generated completion feed authority has mismatched {field} domain"
                )));
            }
        }

        // Decode each entry. After the domain check above the companion
        // lookups should not miss, but they still fail closed if they do.
        let mut entries = HashMap::new();
        for (id_key, seq) in &state.completion_feed_sequences {
            // Every feed sequence must also be recorded as claimed.
            if !state.completion_sequence_claims.contains(seq) {
                return Err(OpsLifecycleError::Internal(format!(
                    "generated completion feed authority sequence {seq} for {id_key} is not claimed"
                )));
            }
            // DSL keys are JSON-serialized operation ids.
            let operation_id = serde_json::from_str::<OperationId>(id_key).map_err(|error| {
                OpsLifecycleError::Internal(format!(
                    "generated completion feed authority used invalid operation id key {id_key}: {error}"
                ))
            })?;
            let kind = state
                .completion_feed_kinds
                .get(id_key)
                .copied()
                .map(OperationKind::from)
                .ok_or_else(|| {
                    OpsLifecycleError::Internal(format!(
                        "generated completion feed authority missing kind for {id_key}"
                    ))
                })?;
            let outcome_kind = state
                .completion_feed_terminal_outcomes
                .get(id_key)
                .copied()
                .ok_or_else(|| {
                    OpsLifecycleError::Internal(format!(
                        "generated completion feed authority missing terminal outcome for {id_key}"
                    ))
                })?;
            let payload = state
                .completion_feed_terminal_payload
                .get(id_key)
                .ok_or_else(|| {
                    OpsLifecycleError::Internal(format!(
                        "generated completion feed authority missing terminal payload for {id_key}"
                    ))
                })?;
            // Combine the outcome kind and its payload into a checked terminal
            // outcome; a mismatched pair is reported as an error.
            let terminal_outcome = Self::checked_terminal_payload(
                outcome_kind,
                payload,
                "generated completion feed authority",
                id_key,
            )?;
            entries.insert(
                operation_id,
                CompletionFeedCanonicalState {
                    seq: *seq,
                    kind,
                    terminal_outcome,
                },
            );
        }
        Ok(entries)
    }
863
864    fn completion_cursor(&self, consumer: CompletionCursorConsumer) -> CompletionSeq {
865        let state = self.dsl.0.state();
866        match consumer {
867            CompletionCursorConsumer::AgentApplied => state.completion_agent_applied_cursor,
868            CompletionCursorConsumer::RuntimeObserved => state.completion_runtime_observed_cursor,
869            CompletionCursorConsumer::RuntimeInjected => state.completion_runtime_injected_cursor,
870        }
871    }
872
873    fn completion_cursor_snapshot(&self) -> meerkat_core::EpochCursorSnapshot {
874        meerkat_core::EpochCursorSnapshot {
875            agent_applied_cursor: self.completion_cursor(CompletionCursorConsumer::AgentApplied),
876            runtime_observed_seq: self.completion_cursor(CompletionCursorConsumer::RuntimeObserved),
877            runtime_last_injected_seq: self
878                .completion_cursor(CompletionCursorConsumer::RuntimeInjected),
879        }
880    }
881
882    /// Build a snapshot from DSL state + shell record.
883    fn snapshot(
884        &self,
885        id: &OperationId,
886    ) -> Result<Option<OperationLifecycleSnapshot>, OpsLifecycleError> {
887        let Some(shell) = self.records.get(id) else {
888            if self.has_generated_operation_record_fact(id) {
889                return Err(OpsLifecycleError::Internal(format!(
890                    "generated op lifecycle authority has operation facts without shell projection record for {id}"
891                )));
892            }
893            return Ok(None);
894        };
895        let kind = self.require_kind(id)?;
896        let status = self.require_status(id)?;
897        let terminal = Self::operation_status_is_terminal(id, status)?;
898        let public_result_class = Self::operation_public_result_class(id, status)?;
899        let peer_ready = self.require_peer_ready(id)?;
900        let progress_count = self.require_progress_count(id)?;
901        let operation_source = self.operation_source(id)?;
902        let terminal_outcome = self.terminal_outcome(id)?;
903
904        let created_at_ms = ShellRecord::epoch_millis(&shell.created_at_wall);
905        let started_at_ms = shell.started_at.map(|i| shell.epoch_millis_for_instant(i));
906        let completed_at_ms = shell
907            .completed_at
908            .map(|i| shell.epoch_millis_for_instant(i));
909        let elapsed_ms = shell.completed_at.map(|completed| {
910            completed
911                .saturating_duration_since(shell.created_at)
912                .as_millis() as u64
913        });
914
915        Ok(Some(OperationLifecycleSnapshot {
916            id: shell.spec.id.clone(),
917            kind,
918            display_name: shell.spec.display_name.clone(),
919            child_session_id: Self::child_session_id_from_operation_source(
920                operation_source.as_ref(),
921            ),
922            operation_source,
923            status,
924            terminal,
925            public_result_class,
926            peer_ready,
927            progress_count,
928            watcher_count: shell.watchers.len() as u32,
929            terminal_outcome,
930            peer_handle: shell.peer_handle.clone(),
931            created_at_ms,
932            started_at_ms,
933            completed_at_ms,
934            elapsed_ms,
935        }))
936    }
937
    /// Emit shell-side mechanics for a terminal transition: notify watchers,
    /// push generated-authorized CompletionEntry rows, retain in FIFO, evict
    /// as needed. Called AFTER the DSL transition has already persisted the
    /// terminal status + outcome.
    ///
    /// Steps, in order:
    /// 1. Read the DSL-minted terminal outcome and kind for `id`.
    /// 2. Notify the shell record's watchers and stamp its completion time.
    /// 3. `Discard` durability: apply `CollectCompletedOp`, drop the shell
    ///    record, and return immediately.
    /// 4. `Emit` completion-feed class: cross-check the completion-feed
    ///    authority (kind, outcome, sequence) against the op authority, then
    ///    push a `CompletionEntry` onto the feed buffer.
    /// 5. Append `id` to the FIFO and evict the oldest retained ops (via
    ///    `EvictCompletedOp`) while more than `max_completed` are retained.
    /// 6. Attempt to satisfy a pending wait_all barrier.
    ///
    /// # Errors
    /// Returns `OpsLifecycleError::Internal` when the DSL did not mint an
    /// outcome, completion-feed authority, or completion sequence for `id`, or
    /// when those facts disagree. Propagates DSL apply and classifier errors.
    /// There is no rollback: an error part-way through leaves the earlier
    /// steps applied.
    fn finalize_terminal(&mut self, id: &OperationId) -> Result<(), OpsLifecycleError> {
        let outcome = self.terminal_outcome(id)?.ok_or_else(|| {
            OpsLifecycleError::Internal(format!(
                "generated op terminal transition did not mint terminal outcome for {id}"
            ))
        })?;
        let kind = self.require_kind(id)?;

        // Notify watchers and mark completion timestamp.
        if let Some(shell) = self.records.get_mut(id) {
            shell.notify_watchers(&outcome);
            shell.mark_completed();
        }

        // Discard-class ops are collected from the DSL and the shell right away.
        if Self::operation_durability_class(id, kind)? == mm_dsl::OperationDurabilityClass::Discard
        {
            self.dsl_apply(
                mm_dsl::MeerkatMachineInput::CollectCompletedOp {
                    operation_id: mm_dsl::OperationId::from_domain(id).0,
                },
                "CollectCompletedOp",
            )?;
            self.records.remove(id);
            // This function only pushes onto `completed_order` after this
            // branch; the retain drops any stale entry for `id`.
            self.completed_order.retain(|queued| queued != id);
            // NOTE(review): this early return skips `maybe_satisfy_wait()`
            // below. If a Discard-class op can be a wait_all member, confirm
            // the barrier is re-checked elsewhere. Otherwise it may not
            // resolve when this op was the last one outstanding.
            return Ok(());
        }

        if Self::operation_completion_feed_class(id, kind)?
            == mm_dsl::OperationCompletionFeedClass::Emit
        {
            // This rebuilds and validates the whole completion-feed map, then
            // takes this op's entry.
            let feed_authority = self
                .completion_feed_authority_entries()?
                .remove(id)
                .ok_or_else(|| {
                    OpsLifecycleError::Internal(format!(
                        "generated op terminal transition did not mint completion feed authority for {id}"
                    ))
                })?;
            // The feed authority must agree with the op authority on kind,
            // outcome, and sequence before anything is published.
            if feed_authority.kind != kind || feed_authority.terminal_outcome != outcome {
                return Err(OpsLifecycleError::Internal(format!(
                    "generated completion feed authority drifted from terminal op authority for {id}"
                )));
            }
            let seq = self.completion_sequence(id).ok_or_else(|| {
                OpsLifecycleError::Internal(format!(
                    "generated op terminal transition did not mint completion sequence for {id}"
                ))
            })?;
            if feed_authority.seq != seq {
                return Err(OpsLifecycleError::Internal(format!(
                    "generated completion feed authority sequence drifted for {id}"
                )));
            }
            // Without a shell record the entry falls back to an empty display
            // name and no completion timestamp.
            let display_name = self
                .records
                .get(id)
                .map(|r| r.spec.display_name.clone())
                .unwrap_or_default();
            let completed_at_ms = self
                .records
                .get(id)
                .and_then(|r| r.completed_at.map(|i| r.epoch_millis_for_instant(i)));
            self.feed_buffer.push(CompletionEntry {
                seq: feed_authority.seq,
                operation_id: id.clone(),
                kind: feed_authority.kind,
                display_name,
                terminal_outcome: feed_authority.terminal_outcome,
                completed_at_ms,
            });
        }

        // FIFO retention + eviction. Oldest entries go first; with
        // `max_completed == 0` the id just pushed is itself evicted.
        self.completed_order.push_back(id.clone());
        while self.completed_order.len() > self.max_completed {
            if let Some(evicted) = self.completed_order.pop_front() {
                self.dsl_apply(
                    mm_dsl::MeerkatMachineInput::EvictCompletedOp {
                        operation_id: mm_dsl::OperationId::from_domain(&evicted).0,
                    },
                    "EvictCompletedOp",
                )?;
                self.records.remove(&evicted);
            }
        }

        // Satisfy a pending wait request if all its ops are now terminal. On
        // authority-invariant corruption this propagates the typed fault (and
        // drops the barrier oneshot so the waiter resolves to Err) instead of
        // reporting the op terminal with a silently-hung barrier.
        self.maybe_satisfy_wait()?;
        Ok(())
    }
1035
1036    /// Read barrier membership from DSL state (sole owner).
1037    fn wait_operation_ids(&self) -> Result<Vec<OperationId>, OpsLifecycleError> {
1038        self.dsl
1039            .0
1040            .state()
1041            .wait_operation_ids
1042            .iter()
1043            .map(|key| {
1044                serde_json::from_str::<OperationId>(key).map_err(|error| {
1045                    OpsLifecycleError::Internal(format!(
1046                        "generated wait operation identity authority used invalid operation id key {key}: {error}"
1047                    ))
1048                })
1049            })
1050            .collect()
1051    }
1052
1053    /// Whether the DSL has a barrier wait active.
1054    #[cfg(test)]
1055    fn wait_active(&self) -> bool {
1056        self.dsl.0.state().wait_active
1057    }
1058
1059    fn wait_all_satisfied_from_effects(
1060        effects: &[mm_dsl::MeerkatMachineEffect],
1061    ) -> Result<Option<WaitAllSatisfied>, OpsLifecycleError> {
1062        let mut satisfied = None;
1063        for effect in effects {
1064            let mm_dsl::MeerkatMachineEffect::WaitAllSatisfied {
1065                wait_request_id,
1066                run_id,
1067                operation_ids,
1068            } = effect
1069            else {
1070                continue;
1071            };
1072            if satisfied.is_some() {
1073                return Err(OpsLifecycleError::Internal(
1074                    "generated wait_all authority emitted multiple satisfaction effects".into(),
1075                ));
1076            }
1077            let wait_uuid = uuid::Uuid::parse_str(&wait_request_id.0).map_err(|err| {
1078                OpsLifecycleError::Internal(format!(
1079                    "generated wait_all authority emitted invalid wait request id '{}': {err}",
1080                    wait_request_id.0
1081                ))
1082            })?;
1083            let mut ids = Vec::with_capacity(operation_ids.len());
1084            for operation_id in operation_ids {
1085                ids.push(
1086                    serde_json::from_str::<OperationId>(&operation_id.0).map_err(|err| {
1087                        OpsLifecycleError::Internal(format!(
1088                            "generated wait_all authority emitted invalid operation id '{}': {err}",
1089                            operation_id.0
1090                        ))
1091                    })?,
1092                );
1093            }
1094            satisfied = Some(WaitAllSatisfied {
1095                wait_request_id: WaitRequestId::from_uuid(wait_uuid),
1096                run_id: RunId::from_uuid(uuid::Uuid::parse_str(&run_id.0).map_err(|err| {
1097                    OpsLifecycleError::Internal(format!(
1098                        "generated wait_all authority emitted invalid run id '{}': {err}",
1099                        run_id.0
1100                    ))
1101                })?),
1102                operation_ids: ids,
1103            });
1104        }
1105        Ok(satisfied)
1106    }
1107
1108    fn parse_wait_all_operation_id(
1109        raw: &str,
1110        context: &str,
1111    ) -> Result<OperationId, OpsLifecycleError> {
1112        serde_json::from_str::<OperationId>(raw).map_err(|err| {
1113            OpsLifecycleError::Internal(format!(
1114                "generated wait_all authority emitted invalid {context} operation id '{raw}': {err}"
1115            ))
1116        })
1117    }
1118
1119    fn duplicate_wait_operation_id(operation_ids: &[OperationId]) -> Option<OperationId> {
1120        let mut seen = HashSet::new();
1121        operation_ids
1122            .iter()
1123            .find(|operation_id| !seen.insert((*operation_id).clone()))
1124            .cloned()
1125    }
1126
    /// Decode the single `WaitAllAdmissionResolved` effect for
    /// `wait_request_id`.
    ///
    /// Returns `Ok(None)` when the authority accepted the request. Returns
    /// `Ok(Some(error))` with the typed rejection (`DuplicateWaitOperation`,
    /// `WaitAlreadyActive`, or `NotFound`) when it rejected it. Effects of
    /// other variants are ignored.
    ///
    /// # Errors
    /// Returns `OpsLifecycleError::Internal` when:
    /// - there is no admission effect, or more than one;
    /// - the effect's wait request id is malformed or differs from
    ///   `wait_request_id`;
    /// - the payload contradicts the result or reason: accept with a rejection
    ///   payload, reject without a reason, a duplicate/not-found rejection
    ///   without (or with an invalid) operation id, or an already-active
    ///   rejection that carries an operation id.
    fn wait_all_admission_error_from_effects(
        wait_request_id: &WaitRequestId,
        effects: &[mm_dsl::MeerkatMachineEffect],
    ) -> Result<Option<OpsLifecycleError>, OpsLifecycleError> {
        // `None` = no admission effect seen yet; `Some(None)` = accepted;
        // `Some(Some(err))` = rejected with `err`.
        let mut admission = None;
        for effect in effects {
            let mm_dsl::MeerkatMachineEffect::WaitAllAdmissionResolved {
                wait_request_id: resolved_wait_request_id,
                result,
                reject_reason,
                rejected_operation_id,
            } = effect
            else {
                continue;
            };
            if admission.is_some() {
                return Err(OpsLifecycleError::Internal(
                    "generated wait_all authority emitted multiple admission results".into(),
                ));
            }
            let resolved_uuid =
                uuid::Uuid::parse_str(&resolved_wait_request_id.0).map_err(|err| {
                    OpsLifecycleError::Internal(format!(
                        "generated wait_all authority emitted invalid wait request id '{}': {err}",
                        resolved_wait_request_id.0
                    ))
                })?;
            // The resolved request must be the one the shell asked about.
            let resolved_wait_request_id = WaitRequestId::from_uuid(resolved_uuid);
            if &resolved_wait_request_id != wait_request_id {
                return Err(OpsLifecycleError::Internal(format!(
                    "generated wait_all authority resolved wait request {resolved_wait_request_id} while shell requested {wait_request_id}"
                )));
            }
            admission = Some(match result {
                mm_dsl::WaitAllAdmissionResultKind::Accept => {
                    // An accept must carry no rejection payload at all.
                    if reject_reason.is_some() || rejected_operation_id.is_some() {
                        return Err(OpsLifecycleError::Internal(
                            "generated wait_all authority accepted with rejection payload".into(),
                        ));
                    }
                    None
                }
                mm_dsl::WaitAllAdmissionResultKind::Reject => {
                    let reason = reject_reason.ok_or_else(|| {
                        OpsLifecycleError::Internal(
                            "generated wait_all authority rejected without reason".into(),
                        )
                    })?;
                    // Map the generated reject reason to the public typed error,
                    // checking that the operation-id payload matches the reason.
                    let error = match reason {
                        mm_dsl::WaitAllRejectReasonKind::DuplicateOperation => {
                            let raw = rejected_operation_id.as_deref().ok_or_else(|| {
                                OpsLifecycleError::Internal(
                                    "generated wait_all authority rejected duplicate without operation id"
                                        .into(),
                                )
                            })?;
                            OpsLifecycleError::DuplicateWaitOperation(
                                Self::parse_wait_all_operation_id(raw, "duplicate")?,
                            )
                        }
                        mm_dsl::WaitAllRejectReasonKind::WaitAlreadyActive => {
                            if rejected_operation_id.is_some() {
                                return Err(OpsLifecycleError::Internal(
                                    "generated wait_all authority rejected active wait with operation id"
                                        .into(),
                                ));
                            }
                            OpsLifecycleError::WaitAlreadyActive
                        }
                        mm_dsl::WaitAllRejectReasonKind::OperationNotFound => {
                            let raw = rejected_operation_id.as_deref().ok_or_else(|| {
                                OpsLifecycleError::Internal(
                                    "generated wait_all authority rejected missing operation without operation id"
                                        .into(),
                                )
                            })?;
                            OpsLifecycleError::NotFound(Self::parse_wait_all_operation_id(
                                raw, "missing",
                            )?)
                        }
                    };
                    Some(error)
                }
            });
        }
        admission.ok_or_else(|| {
            OpsLifecycleError::Internal(
                "generated wait_all authority emitted no admission result".into(),
            )
        })
    }
1218
1219    fn resolve_wait_all_admission(
1220        &mut self,
1221        wait_request_id: &WaitRequestId,
1222        operation_ids: &[OperationId],
1223        dsl_ids: &BTreeSet<String>,
1224        dsl_id_tokens: &BTreeSet<mm_dsl::OperationId>,
1225        operation_token_by_id: &BTreeMap<String, mm_dsl::OperationId>,
1226        operation_id_by_token: &BTreeMap<mm_dsl::OperationId, String>,
1227    ) -> Result<(), OpsLifecycleError> {
1228        let duplicate = Self::duplicate_wait_operation_id(operation_ids)
1229            .map(|operation_id| mm_dsl::OperationId::from_domain(&operation_id).0);
1230        let not_found = operation_ids
1231            .iter()
1232            .find(|operation_id| !self.contains(operation_id))
1233            .map(|operation_id| mm_dsl::OperationId::from_domain(operation_id).0);
1234        let dsl_id_sequence: Vec<String> = operation_ids
1235            .iter()
1236            .map(|id| mm_dsl::OperationId::from_domain(id).0)
1237            .collect();
1238        let effects = self.dsl_apply_with_effects(
1239            mm_dsl::MeerkatMachineInput::ResolveWaitAllAdmission {
1240                wait_request_id: mm_dsl::WaitRequestId::from_domain(wait_request_id),
1241                operation_id_sequence: dsl_id_sequence,
1242                operation_ids: dsl_ids.clone(),
1243                operation_id_tokens: dsl_id_tokens.clone(),
1244                operation_token_by_id: operation_token_by_id.clone(),
1245                operation_id_by_token: operation_id_by_token.clone(),
1246                duplicate_operation_id: duplicate,
1247                not_found_operation_id: not_found,
1248            },
1249            "ResolveWaitAllAdmission",
1250        )?;
1251        if let Some(error) = Self::wait_all_admission_error_from_effects(wait_request_id, &effects)?
1252        {
1253            return Err(error);
1254        }
1255        Ok(())
1256    }
1257
1258    fn try_satisfy_wait_all_authority(
1259        &mut self,
1260    ) -> Result<Option<WaitAllSatisfied>, OpsLifecycleError> {
1261        let Some(dsl_wait_request_id) = self.dsl.0.state().wait_request_id.clone() else {
1262            return Ok(None);
1263        };
1264        let Some(dsl_run_id) = self.dsl.0.state().wait_run_id.clone() else {
1265            return Err(OpsLifecycleError::Internal(
1266                "generated wait_all authority has active wait without run id".into(),
1267            ));
1268        };
1269        let dsl_operation_id_tokens = self.dsl.0.state().wait_operation_id_tokens.clone();
1270        let transition = match mm_dsl::MeerkatMachineMutator::apply(
1271            &mut *self.dsl.0,
1272            mm_dsl::MeerkatMachineInput::SatisfyWaitAll {
1273                wait_request_id: dsl_wait_request_id,
1274                run_id: dsl_run_id,
1275                operation_id_tokens: dsl_operation_id_tokens,
1276            },
1277        ) {
1278            Ok(transition) => transition,
1279            Err(mm_dsl::MeerkatMachineTransitionError::GuardRejected { .. }) => return Ok(None),
1280            Err(err) => {
1281                return Err(OpsLifecycleError::Internal(format!(
1282                    "DSL rejected ops transition (SatisfyWaitAll): {err:?}"
1283                )));
1284            }
1285        };
1286        Self::wait_all_satisfied_from_effects(transition.effects())?
1287            .ok_or_else(|| {
1288                OpsLifecycleError::Internal(
1289                    "generated wait_all authority accepted satisfaction without effect".into(),
1290                )
1291            })
1292            .map(Some)
1293    }
1294
1295    fn begin_wait_all_authority(
1296        &mut self,
1297        run_id: &RunId,
1298        wait_request_id: &WaitRequestId,
1299        operation_ids: &[OperationId],
1300    ) -> Result<WaitAllAuthorityPlan, OpsLifecycleError> {
1301        let mut dsl_ids = BTreeSet::new();
1302        let mut dsl_id_tokens = BTreeSet::new();
1303        let mut operation_token_by_id = BTreeMap::new();
1304        let mut operation_id_by_token = BTreeMap::new();
1305        for id in operation_ids {
1306            let token = mm_dsl::OperationId::from_domain(id);
1307            let raw_id = token.0.clone();
1308            dsl_ids.insert(raw_id.clone());
1309            dsl_id_tokens.insert(token.clone());
1310            operation_token_by_id.insert(raw_id.clone(), token.clone());
1311            operation_id_by_token.insert(token, raw_id);
1312        }
1313        self.resolve_wait_all_admission(
1314            wait_request_id,
1315            operation_ids,
1316            &dsl_ids,
1317            &dsl_id_tokens,
1318            &operation_token_by_id,
1319            &operation_id_by_token,
1320        )?;
1321        self.dsl_apply(
1322            mm_dsl::MeerkatMachineInput::RequestWaitAll {
1323                run_id: mm_dsl::RunId::from_domain(run_id),
1324                wait_request_id: mm_dsl::WaitRequestId::from_domain(wait_request_id),
1325                operation_id_sequence: operation_ids
1326                    .iter()
1327                    .map(|id| mm_dsl::OperationId::from_domain(id).0)
1328                    .collect(),
1329                operation_ids: dsl_ids,
1330                operation_id_tokens: dsl_id_tokens,
1331                operation_token_by_id,
1332                operation_id_by_token,
1333            },
1334            "RequestWaitAll",
1335        )?;
1336        if let Some(satisfied) = self.try_satisfy_wait_all_authority()? {
1337            return Ok(WaitAllAuthorityPlan::AlreadySatisfied(satisfied));
1338        }
1339        Ok(WaitAllAuthorityPlan::ActivateBarrier)
1340    }
1341
1342    fn owner_termination_targets(
1343        &self,
1344    ) -> Result<Vec<(OperationId, OperationStatus)>, OpsLifecycleError> {
1345        let mut targets = Vec::new();
1346        for id in self.operation_ids()? {
1347            let status = self.require_status(&id)?;
1348            if !Self::operation_status_is_terminal(&id, status)? {
1349                targets.push((id, status));
1350            }
1351        }
1352        Ok(targets)
1353    }
1354
1355    fn operation_status_is_terminal(
1356        operation_id: &OperationId,
1357        status: OperationStatus,
1358    ) -> Result<bool, OpsLifecycleError> {
1359        let operation_id_key = mm_dsl::OperationId::from_domain(operation_id).0;
1360        let effects = Self::apply_stateless_classifier(
1361            mm_dsl::MeerkatMachineInput::ClassifyOperationTerminality {
1362                operation_id: operation_id_key.clone(),
1363                status: mm_dsl::OperationStatus::from(status),
1364            },
1365            "ClassifyOperationTerminality",
1366        )?;
1367        let mut terminal = None;
1368        for effect in effects {
1369            match effect {
1370                mm_dsl::MeerkatMachineEffect::OperationTerminal { operation_id }
1371                    if operation_id == operation_id_key =>
1372                {
1373                    terminal = Some(true);
1374                }
1375                mm_dsl::MeerkatMachineEffect::OperationNonTerminal { operation_id }
1376                    if operation_id == operation_id_key =>
1377                {
1378                    terminal = Some(false);
1379                }
1380                other => {
1381                    return Err(OpsLifecycleError::Internal(format!(
1382                        "unexpected generated operation terminality effect: {other:?}"
1383                    )));
1384                }
1385            }
1386        }
1387        terminal.ok_or_else(|| {
1388            OpsLifecycleError::Internal(format!(
1389                "generated operation terminality authority emitted no effect for {operation_id}"
1390            ))
1391        })
1392    }
1393
1394    fn operation_public_result_class(
1395        operation_id: &OperationId,
1396        status: OperationStatus,
1397    ) -> Result<OperationPublicResultClass, OpsLifecycleError> {
1398        let operation_id_key = mm_dsl::OperationId::from_domain(operation_id).0;
1399        let effects = Self::apply_stateless_classifier(
1400            mm_dsl::MeerkatMachineInput::ClassifyOperationPublicResult {
1401                operation_id: operation_id_key.clone(),
1402                status: mm_dsl::OperationStatus::from(status),
1403            },
1404            "ClassifyOperationPublicResult",
1405        )?;
1406        let mut result = None;
1407        for effect in effects {
1408            match effect {
1409                mm_dsl::MeerkatMachineEffect::OperationPublicResultClassified {
1410                    operation_id,
1411                    result: classified,
1412                } if operation_id == operation_id_key => {
1413                    result = Some(OperationPublicResultClass::from(classified));
1414                }
1415                other => {
1416                    return Err(OpsLifecycleError::Internal(format!(
1417                        "unexpected generated operation public-result effect: {other:?}"
1418                    )));
1419                }
1420            }
1421        }
1422        result.ok_or_else(|| {
1423            OpsLifecycleError::Internal(format!(
1424                "generated operation public-result authority emitted no effect for {operation_id}"
1425            ))
1426        })
1427    }
1428
1429    fn operation_transition_rejection_is_idempotent(
1430        operation_id: &OperationId,
1431        action: OperationLifecycleAction,
1432        status: OperationStatus,
1433    ) -> Result<bool, OpsLifecycleError> {
1434        let operation_id_key = mm_dsl::OperationId::from_domain(operation_id).0;
1435        let action = mm_dsl::OpLifecycleActionKind::from(action);
1436        let status = mm_dsl::OperationStatus::from(status);
1437        let effects = Self::apply_stateless_classifier(
1438            mm_dsl::MeerkatMachineInput::ClassifyOperationTransitionIdempotence {
1439                operation_id: operation_id_key.clone(),
1440                action,
1441                status,
1442            },
1443            "ClassifyOperationTransitionIdempotence",
1444        )?;
1445        let mut idempotent = None;
1446        for effect in effects {
1447            match effect {
1448                mm_dsl::MeerkatMachineEffect::OperationTransitionIdempotentSuccess {
1449                    operation_id,
1450                    action: effect_action,
1451                    status: effect_status,
1452                } if operation_id == operation_id_key
1453                    && effect_action == action
1454                    && effect_status == status =>
1455                {
1456                    idempotent = Some(true);
1457                }
1458                mm_dsl::MeerkatMachineEffect::OperationTransitionNotIdempotent {
1459                    operation_id,
1460                    action: effect_action,
1461                    status: effect_status,
1462                } if operation_id == operation_id_key
1463                    && effect_action == action
1464                    && effect_status == status =>
1465                {
1466                    idempotent = Some(false);
1467                }
1468                other => {
1469                    return Err(OpsLifecycleError::Internal(format!(
1470                        "unexpected generated operation transition-idempotence effect: {other:?}"
1471                    )));
1472                }
1473            }
1474        }
1475        idempotent.ok_or_else(|| {
1476            OpsLifecycleError::Internal(format!(
1477                "generated operation transition-idempotence authority emitted no effect for {operation_id}"
1478            ))
1479        })
1480    }
1481
1482    fn operation_completion_feed_class(
1483        operation_id: &OperationId,
1484        kind: OperationKind,
1485    ) -> Result<mm_dsl::OperationCompletionFeedClass, OpsLifecycleError> {
1486        let operation_id_key = mm_dsl::OperationId::from_domain(operation_id).0;
1487        let kind = mm_dsl::OperationKind::from(kind);
1488        let effects = Self::apply_stateless_classifier(
1489            mm_dsl::MeerkatMachineInput::ClassifyOperationCompletionFeed {
1490                operation_id: operation_id_key.clone(),
1491                kind,
1492            },
1493            "ClassifyOperationCompletionFeed",
1494        )?;
1495        let mut class = None;
1496        for effect in effects {
1497            match effect {
1498                mm_dsl::MeerkatMachineEffect::OperationCompletionFeedClassified {
1499                    operation_id,
1500                    result,
1501                } if operation_id == operation_id_key => {
1502                    class = Some(result);
1503                }
1504                other => {
1505                    return Err(OpsLifecycleError::Internal(format!(
1506                        "unexpected generated operation completion-feed effect: {other:?}"
1507                    )));
1508                }
1509            }
1510        }
1511        class.ok_or_else(|| {
1512            OpsLifecycleError::Internal(format!(
1513                "generated operation completion-feed authority emitted no effect for {operation_id}"
1514            ))
1515        })
1516    }
1517
1518    fn operation_completion_wake_class(
1519        operation_id: &OperationId,
1520        kind: OperationKind,
1521    ) -> Result<OperationCompletionWakeClass, OpsLifecycleError> {
1522        let operation_id_key = mm_dsl::OperationId::from_domain(operation_id).0;
1523        let kind = mm_dsl::OperationKind::from(kind);
1524        let effects = Self::apply_stateless_classifier(
1525            mm_dsl::MeerkatMachineInput::ClassifyOperationCompletionWake {
1526                operation_id: operation_id_key.clone(),
1527                kind,
1528            },
1529            "ClassifyOperationCompletionWake",
1530        )?;
1531        let mut class = None;
1532        for effect in effects {
1533            match effect {
1534                mm_dsl::MeerkatMachineEffect::OperationCompletionWakeClassified {
1535                    operation_id,
1536                    result,
1537                } if operation_id == operation_id_key => {
1538                    class = Some(OperationCompletionWakeClass::from(result));
1539                }
1540                other => {
1541                    return Err(OpsLifecycleError::Internal(format!(
1542                        "unexpected generated operation completion-wake effect: {other:?}"
1543                    )));
1544                }
1545            }
1546        }
1547        class.ok_or_else(|| {
1548            OpsLifecycleError::Internal(format!(
1549                "generated operation completion-wake authority emitted no effect for {operation_id}"
1550            ))
1551        })
1552    }
1553
1554    fn operation_durability_class(
1555        operation_id: &OperationId,
1556        kind: OperationKind,
1557    ) -> Result<mm_dsl::OperationDurabilityClass, OpsLifecycleError> {
1558        let operation_id_key = mm_dsl::OperationId::from_domain(operation_id).0;
1559        let kind = mm_dsl::OperationKind::from(kind);
1560        let effects = Self::apply_stateless_classifier(
1561            mm_dsl::MeerkatMachineInput::ClassifyOperationDurability {
1562                operation_id: operation_id_key.clone(),
1563                kind,
1564            },
1565            "ClassifyOperationDurability",
1566        )?;
1567        let mut class = None;
1568        for effect in effects {
1569            match effect {
1570                mm_dsl::MeerkatMachineEffect::OperationDurabilityClassified {
1571                    operation_id,
1572                    result,
1573                } if operation_id == operation_id_key => {
1574                    class = Some(result);
1575                }
1576                other => {
1577                    return Err(OpsLifecycleError::Internal(format!(
1578                        "unexpected generated operation durability effect: {other:?}"
1579                    )));
1580                }
1581            }
1582        }
1583        class.ok_or_else(|| {
1584            OpsLifecycleError::Internal(format!(
1585                "generated operation durability authority emitted no effect for {operation_id}"
1586            ))
1587        })
1588    }
1589
1590    fn recovered_operation_record_disposition(
1591        operation_id: &OperationId,
1592        status: OperationStatus,
1593        kind: OperationKind,
1594        terminal_outcome_present: bool,
1595        terminal_payload_present: bool,
1596        completion_sequence_present: bool,
1597    ) -> Result<RecoveredOperationRecordDisposition, OpsLifecycleError> {
1598        let operation_id_key = mm_dsl::OperationId::from_domain(operation_id).0;
1599        let effects = Self::apply_stateless_classifier(
1600            mm_dsl::MeerkatMachineInput::ClassifyRecoveredOperationRecord {
1601                operation_id: operation_id_key.clone(),
1602                status: mm_dsl::OperationStatus::from(status),
1603                kind: mm_dsl::OperationKind::from(kind),
1604                terminal_outcome_present,
1605                terminal_payload_present,
1606                completion_sequence_present,
1607            },
1608            "ClassifyRecoveredOperationRecord",
1609        )?;
1610        let mut disposition = None;
1611        for effect in effects {
1612            match effect {
1613                mm_dsl::MeerkatMachineEffect::RetainTerminalRecord { operation_id }
1614                    if operation_id == operation_id_key =>
1615                {
1616                    disposition = Some(RecoveredOperationRecordDisposition::Retain);
1617                }
1618                mm_dsl::MeerkatMachineEffect::DiscardRecoveredOperationRecord { operation_id }
1619                    if operation_id == operation_id_key =>
1620                {
1621                    disposition = Some(RecoveredOperationRecordDisposition::Discard);
1622                }
1623                other => {
1624                    return Err(OpsLifecycleError::Internal(format!(
1625                        "unexpected generated recovered-operation classification effect: {other:?}"
1626                    )));
1627                }
1628            }
1629        }
1630        disposition.ok_or_else(|| {
1631            OpsLifecycleError::Internal(format!(
1632                "generated recovered-operation classifier emitted no effect for {operation_id}"
1633            ))
1634        })
1635    }
1636
1637    fn apply_stateless_classifier(
1638        input: mm_dsl::MeerkatMachineInput,
1639        label: &'static str,
1640    ) -> Result<Vec<mm_dsl::MeerkatMachineEffect>, OpsLifecycleError> {
1641        let mut authority = crate::meerkat_machine::dsl_authority::new_initialized_authority(
1642            "ops stateless classifier Initialize must be accepted",
1643        );
1644        let transition =
1645            mm_dsl::MeerkatMachineMutator::apply(&mut authority, input).map_err(|err| {
1646                OpsLifecycleError::Internal(format!(
1647                    "DSL rejected ops transition ({label}): {err:?}"
1648                ))
1649            })?;
1650        Ok(transition.into_effects())
1651    }
1652
    /// Check whether a pending barrier wait is now satisfied and resolve it.
    ///
    /// Barrier membership and the "all members terminal" decision both live
    /// in the DSL: `wait_operation_ids` carries the set, `wait_active`
    /// signals a pending barrier, and `SatisfyWaitAll`'s
    /// `all_members_terminal` guard owns the fixed-point test. The shell
    /// echoes the DSL-owned request id and typed operation tokens into
    /// `SatisfyWaitAll` so the transition can clear the barrier before
    /// rendering the handoff effect. This fires idempotently on every
    /// terminal transition. An early firing that the DSL guard rejects is
    /// expected to come back from `try_satisfy_wait_all_authority` as
    /// `Ok(None)`, which is a no-op here.
    ///
    /// On acceptance (`Ok(Some(satisfied))`), the shell-owned correlation id
    /// (`wait_request_id`) is cleared and the registered waiter
    /// (`pending_wait`), if any, is taken out:
    ///
    /// - If the waiter's request id equals the authority's, the
    ///   `WaitAllSatisfied` obligation token is sent to it. A failed send
    ///   (receiver already gone) is ignored.
    /// - If the ids differ, the waiter is dropped, so its `WaitAllFuture`
    ///   sees its sender disappear. The mismatch is logged only when the
    ///   shell still held a correlation id.
    /// - If no waiter is registered (e.g. post-recovery), nothing is
    ///   notified.
    ///
    /// A shell correlation id that disagrees with the authority's is logged.
    /// That disagreement does not by itself block delivery to a waiter whose
    /// id matches the authority's.
    ///
    /// # Errors
    ///
    /// On authority-invariant corruption (`Err` from
    /// `try_satisfy_wait_all_authority`), the pending sender is dropped and
    /// the correlation id is cleared, so the waiter's `WaitAllFuture`
    /// resolves to `Err`. The typed [`OpsLifecycleError`] then propagates to
    /// the terminal transition caller instead of reporting the op complete
    /// with a silently-hung barrier.
    fn maybe_satisfy_wait(&mut self) -> Result<(), OpsLifecycleError> {
        let satisfied = match self.try_satisfy_wait_all_authority() {
            Ok(Some(satisfied)) => satisfied,
            Ok(None) => return Ok(()),
            Err(err) => {
                // Authority-invariant corruption: do NOT report the op complete
                // with a silently-pending barrier oneshot. Drop the sender so
                // the waiter's `WaitAllFuture` resolves to `Err` via its
                // `Poll::Ready(Err(_))` arm (the same mechanism
                // `cancel_wait_all_internal` uses), then propagate the typed
                // fault. The invariant is already broken, so we do not attempt
                // a `CancelWaitAll` rollback on the corrupt machine.
                if let Some(pending) = self.pending_wait.take() {
                    drop(pending.sender);
                }
                self.wait_request_id = None;
                return Err(err);
            }
        };
        // The shell correlation id is consumed on every accepted satisfaction,
        // whether or not it matches the authority's request id.
        let shell_wait_id = self.wait_request_id.take();
        if shell_wait_id
            .as_ref()
            .is_some_and(|id| id != &satisfied.wait_request_id)
        {
            tracing::error!(
                shell_wait_request_id = ?shell_wait_id,
                authority_wait_request_id = %satisfied.wait_request_id,
                "generated wait_all authority satisfied a different wait request"
            );
        }
        // NOTE(review): the waiter is taken unconditionally. If its id does not
        // match the authority's, it is dropped here, and that drop is silent when
        // `shell_wait_id` is `None`. Confirm this is intended.
        if let Some(pending) = self.pending_wait.take() {
            if pending.wait_request_id == satisfied.wait_request_id {
                let _ = pending.sender.send(satisfied);
            } else if let Some(shell_wait_id) = shell_wait_id {
                tracing::error!(
                    shell_wait_request_id = %shell_wait_id,
                    pending_wait_request_id = %pending.wait_request_id,
                    authority_wait_request_id = %satisfied.wait_request_id,
                    "generated wait_all authority satisfied without a matching pending waiter"
                );
            }
        }
        Ok(())
    }
1720
1721    /// Persist a terminal snapshot if a persistence channel is wired.
1722    ///
1723    /// Called after terminal transitions. Captures authority + entries + cursors under the write
1724    /// lock (caller already holds it), submits a persistence request, and waits for the worker's
1725    /// durable-store result. Returning success only means the snapshot write itself succeeded.
1726    fn maybe_persist(&self) -> Result<(), OpsLifecycleError> {
1727        let (tx, epoch_id, cursor_state) = match (
1728            &self.persist_tx,
1729            &self.persist_epoch_id,
1730            &self.persist_cursor_state,
1731        ) {
1732            (Some(tx), Some(epoch_id), Some(cs)) => (tx, epoch_id, cs),
1733            _ => return Ok(()),
1734        };
1735
1736        let snapshot = self.capture_snapshot(epoch_id.clone(), cursor_state)?;
1737        let (result_tx, result_rx) = std::sync::mpsc::sync_channel(1);
1738        let request = OpsLifecyclePersistenceRequest {
1739            snapshot,
1740            result_tx,
1741        };
1742
1743        tx.send(request).map_err(|_| {
1744            OpsLifecycleError::Internal(
1745                "ops lifecycle persistence channel closed before terminal snapshot could be queued"
1746                    .into(),
1747            )
1748        })?;
1749        result_rx.recv().map_err(|_| {
1750            OpsLifecycleError::Internal(
1751                "ops lifecycle persistence worker dropped terminal snapshot before confirming durability"
1752                    .into(),
1753            )
1754        })?
1755    }
1756
1757    /// Capture the full persisted snapshot for the current state.
1758    fn capture_snapshot(
1759        &self,
1760        epoch_id: meerkat_core::RuntimeEpochId,
1761        _cursor_state: &meerkat_core::EpochCursorState,
1762    ) -> Result<PersistedOpsSnapshot, OpsLifecycleError> {
1763        let mut operations: HashMap<OperationId, OperationCanonicalState> = HashMap::new();
1764        for op_id in self.operation_ids()? {
1765            let status = self.require_status(&op_id)?;
1766            let kind = self.require_kind(&op_id)?;
1767            if Self::operation_durability_class(&op_id, kind)?
1768                != mm_dsl::OperationDurabilityClass::Retain
1769            {
1770                continue;
1771            }
1772            let peer_ready = self.require_peer_ready(&op_id)?;
1773            let progress_count = self.require_progress_count(&op_id)?;
1774            let operation_source = self.operation_source(&op_id)?;
1775            let terminal_outcome = self.terminal_outcome(&op_id)?;
1776            let completion_sequence = self.completion_sequence(&op_id);
1777            if terminal_outcome.is_some() && completion_sequence.is_none() {
1778                return Err(OpsLifecycleError::Internal(format!(
1779                    "generated op terminal authority missing completion sequence for retained terminal {op_id}"
1780                )));
1781            }
1782            if terminal_outcome.is_none() && completion_sequence.is_some() {
1783                return Err(OpsLifecycleError::Internal(format!(
1784                    "generated op terminal authority has completion sequence for non-terminal {op_id}"
1785                )));
1786            }
1787            let terminal_buffered = terminal_outcome.is_some();
1788            let watcher_count = self
1789                .records
1790                .get(&op_id)
1791                .map(|r| r.watchers.len() as u32)
1792                .unwrap_or(0);
1793            operations.insert(
1794                op_id,
1795                OperationCanonicalState {
1796                    status,
1797                    kind,
1798                    operation_source,
1799                    peer_ready,
1800                    progress_count,
1801                    watcher_count,
1802                    terminal_outcome,
1803                    completion_sequence,
1804                    terminal_buffered,
1805                },
1806            );
1807        }
1808        let operation_specs: HashMap<OperationId, OperationSpec> = self
1809            .records
1810            .iter()
1811            .filter(|(id, _)| operations.contains_key(*id))
1812            .map(|(id, record)| {
1813                let mut spec = record.spec.clone();
1814                let operation_source = operations
1815                    .get(id)
1816                    .and_then(|state| state.operation_source.as_ref());
1817                Self::align_spec_child_session_id_to_source(&mut spec, operation_source);
1818                (id.clone(), spec)
1819            })
1820            .collect();
1821        let completed_order: VecDeque<OperationId> = self
1822            .completed_order
1823            .iter()
1824            .filter(|id| operations.contains_key(*id))
1825            .cloned()
1826            .collect();
1827        let active_count = operations
1828            .iter()
1829            .filter(|(id, state)| {
1830                matches!(
1831                    Self::operation_status_is_terminal(id, state.status),
1832                    Ok(false)
1833                )
1834            })
1835            .count();
1836        let completion_entries = {
1837            let inner = self
1838                .feed_buffer
1839                .inner
1840                .read()
1841                .unwrap_or_else(std::sync::PoisonError::into_inner);
1842            inner.entries.iter().cloned().collect()
1843        };
1844
1845        let authority_state = RegistryCanonicalState {
1846            operations,
1847            completion_feed_entries: self.completion_feed_authority_entries()?,
1848            completed_order,
1849            max_completed: self.max_completed,
1850            max_concurrent: self.max_concurrent,
1851            active_count,
1852            wait_request_id: self.wait_request_id.clone(),
1853            wait_operation_ids: self.wait_operation_ids()?,
1854            next_completion_seq: self.dsl.0.state().next_completion_seq,
1855        };
1856
1857        Ok(PersistedOpsSnapshot {
1858            epoch_id,
1859            authority_state,
1860            operation_specs,
1861            completion_entries,
1862            cursors: self.completion_cursor_snapshot(),
1863        })
1864    }
1865
1866    fn shell_record_mut(
1867        &mut self,
1868        id: &OperationId,
1869    ) -> Result<&mut ShellRecord, OpsLifecycleError> {
1870        self.records
1871            .get_mut(id)
1872            .ok_or_else(|| OpsLifecycleError::NotFound(id.clone()))
1873    }
1874
1875    fn collect_wait_outcomes(
1876        &self,
1877        operation_ids: &[OperationId],
1878    ) -> Result<Vec<(OperationId, OperationTerminalOutcome)>, OpsLifecycleError> {
1879        operation_ids
1880            .iter()
1881            .map(|operation_id| {
1882                let outcome = self.terminal_outcome(operation_id)?.ok_or_else(|| {
1883                    OpsLifecycleError::Internal(format!(
1884                        "wait_all completed without terminal outcome for {operation_id}"
1885                    ))
1886                })?;
1887                Ok((operation_id.clone(), outcome))
1888            })
1889            .collect()
1890    }
1891}
1892
1893impl Default for ShellState {
1894    fn default() -> Self {
1895        Self::new(DEFAULT_MAX_COMPLETED, None)
1896    }
1897}
1898
1899// ---------------------------------------------------------------------------
1900// Public configuration & registry
1901// ---------------------------------------------------------------------------
1902
/// Configuration for [`RuntimeOpsLifecycleRegistry`].
///
/// Passed to [`RuntimeOpsLifecycleRegistry::with_config`], which forwards both limits
/// to the registry's shell state.
#[derive(Debug, Clone)]
pub struct OpsLifecycleConfig {
    /// Maximum number of completed operations to retain
    /// (default: [`DEFAULT_MAX_COMPLETED`]).
    pub max_completed: usize,
    /// Maximum concurrent non-terminal operations (`None` = unlimited; default: `None`).
    pub max_concurrent: Option<usize>,
}
1911
1912impl Default for OpsLifecycleConfig {
1913    fn default() -> Self {
1914        Self {
1915            max_completed: DEFAULT_MAX_COMPLETED,
1916            max_concurrent: None,
1917        }
1918    }
1919}
1920
/// Per-runtime shared registry for async operation lifecycle truth.
///
/// Per-operation canonical lifecycle state is owned by the DSL authority
/// embedded in the shell. This struct manages I/O concerns: watcher
/// channels, timestamps, peer handles, snapshot assembly, FIFO eviction,
/// and the completion feed buffer.
///
/// All shell state sits behind a single [`RwLock`]. Construct a registry with
/// [`RuntimeOpsLifecycleRegistry::new`], [`RuntimeOpsLifecycleRegistry::with_config`],
/// or `Default`. To restore from a persisted snapshot, use
/// [`RuntimeOpsLifecycleRegistry::from_recovered`].
#[derive(Debug)]
pub struct RuntimeOpsLifecycleRegistry {
    /// Shell mechanics plus the embedded DSL authority (`ShellState::dsl`).
    state: RwLock<ShellState>,
}
1931
/// Point-in-time diagnostic view of the registry, for crate-internal inspection.
///
/// NOTE(review): the code that fills this snapshot is not shown here. The field
/// descriptions below are inferred from the field names and from the matching
/// `ShellState` fields (`wait_request_id`, `pending_wait`). Confirm against the builder.
#[derive(Debug, Clone)]
pub(crate) struct RuntimeOpsDiagnosticSnapshot {
    /// Number of operations known to the registry.
    pub operation_count: usize,
    /// Number of operations counted as active (presumably non-terminal).
    pub active_count: usize,
    /// Shell-side `wait_all` correlation id, if one is recorded.
    pub wait_request_id: Option<WaitRequestId>,
    /// Whether a `wait_all` waiter is currently registered.
    pub pending_wait_present: bool,
    /// Request id carried by the registered waiter, if any.
    pub pending_wait_request_id: Option<WaitRequestId>,
    /// Barrier membership of the active wait (presumably the DSL's `wait_operation_ids`).
    pub wait_operation_ids: Vec<OperationId>,
    /// Per-operation lifecycle snapshots.
    pub operations: Vec<OperationLifecycleSnapshot>,
}
1942
1943impl Default for RuntimeOpsLifecycleRegistry {
1944    fn default() -> Self {
1945        Self {
1946            state: RwLock::new(ShellState::default()),
1947        }
1948    }
1949}
1950
1951impl RuntimeOpsLifecycleRegistry {
1952    pub fn new() -> Self {
1953        let dsl = new_ops_dsl_authority();
1954        let feed_capacity = DEFAULT_MAX_COMPLETED.saturating_mul(4).max(1024);
1955        let feed_buffer = Arc::new(FeedBuffer::new(feed_capacity));
1956        Self {
1957            state: RwLock::new(ShellState {
1958                dsl,
1959                records: HashMap::new(),
1960                pending_wait: None,
1961                completed_order: VecDeque::new(),
1962                max_completed: DEFAULT_MAX_COMPLETED,
1963                max_concurrent: None,
1964                wait_request_id: None,
1965                feed_buffer,
1966                persist_tx: None,
1967                persist_epoch_id: None,
1968                persist_cursor_state: None,
1969            }),
1970        }
1971    }
1972
1973    pub fn with_config(config: OpsLifecycleConfig) -> Self {
1974        Self {
1975            state: RwLock::new(ShellState::new(config.max_completed, config.max_concurrent)),
1976        }
1977    }
1978
1979    fn recover_completion_feed_entry(
1980        shell: &mut ShellState,
1981        operation_id: &OperationId,
1982        entry: &CompletionFeedCanonicalState,
1983    ) -> Result<(), OpsLifecycleError> {
1984        let expected_operation_id = mm_dsl::OperationId::from_domain(operation_id).0;
1985        let terminal_outcome_kind =
1986            mm_dsl::OperationTerminalOutcomeKind::from(&entry.terminal_outcome);
1987        let effects = shell.dsl_apply_with_effects(
1988            mm_dsl::MeerkatMachineInput::RecoverCompletionFeedEntry {
1989                operation_id: expected_operation_id.clone(),
1990                kind: mm_dsl::OperationKind::from(entry.kind),
1991                terminal_outcome: terminal_outcome_kind,
1992                terminal_payload: entry.terminal_outcome.clone(),
1993                completion_sequence: entry.seq,
1994            },
1995            "RecoverCompletionFeedEntry",
1996        )?;
1997        let recovered = effects.iter().find_map(|effect| match effect {
1998            mm_dsl::MeerkatMachineEffect::CompletionFeedEntryRecovered {
1999                operation_id,
2000                seq,
2001                kind,
2002                terminal_outcome,
2003                terminal_payload,
2004            } => Some((
2005                operation_id,
2006                *seq,
2007                OperationKind::from(*kind),
2008                *terminal_outcome,
2009                terminal_payload,
2010            )),
2011            _ => None,
2012        });
2013        let Some((operation_id, seq, kind, terminal_outcome, terminal_payload)) = recovered else {
2014            return Err(OpsLifecycleError::Internal(
2015                "generated completion-feed recovery emitted no recovered entry".into(),
2016            ));
2017        };
2018        if operation_id != &expected_operation_id
2019            || seq != entry.seq
2020            || kind != entry.kind
2021            || terminal_outcome != terminal_outcome_kind
2022            || terminal_payload != &entry.terminal_outcome
2023        {
2024            return Err(OpsLifecycleError::Internal(format!(
2025                "generated completion-feed recovery drifted for {operation_id}"
2026            )));
2027        }
2028        Ok(())
2029    }
2030
2031    /// Wire a persistence channel for durable snapshot writes.
2032    ///
2033    /// After this call, terminal transitions (complete/fail/cancel/abort)
2034    /// capture a snapshot and queue it to the channel. A dedicated
2035    /// persistence task should drain the channel and write to the store.
2036    pub fn set_persistence_channel(
2037        &self,
2038        tx: crate::tokio::sync::mpsc::UnboundedSender<OpsLifecyclePersistenceRequest>,
2039        epoch_id: meerkat_core::RuntimeEpochId,
2040        cursor_state: Arc<meerkat_core::EpochCursorState>,
2041    ) {
2042        if let Ok(mut state) = self.state.write() {
2043            state.persist_tx = Some(tx);
2044            state.persist_epoch_id = Some(epoch_id);
2045            state.persist_cursor_state = Some(cursor_state);
2046        }
2047    }
2048
2049    /// Recover from a persisted snapshot.
2050    ///
2051    /// Rebuilds DSL state (stripping non-terminal ops — only terminals
2052    /// survive recovery), creates fresh shell records from specs, and seeds
2053    /// the feed buffer only with completion entries accepted by generated
2054    /// recovery authority.
2055    pub fn from_recovered(snapshot: PersistedOpsSnapshot) -> Result<Self, OpsLifecycleError> {
2056        let PersistedOpsSnapshot {
2057            authority_state,
2058            operation_specs,
2059            completion_entries,
2060            cursors,
2061            ..
2062        } = snapshot;
2063        let max_completed = authority_state.max_completed;
2064        let max_concurrent = authority_state.max_concurrent;
2065        let next_completion_seq = authority_state.next_completion_seq;
2066        let authority_completion_entries = authority_state.completion_feed_entries;
2067        let authority_operations = authority_state.operations;
2068        let mut shell = ShellState::new(max_completed, max_concurrent);
2069
2070        // Replay every persisted op through generated recovery authority.
2071        // The transition accepts only terminal records with outcome and
2072        // completion-sequence witnesses. Volatile non-terminal rows are not
2073        // recovered; terminal/corrupt rows must fail closed instead of being
2074        // projected into shell/public feed state.
2075        let mut retained_ids: HashSet<OperationId> = HashSet::new();
2076        for (op_id, op_state) in authority_operations {
2077            let terminal_outcome = op_state
2078                .terminal_outcome
2079                .as_ref()
2080                .map(mm_dsl::OperationTerminalOutcomeKind::from);
2081            let terminal_payload = op_state.terminal_outcome.clone();
2082            let disposition = ShellState::recovered_operation_record_disposition(
2083                &op_id,
2084                op_state.status,
2085                op_state.kind,
2086                terminal_outcome.is_some(),
2087                terminal_payload.is_some(),
2088                op_state.completion_sequence.is_some(),
2089            )?;
2090            if disposition == RecoveredOperationRecordDisposition::Discard {
2091                continue;
2092            }
2093            if let Some(spec_source) = operation_specs
2094                .get(&op_id)
2095                .and_then(|spec| spec.operation_source.as_ref())
2096                && op_state.operation_source.as_ref() != Some(spec_source)
2097            {
2098                return Err(OpsLifecycleError::Internal(format!(
2099                    "persisted operation source mirror for {op_id} drifted from generated authority"
2100                )));
2101            }
2102            let recovery = mm_dsl::MeerkatMachineInput::RecoverOpRecord {
2103                operation_id: mm_dsl::OperationId::from_domain(&op_id).0,
2104                status: mm_dsl::OperationStatus::from(op_state.status),
2105                kind: mm_dsl::OperationKind::from(op_state.kind),
2106                source: op_state
2107                    .operation_source
2108                    .as_ref()
2109                    .map(mm_dsl::OperationSource::from_domain),
2110                peer_ready: op_state.peer_ready,
2111                progress_count: u64::from(op_state.progress_count),
2112                terminal_outcome,
2113                terminal_payload,
2114                completion_sequence: op_state.completion_sequence,
2115            };
2116            shell.dsl_apply(recovery, "RecoverOpRecord")?;
2117            let recovered_seq = shell.completion_sequence(&op_id).ok_or_else(|| {
2118                OpsLifecycleError::Internal(format!(
2119                    "generated op recovery accepted {op_id} without completion sequence"
2120                ))
2121            })?;
2122            if op_state.completion_sequence != Some(recovered_seq) {
2123                return Err(OpsLifecycleError::Internal(format!(
2124                    "generated op recovery completion sequence mismatch for {op_id}"
2125                )));
2126            }
2127            retained_ids.insert(op_id);
2128        }
2129        shell.dsl_apply(
2130            mm_dsl::MeerkatMachineInput::RecoverOpsCompletionCursor {
2131                next_completion_seq,
2132            },
2133            "RecoverOpsCompletionCursor",
2134        )?;
2135        shell.dsl_apply(
2136            mm_dsl::MeerkatMachineInput::RecoverCompletionConsumerCursors {
2137                agent_applied_cursor: cursors.agent_applied_cursor,
2138                runtime_observed_cursor: cursors.runtime_observed_seq,
2139                runtime_injected_cursor: cursors.runtime_last_injected_seq,
2140            },
2141            "RecoverCompletionConsumerCursors",
2142        )?;
2143
2144        // Rebuild completed_order from generated completion-sequence truth,
2145        // never from the persisted shell ordering mirror.
2146        let mut recovered_completed: Vec<(CompletionSeq, OperationId)> = retained_ids
2147            .iter()
2148            .filter_map(|id| shell.completion_sequence(id).map(|seq| (seq, id.clone())))
2149            .collect();
2150        recovered_completed.sort_by_key(|(seq, _)| *seq);
2151        shell.completed_order = recovered_completed.into_iter().map(|(_, id)| id).collect();
2152
2153        // Recover generated-owned feed authority for entries whose operation
2154        // record is no longer retained. Retained records already wrote their
2155        // feed authority through RecoverOpRecord above.
2156        for (operation_id, entry) in &authority_completion_entries {
2157            if !retained_ids.contains(operation_id) {
2158                Self::recover_completion_feed_entry(&mut shell, operation_id, entry)?;
2159            }
2160        }
2161
2162        let canonical_feed_entries = shell.completion_feed_authority_entries()?;
2163        for (operation_id, entry) in &authority_completion_entries {
2164            let Some(recovered_entry) = canonical_feed_entries.get(operation_id) else {
2165                return Err(OpsLifecycleError::Internal(format!(
2166                    "persisted completion feed authority for {operation_id} was not recovered"
2167                )));
2168            };
2169            if recovered_entry != entry {
2170                return Err(OpsLifecycleError::Internal(format!(
2171                    "persisted completion feed authority drifted from generated recovery for {operation_id}"
2172                )));
2173            }
2174        }
2175
2176        // Projection rows may carry display metadata only after generated
2177        // feed authority has decided the operation id, sequence, kind, and
2178        // terminal outcome. Any semantic drift fails closed.
2179        let mut projection_entries_by_id: HashMap<OperationId, CompletionEntry> = HashMap::new();
2180        for entry in completion_entries {
2181            let Some(authority_entry) = canonical_feed_entries.get(&entry.operation_id) else {
2182                return Err(OpsLifecycleError::Internal(format!(
2183                    "persisted completion feed projection for {} has no generated feed authority",
2184                    entry.operation_id
2185                )));
2186            };
2187            if authority_entry.seq != entry.seq
2188                || authority_entry.kind != entry.kind
2189                || authority_entry.terminal_outcome != entry.terminal_outcome
2190            {
2191                return Err(OpsLifecycleError::Internal(format!(
2192                    "persisted completion feed projection for {} drifted from generated feed authority",
2193                    entry.operation_id
2194                )));
2195            }
2196            projection_entries_by_id.insert(entry.operation_id.clone(), entry);
2197        }
2198
2199        let mut recovered_entries: Vec<(OperationId, CompletionFeedCanonicalState)> =
2200            canonical_feed_entries.into_iter().collect();
2201        recovered_entries.sort_by_key(|(_, entry)| entry.seq);
2202        for (operation_id, entry) in recovered_entries {
2203            let projection = projection_entries_by_id.get(&operation_id);
2204            let display_name = operation_specs
2205                .get(&operation_id)
2206                .map(|spec| spec.display_name.clone())
2207                .or_else(|| projection.map(|entry| entry.display_name.clone()))
2208                .unwrap_or_default();
2209            let completed_at_ms = projection.and_then(|entry| entry.completed_at_ms);
2210            shell.feed_buffer.push(CompletionEntry {
2211                seq: entry.seq,
2212                operation_id,
2213                kind: entry.kind,
2214                display_name,
2215                terminal_outcome: entry.terminal_outcome,
2216                completed_at_ms,
2217            });
2218        }
2219
2220        // Rebuild shell records from specs (fresh timestamps, no watchers)
2221        // — only for operations still retained in the DSL state.
2222        for (op_id, spec) in operation_specs {
2223            if retained_ids.contains(&op_id) {
2224                let mut spec = spec;
2225                let operation_source = shell.operation_source(&op_id)?;
2226                ShellState::align_spec_child_session_id_to_source(
2227                    &mut spec,
2228                    operation_source.as_ref(),
2229                );
2230                shell.records.insert(
2231                    op_id,
2232                    ShellRecord {
2233                        spec,
2234                        peer_handle: None,
2235                        watchers: Vec::new(),
2236                        created_at: Instant::now(),
2237                        started_at: None,
2238                        completed_at: None,
2239                        created_at_wall: SystemTime::now(),
2240                    },
2241                );
2242            }
2243        }
2244
2245        Ok(Self {
2246            state: RwLock::new(shell),
2247        })
2248    }
2249
2250    /// Capture a serializable snapshot of the current state for persistence.
2251    ///
2252    /// Includes authority state, operation specs, completion entries, and
2253    /// generated completion-consumer cursor values.
2254    pub fn capture_persistence_snapshot(
2255        &self,
2256        epoch_id: meerkat_core::RuntimeEpochId,
2257        cursor_state: &meerkat_core::EpochCursorState,
2258    ) -> Result<PersistedOpsSnapshot, OpsLifecycleError> {
2259        let state = self
2260            .state
2261            .read()
2262            .unwrap_or_else(std::sync::PoisonError::into_inner);
2263        state.capture_snapshot(epoch_id, cursor_state)
2264    }
2265
2266    /// Snapshot generated completion-consumer cursor state.
2267    pub fn completion_cursor_snapshot(&self) -> meerkat_core::EpochCursorSnapshot {
2268        let state = self
2269            .state
2270            .read()
2271            .unwrap_or_else(std::sync::PoisonError::into_inner);
2272        state.completion_cursor_snapshot()
2273    }
2274
2275    /// Return a read handle to the completion feed.
2276    pub fn completion_feed_handle(&self) -> Arc<dyn CompletionFeed> {
2277        let state = self
2278            .state
2279            .read()
2280            .unwrap_or_else(std::sync::PoisonError::into_inner);
2281        Arc::new(RuntimeCompletionFeed {
2282            buffer: Arc::clone(&state.feed_buffer),
2283        })
2284    }
2285
2286    /// Capture a stable diagnostic snapshot of the canonical ops lifecycle state.
2287    pub(crate) fn diagnostic_snapshot(
2288        &self,
2289    ) -> Result<RuntimeOpsDiagnosticSnapshot, OpsLifecycleError> {
2290        let state = self
2291            .state
2292            .read()
2293            .unwrap_or_else(std::sync::PoisonError::into_inner);
2294        let mut operations = state
2295            .operation_ids()?
2296            .into_iter()
2297            .map(|id| state.snapshot(&id))
2298            .collect::<Result<Vec<_>, _>>()?
2299            .into_iter()
2300            .flatten()
2301            .collect::<Vec<_>>();
2302        operations.sort_by(|left, right| left.display_name.cmp(&right.display_name));
2303        Ok(RuntimeOpsDiagnosticSnapshot {
2304            operation_count: state.operation_count(),
2305            active_count: state.active_count(),
2306            wait_request_id: state.wait_request_id.clone(),
2307            pending_wait_present: state.pending_wait.is_some(),
2308            pending_wait_request_id: state
2309                .pending_wait
2310                .as_ref()
2311                .map(|pending_wait| pending_wait.wait_request_id.clone()),
2312            wait_operation_ids: state.wait_operation_ids()?,
2313            operations,
2314        })
2315    }
2316
2317    fn read_state(&self) -> Result<RwLockReadGuard<'_, ShellState>, OpsLifecycleError> {
2318        self.state
2319            .read()
2320            .map_err(|_| OpsLifecycleError::Internal("ops lifecycle registry poisoned".into()))
2321    }
2322
2323    fn write_state(&self) -> Result<RwLockWriteGuard<'_, ShellState>, OpsLifecycleError> {
2324        self.state
2325            .write()
2326            .map_err(|_| OpsLifecycleError::Internal("ops lifecycle registry poisoned".into()))
2327    }
2328
2329    fn cancel_wait_all_internal(
2330        &self,
2331        wait_request_id: &WaitRequestId,
2332    ) -> Result<(), OpsLifecycleError> {
2333        let mut state = self.write_state()?;
2334        match state.wait_request_id.as_ref() {
2335            Some(active) if active == wait_request_id => {
2336                // Clear the DSL barrier via the dedicated `CancelWaitAll`
2337                // transition. Unlike `SatisfyWaitAll`, it does not require
2338                // every member to be terminal (the request was dropped, not
2339                // resolved) and does not emit the `WaitAllSatisfied`.
2340                state.dsl_apply(
2341                    mm_dsl::MeerkatMachineInput::CancelWaitAll,
2342                    "CancelWaitAll(cancel)",
2343                )?;
2344                state.wait_request_id = None;
2345                if state
2346                    .pending_wait
2347                    .as_ref()
2348                    .is_some_and(|pending| pending.wait_request_id == *wait_request_id)
2349                {
2350                    state.pending_wait = None;
2351                }
2352                Ok(())
2353            }
2354            _ => {
2355                if state
2356                    .pending_wait
2357                    .as_ref()
2358                    .is_some_and(|pending| pending.wait_request_id == *wait_request_id)
2359                {
2360                    state.pending_wait = None;
2361                }
2362                Ok(())
2363            }
2364        }
2365    }
2366}
2367
/// Progress of a [`WaitAllFuture`].
///
/// `Drop for WaitAllFuture` requests cancellation only while the state is
/// `Waiting`.
enum WaitAllFutureState {
    /// Holds a result that the next poll returns without waiting. The
    /// `Option` lets `poll` move the result out exactly once.
    Ready(Option<Result<WaitAllResult, OpsLifecycleError>>),
    /// Waiting for the `WaitAllSatisfied` notification on this channel.
    Waiting(tokio::sync::oneshot::Receiver<WaitAllSatisfied>),
    /// Output already produced; further polls yield an internal error.
    Done,
}
2373
/// Future that resolves to a [`WaitAllResult`] for one wait-all request.
///
/// If dropped while still in the `Waiting` state, its `Drop` impl asks the
/// registry to cancel the request.
struct WaitAllFuture<'a> {
    /// Registry read for wait outcomes and used for cancellation on drop.
    registry: &'a RuntimeOpsLifecycleRegistry,
    /// Identifier of the wait-all request this future tracks.
    wait_request_id: WaitRequestId,
    /// Current progress of the future.
    state: WaitAllFutureState,
}
2379
impl Future for WaitAllFuture<'_> {
    type Output = Result<WaitAllResult, OpsLifecycleError>;

    /// Drive the wait-all request to completion.
    ///
    /// Every path that returns `Poll::Ready` first moves the state to `Done`.
    /// As a result, `Drop` (which cancels only in `Waiting`) never tries to
    /// cancel a request that has already produced its output. Polling after
    /// completion yields an internal error instead of panicking.
    fn poll(mut self: std::pin::Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        match &mut self.state {
            // Result was available up front: hand it out once.
            WaitAllFutureState::Ready(result) => {
                let ready = result.take().unwrap_or_else(|| {
                    Err(OpsLifecycleError::Internal(
                        "wait_all future polled after completion".into(),
                    ))
                });
                self.state = WaitAllFutureState::Done;
                Poll::Ready(ready)
            }
            WaitAllFutureState::Waiting(receiver) => match std::pin::Pin::new(receiver).poll(cx) {
                Poll::Pending => Poll::Pending,
                Poll::Ready(Ok(satisfied)) => {
                    // Collect outcomes for the satisfied member ids under a
                    // short-lived read guard (released at the end of the arm).
                    let outcomes = match self.registry.read_state() {
                        Ok(state) => state.collect_wait_outcomes(&satisfied.operation_ids),
                        Err(err) => Err(err),
                    };
                    self.state = WaitAllFutureState::Done;
                    Poll::Ready(outcomes.map(|outcomes| WaitAllResult {
                        outcomes,
                        satisfied,
                    }))
                }
                // The sender side was dropped without sending a result.
                Poll::Ready(Err(_)) => {
                    self.state = WaitAllFutureState::Done;
                    Poll::Ready(Err(OpsLifecycleError::Internal(
                        "wait_all completion channel dropped".into(),
                    )))
                }
            },
            WaitAllFutureState::Done => Poll::Ready(Err(OpsLifecycleError::Internal(
                "wait_all future polled after completion".into(),
            ))),
        }
    }
}
2420
2421impl Drop for WaitAllFuture<'_> {
2422    fn drop(&mut self) {
2423        if matches!(self.state, WaitAllFutureState::Waiting(_))
2424            && let Err(err) = self
2425                .registry
2426                .cancel_wait_all_internal(&self.wait_request_id)
2427        {
2428            tracing::error!(
2429                wait_request_id = %self.wait_request_id,
2430                error = %err,
2431                "generated wait_all authority rejected cancellation during drop"
2432            );
2433        }
2434    }
2435}
2436
2437// ---------------------------------------------------------------------------
2438// Generated op lifecycle result-class feedback
2439// ---------------------------------------------------------------------------
2440
2441fn op_lifecycle_action_label(action: mm_dsl::OpLifecycleActionKind) -> &'static str {
2442    match action {
2443        mm_dsl::OpLifecycleActionKind::Start => "provisioning_succeeded",
2444        mm_dsl::OpLifecycleActionKind::Fail => "fail_operation",
2445        mm_dsl::OpLifecycleActionKind::PeerReady => "peer_ready",
2446        mm_dsl::OpLifecycleActionKind::ProgressReported => "report_progress",
2447        mm_dsl::OpLifecycleActionKind::Complete => "complete_operation",
2448        mm_dsl::OpLifecycleActionKind::Abort => "abort_provisioning",
2449        mm_dsl::OpLifecycleActionKind::Cancel => "cancel_operation",
2450        mm_dsl::OpLifecycleActionKind::RetireRequested => "request_retire",
2451        mm_dsl::OpLifecycleActionKind::RetireCompleted => "mark_retired",
2452        mm_dsl::OpLifecycleActionKind::Terminate => "terminate_owner",
2453    }
2454}
2455
2456fn op_lifecycle_rejection_error_from_effects(
2457    id: &OperationId,
2458    requested_action: mm_dsl::OpLifecycleActionKind,
2459    effects: &[mm_dsl::MeerkatMachineEffect],
2460) -> Result<OpsLifecycleError, OpsLifecycleError> {
2461    let expected_id = mm_dsl::OperationId::from_domain(id).0;
2462    let mut rejection = None;
2463    for effect in effects {
2464        let mm_dsl::MeerkatMachineEffect::OpLifecycleTransitionRejected {
2465            operation_id,
2466            action,
2467            reason,
2468            status,
2469        } = effect
2470        else {
2471            continue;
2472        };
2473        if rejection.is_some() {
2474            return Err(OpsLifecycleError::Internal(
2475                "generated op lifecycle authority emitted multiple rejection results".into(),
2476            ));
2477        }
2478        if operation_id != &expected_id || *action != requested_action {
2479            return Err(OpsLifecycleError::Internal(format!(
2480                "generated op lifecycle authority resolved {operation_id}/{action:?} while shell requested {expected_id}/{requested_action:?}"
2481            )));
2482        }
2483        let error = match reason {
2484            mm_dsl::OpLifecycleRejectReasonKind::OperationNotFound => {
2485                if status.is_some() {
2486                    return Err(OpsLifecycleError::Internal(
2487                        "generated op lifecycle authority emitted not-found with status".into(),
2488                    ));
2489                }
2490                OpsLifecycleError::NotFound(id.clone())
2491            }
2492            mm_dsl::OpLifecycleRejectReasonKind::InvalidTransition => {
2493                let status = status.ok_or_else(|| {
2494                    OpsLifecycleError::Internal(
2495                        "generated op lifecycle authority emitted invalid-transition without status"
2496                            .into(),
2497                    )
2498                })?;
2499                OpsLifecycleError::InvalidTransition {
2500                    id: id.clone(),
2501                    status: OperationStatus::from(status),
2502                    action: op_lifecycle_action_label(requested_action),
2503                }
2504            }
2505            mm_dsl::OpLifecycleRejectReasonKind::PeerNotExpected => {
2506                if status.is_none() {
2507                    return Err(OpsLifecycleError::Internal(
2508                        "generated op lifecycle authority emitted peer-not-expected without status"
2509                            .into(),
2510                    ));
2511                }
2512                OpsLifecycleError::PeerNotExpected(id.clone())
2513            }
2514            mm_dsl::OpLifecycleRejectReasonKind::AlreadyPeerReady => {
2515                if status.is_none() {
2516                    return Err(OpsLifecycleError::Internal(
2517                        "generated op lifecycle authority emitted already-peer-ready without status"
2518                            .into(),
2519                    ));
2520                }
2521                OpsLifecycleError::AlreadyPeerReady(id.clone())
2522            }
2523        };
2524        rejection = Some(error);
2525    }
2526    rejection.ok_or_else(|| {
2527        OpsLifecycleError::Internal(
2528            "generated op lifecycle authority emitted no rejection result".into(),
2529        )
2530    })
2531}
2532
2533fn classify_generated_op_rejection(
2534    state: &mut ShellState,
2535    err: mm_dsl::MeerkatMachineTransitionError,
2536    id: &OperationId,
2537    action: mm_dsl::OpLifecycleActionKind,
2538) -> OpsLifecycleError {
2539    match err {
2540        mm_dsl::MeerkatMachineTransitionError::GuardRejected { .. } => {
2541            match state.dsl_apply_with_effects(
2542                mm_dsl::MeerkatMachineInput::ResolveOpLifecycleTransitionRejection {
2543                    operation_id: mm_dsl::OperationId::from_domain(id).0,
2544                    action,
2545                },
2546                "ResolveOpLifecycleTransitionRejection",
2547            ) {
2548                Ok(effects) => op_lifecycle_rejection_error_from_effects(id, action, &effects)
2549                    .unwrap_or_else(|err| err),
2550                Err(err) => err,
2551            }
2552        }
2553        other => OpsLifecycleError::Internal(format!(
2554            "DSL rejected ops transition ({}): {other:?}",
2555            op_lifecycle_action_label(action)
2556        )),
2557    }
2558}
2559
2560fn apply_op_transition(
2561    state: &mut ShellState,
2562    id: &OperationId,
2563    input: mm_dsl::MeerkatMachineInput,
2564    action: mm_dsl::OpLifecycleActionKind,
2565) -> Result<(), OpsLifecycleError> {
2566    state
2567        .dsl_apply_raw(input)
2568        .map_err(|err| classify_generated_op_rejection(state, err, id, action))
2569}
2570
2571fn op_registration_error_from_effects(
2572    id: &OperationId,
2573    effects: &[mm_dsl::MeerkatMachineEffect],
2574) -> Result<Option<OpsLifecycleError>, OpsLifecycleError> {
2575    let expected_id = mm_dsl::OperationId::from_domain(id).0;
2576    let mut admission = None;
2577    for effect in effects {
2578        let mm_dsl::MeerkatMachineEffect::OpRegistrationAdmissionResolved {
2579            operation_id,
2580            result,
2581            reject_reason,
2582            max_concurrent_limit,
2583            active_op_count,
2584        } = effect
2585        else {
2586            continue;
2587        };
2588        if admission.is_some() {
2589            return Err(OpsLifecycleError::Internal(
2590                "generated op registration authority emitted multiple admission results".into(),
2591            ));
2592        }
2593        if operation_id != &expected_id {
2594            return Err(OpsLifecycleError::Internal(format!(
2595                "generated op registration authority resolved {operation_id} while shell requested {expected_id}"
2596            )));
2597        }
2598        admission = Some(match result {
2599            mm_dsl::OpRegistrationAdmissionResultKind::Accept => {
2600                if reject_reason.is_some() {
2601                    return Err(OpsLifecycleError::Internal(
2602                        "generated op registration authority accepted with rejection reason".into(),
2603                    ));
2604                }
2605                None
2606            }
2607            mm_dsl::OpRegistrationAdmissionResultKind::Reject => {
2608                let reason = reject_reason.ok_or_else(|| {
2609                    OpsLifecycleError::Internal(
2610                        "generated op registration authority rejected without reason".into(),
2611                    )
2612                })?;
2613                let error = match reason {
2614                    mm_dsl::OpRegistrationRejectReasonKind::AlreadyRegistered => {
2615                        OpsLifecycleError::AlreadyRegistered(id.clone())
2616                    }
2617                    mm_dsl::OpRegistrationRejectReasonKind::MaxConcurrentExceeded => {
2618                        let limit = max_concurrent_limit.ok_or_else(|| {
2619                            OpsLifecycleError::Internal(
2620                                "generated op registration authority rejected capacity without limit"
2621                                    .into(),
2622                            )
2623                        })?;
2624                        OpsLifecycleError::MaxConcurrentExceeded {
2625                            limit: limit as usize,
2626                            active: *active_op_count as usize,
2627                        }
2628                    }
2629                };
2630                Some(error)
2631            }
2632        });
2633    }
2634    admission.ok_or_else(|| {
2635        OpsLifecycleError::Internal(
2636            "generated op registration authority emitted no admission result".into(),
2637        )
2638    })
2639}
2640
2641impl OpsLifecycleRegistry for RuntimeOpsLifecycleRegistry {
2642    fn register_operation(&self, spec: OperationSpec) -> Result<(), OpsLifecycleError> {
2643        self.register_operation_with_admission_limit(spec, None)
2644    }
2645
2646    fn register_operation_with_admission_limit(
2647        &self,
2648        mut spec: OperationSpec,
2649        max_concurrent: Option<usize>,
2650    ) -> Result<(), OpsLifecycleError> {
2651        let mut state = self.write_state()?;
2652        let operation_id = spec.id.clone();
2653        let kind = spec.kind;
2654        let max_concurrent = max_concurrent
2655            .or(state.max_concurrent)
2656            .map(|limit| limit as u64);
2657
2658        let effects = state.dsl_apply_with_effects(
2659            mm_dsl::MeerkatMachineInput::RegisterOp {
2660                operation_id: mm_dsl::OperationId::from_domain(&operation_id).0,
2661                kind: mm_dsl::OperationKind::from_domain(&kind),
2662                source: spec
2663                    .operation_source
2664                    .as_ref()
2665                    .map(mm_dsl::OperationSource::from_domain),
2666                max_concurrent,
2667            },
2668            "RegisterOp",
2669        )?;
2670        if let Some(error) = op_registration_error_from_effects(&operation_id, &effects)? {
2671            return Err(error);
2672        }
2673
2674        let authority_operation_source = state.operation_source(&operation_id)?;
2675        ShellState::align_spec_child_session_id_to_source(
2676            &mut spec,
2677            authority_operation_source.as_ref(),
2678        );
2679
2680        // Insert shell record.
2681        state.records.insert(operation_id, ShellRecord::new(spec));
2682        Ok(())
2683    }
2684
2685    fn provisioning_succeeded(&self, id: &OperationId) -> Result<(), OpsLifecycleError> {
2686        let mut state = self.write_state()?;
2687
2688        apply_op_transition(
2689            &mut state,
2690            id,
2691            mm_dsl::MeerkatMachineInput::StartOp {
2692                operation_id: mm_dsl::OperationId::from_domain(id).0,
2693            },
2694            mm_dsl::OpLifecycleActionKind::Start,
2695        )?;
2696
2697        // Shell concern: record the started timestamp.
2698        if let Some(shell) = state.records.get_mut(id) {
2699            shell.started_at = Some(Instant::now());
2700        }
2701        Ok(())
2702    }
2703
2704    fn provisioning_failed(
2705        &self,
2706        id: &OperationId,
2707        error: String,
2708    ) -> Result<(), OpsLifecycleError> {
2709        let mut state = self.write_state()?;
2710
2711        let terminal_outcome = OperationTerminalOutcome::Failed { error };
2712        let outcome_kind = mm_dsl::OperationTerminalOutcomeKind::from(&terminal_outcome);
2713
2714        apply_op_transition(
2715            &mut state,
2716            id,
2717            mm_dsl::MeerkatMachineInput::FailOp {
2718                operation_id: mm_dsl::OperationId::from_domain(id).0,
2719                outcome: outcome_kind,
2720                payload: terminal_outcome,
2721            },
2722            mm_dsl::OpLifecycleActionKind::Fail,
2723        )?;
2724
2725        state.finalize_terminal(id)?;
2726        state.maybe_persist()?;
2727        Ok(())
2728    }
2729
2730    fn peer_ready(
2731        &self,
2732        id: &OperationId,
2733        peer: OperationPeerHandle,
2734    ) -> Result<(), OpsLifecycleError> {
2735        let mut state = self.write_state()?;
2736
2737        apply_op_transition(
2738            &mut state,
2739            id,
2740            mm_dsl::MeerkatMachineInput::PeerReadyOp {
2741                operation_id: mm_dsl::OperationId::from_domain(id).0,
2742            },
2743            mm_dsl::OpLifecycleActionKind::PeerReady,
2744        )?;
2745
2746        // Shell concern: store the peer handle.
2747        if let Some(shell) = state.records.get_mut(id) {
2748            shell.peer_handle = Some(peer);
2749        }
2750        Ok(())
2751    }
2752
2753    fn register_watcher(
2754        &self,
2755        id: &OperationId,
2756    ) -> Result<OperationCompletionWatch, OpsLifecycleError> {
2757        let mut state = self.write_state()?;
2758
2759        if !state.contains(id) {
2760            return Err(OpsLifecycleError::NotFound(id.clone()));
2761        }
2762
2763        // If already terminal, return an already-resolved watch.
2764        if let Some(outcome) = state.terminal_outcome(id)? {
2765            return Ok(resolved_operation_completion_watch(outcome));
2766        }
2767
2768        // Shell concern: create the channel and store the sender.
2769        let shell = state.shell_record_mut(id)?;
2770        let (tx, rx) = tokio::sync::oneshot::channel();
2771        let watch = operation_completion_watch_from_receiver(rx);
2772        shell.watchers.push(OperationCompletionNotifier::new(tx));
2773        Ok(watch)
2774    }
2775
2776    fn report_progress(
2777        &self,
2778        id: &OperationId,
2779        _update: OperationProgressUpdate,
2780    ) -> Result<(), OpsLifecycleError> {
2781        let mut state = self.write_state()?;
2782
2783        apply_op_transition(
2784            &mut state,
2785            id,
2786            mm_dsl::MeerkatMachineInput::ProgressReportedOp {
2787                operation_id: mm_dsl::OperationId::from_domain(id).0,
2788            },
2789            mm_dsl::OpLifecycleActionKind::ProgressReported,
2790        )?;
2791        Ok(())
2792    }
2793
2794    fn complete_operation(
2795        &self,
2796        id: &OperationId,
2797        result: OperationResult,
2798    ) -> Result<(), OpsLifecycleError> {
2799        let mut state = self.write_state()?;
2800
2801        let terminal_outcome = OperationTerminalOutcome::Completed(result);
2802        let outcome_kind = mm_dsl::OperationTerminalOutcomeKind::from(&terminal_outcome);
2803
2804        apply_op_transition(
2805            &mut state,
2806            id,
2807            mm_dsl::MeerkatMachineInput::CompleteOp {
2808                operation_id: mm_dsl::OperationId::from_domain(id).0,
2809                outcome: outcome_kind,
2810                payload: terminal_outcome,
2811            },
2812            mm_dsl::OpLifecycleActionKind::Complete,
2813        )?;
2814
2815        state.finalize_terminal(id)?;
2816        state.maybe_persist()?;
2817        Ok(())
2818    }
2819
2820    fn fail_operation(&self, id: &OperationId, error: String) -> Result<(), OpsLifecycleError> {
2821        let mut state = self.write_state()?;
2822
2823        let terminal_outcome = OperationTerminalOutcome::Failed { error };
2824        let outcome_kind = mm_dsl::OperationTerminalOutcomeKind::from(&terminal_outcome);
2825
2826        apply_op_transition(
2827            &mut state,
2828            id,
2829            mm_dsl::MeerkatMachineInput::FailOp {
2830                operation_id: mm_dsl::OperationId::from_domain(id).0,
2831                outcome: outcome_kind,
2832                payload: terminal_outcome,
2833            },
2834            mm_dsl::OpLifecycleActionKind::Fail,
2835        )?;
2836
2837        state.finalize_terminal(id)?;
2838        state.maybe_persist()?;
2839        Ok(())
2840    }
2841
2842    fn abort_provisioning(
2843        &self,
2844        id: &OperationId,
2845        reason: Option<String>,
2846    ) -> Result<(), OpsLifecycleError> {
2847        let mut state = self.write_state()?;
2848
2849        let terminal_outcome = OperationTerminalOutcome::Aborted { reason };
2850        let outcome_kind = mm_dsl::OperationTerminalOutcomeKind::from(&terminal_outcome);
2851
2852        apply_op_transition(
2853            &mut state,
2854            id,
2855            mm_dsl::MeerkatMachineInput::AbortOp {
2856                operation_id: mm_dsl::OperationId::from_domain(id).0,
2857                outcome: outcome_kind,
2858                payload: terminal_outcome,
2859            },
2860            mm_dsl::OpLifecycleActionKind::Abort,
2861        )?;
2862
2863        state.finalize_terminal(id)?;
2864        state.maybe_persist()?;
2865        Ok(())
2866    }
2867
2868    fn cancel_operation(
2869        &self,
2870        id: &OperationId,
2871        reason: Option<String>,
2872    ) -> Result<(), OpsLifecycleError> {
2873        let mut state = self.write_state()?;
2874
2875        let terminal_outcome = OperationTerminalOutcome::Cancelled { reason };
2876        let outcome_kind = mm_dsl::OperationTerminalOutcomeKind::from(&terminal_outcome);
2877
2878        apply_op_transition(
2879            &mut state,
2880            id,
2881            mm_dsl::MeerkatMachineInput::CancelOp {
2882                operation_id: mm_dsl::OperationId::from_domain(id).0,
2883                outcome: outcome_kind,
2884                payload: terminal_outcome,
2885            },
2886            mm_dsl::OpLifecycleActionKind::Cancel,
2887        )?;
2888
2889        state.finalize_terminal(id)?;
2890        state.maybe_persist()?;
2891        Ok(())
2892    }
2893
2894    fn request_retire(&self, id: &OperationId) -> Result<(), OpsLifecycleError> {
2895        let mut state = self.write_state()?;
2896
2897        apply_op_transition(
2898            &mut state,
2899            id,
2900            mm_dsl::MeerkatMachineInput::RetireRequestedOp {
2901                operation_id: mm_dsl::OperationId::from_domain(id).0,
2902            },
2903            mm_dsl::OpLifecycleActionKind::RetireRequested,
2904        )?;
2905        Ok(())
2906    }
2907
2908    fn mark_retired(&self, id: &OperationId) -> Result<(), OpsLifecycleError> {
2909        let mut state = self.write_state()?;
2910
2911        let terminal_outcome = OperationTerminalOutcome::Retired;
2912        let outcome_kind = mm_dsl::OperationTerminalOutcomeKind::from(&terminal_outcome);
2913
2914        apply_op_transition(
2915            &mut state,
2916            id,
2917            mm_dsl::MeerkatMachineInput::RetireCompletedOp {
2918                operation_id: mm_dsl::OperationId::from_domain(id).0,
2919                outcome: outcome_kind,
2920                payload: terminal_outcome,
2921            },
2922            mm_dsl::OpLifecycleActionKind::RetireCompleted,
2923        )?;
2924
2925        state.finalize_terminal(id)?;
2926        state.maybe_persist()?;
2927        Ok(())
2928    }
2929
2930    fn snapshot(
2931        &self,
2932        id: &OperationId,
2933    ) -> Result<Option<OperationLifecycleSnapshot>, OpsLifecycleError> {
2934        let state = self.read_state()?;
2935        state.snapshot(id)
2936    }
2937
2938    fn list_operations(&self) -> Result<Vec<OperationLifecycleSnapshot>, OpsLifecycleError> {
2939        let state = self.read_state()?;
2940        let mut snapshots = Vec::new();
2941        for id in state.operation_ids()? {
2942            let snapshot = state.snapshot(&id)?.ok_or_else(|| {
2943                OpsLifecycleError::Internal(format!(
2944                    "operation {id} was present in generated lifecycle authority but produced no public snapshot"
2945                ))
2946            })?;
2947            snapshots.push(snapshot);
2948        }
2949        snapshots.sort_by(|left, right| left.display_name.cmp(&right.display_name));
2950        Ok(snapshots)
2951    }
2952
2953    fn classify_operation_terminality(
2954        &self,
2955        id: &OperationId,
2956        status: OperationStatus,
2957    ) -> Result<bool, OpsLifecycleError> {
2958        ShellState::operation_status_is_terminal(id, status)
2959    }
2960
2961    fn classify_operation_public_result(
2962        &self,
2963        id: &OperationId,
2964    ) -> Result<OperationPublicResultClass, OpsLifecycleError> {
2965        let state = self.read_state()?;
2966        let status = match state.status(id) {
2967            Some(status) => status,
2968            None if state.records.contains_key(id)
2969                || state.has_generated_operation_record_fact(id) =>
2970            {
2971                return Err(OpsLifecycleError::Internal(format!(
2972                    "generated op lifecycle authority missing status for {id}"
2973                )));
2974            }
2975            None => OperationStatus::Absent,
2976        };
2977        ShellState::operation_public_result_class(id, status)
2978    }
2979
2980    fn classify_operation_completion_wake(
2981        &self,
2982        id: &OperationId,
2983        kind: OperationKind,
2984    ) -> Result<OperationCompletionWakeClass, OpsLifecycleError> {
2985        ShellState::operation_completion_wake_class(id, kind)
2986    }
2987
2988    fn classify_operation_transition_idempotence(
2989        &self,
2990        id: &OperationId,
2991        action: OperationLifecycleAction,
2992    ) -> Result<bool, OpsLifecycleError> {
2993        let state = self.read_state()?;
2994        let status = match state.status(id) {
2995            Some(status) => status,
2996            None if state.records.contains_key(id)
2997                || state.has_generated_operation_record_fact(id) =>
2998            {
2999                return Err(OpsLifecycleError::Internal(format!(
3000                    "generated op lifecycle authority missing status for {id}"
3001                )));
3002            }
3003            None => OperationStatus::Absent,
3004        };
3005        ShellState::operation_transition_rejection_is_idempotent(id, action, status)
3006    }
3007
3008    fn terminate_owner(&self, reason: String) -> Result<(), OpsLifecycleError> {
3009        let mut state = self.write_state()?;
3010
3011        let to_terminate = state.owner_termination_targets()?;
3012
3013        for (op_id, _status) in &to_terminate {
3014            let terminal_outcome = OperationTerminalOutcome::Terminated {
3015                reason: reason.clone(),
3016            };
3017            let outcome_kind = mm_dsl::OperationTerminalOutcomeKind::from(&terminal_outcome);
3018
3019            apply_op_transition(
3020                &mut state,
3021                op_id,
3022                mm_dsl::MeerkatMachineInput::TerminateOp {
3023                    operation_id: mm_dsl::OperationId::from_domain(op_id).0,
3024                    outcome: outcome_kind,
3025                    payload: terminal_outcome,
3026                },
3027                mm_dsl::OpLifecycleActionKind::Terminate,
3028            )?;
3029
3030            state.finalize_terminal(op_id)?;
3031        }
3032
3033        if !to_terminate.is_empty() {
3034            state.maybe_persist()?;
3035        }
3036        Ok(())
3037    }
3038
3039    fn collect_completed(
3040        &self,
3041    ) -> Result<Vec<(OperationId, OperationTerminalOutcome)>, OpsLifecycleError> {
3042        let mut state = self.write_state()?;
3043
3044        let ids: Vec<OperationId> = state.completed_order.iter().cloned().collect();
3045        let mut collected = Vec::with_capacity(ids.len());
3046        for id in ids {
3047            let outcome = state.terminal_outcome(&id)?;
3048            state.dsl_apply(
3049                mm_dsl::MeerkatMachineInput::CollectCompletedOp {
3050                    operation_id: mm_dsl::OperationId::from_domain(&id).0,
3051                },
3052                "CollectCompletedOp",
3053            )?;
3054            state.completed_order.retain(|queued| queued != &id);
3055            state.records.remove(&id);
3056            if let Some(outcome) = outcome {
3057                collected.push((id, outcome));
3058            }
3059        }
3060        Ok(collected)
3061    }
3062
3063    fn completion_feed(&self) -> Option<Arc<dyn CompletionFeed>> {
3064        Some(self.completion_feed_handle())
3065    }
3066
3067    fn completion_cursor(
3068        &self,
3069        consumer: CompletionCursorConsumer,
3070    ) -> Result<Option<CompletionSeq>, OpsLifecycleError> {
3071        let state = self.read_state()?;
3072        Ok(Some(state.completion_cursor(consumer)))
3073    }
3074
3075    fn advance_completion_cursor(
3076        &self,
3077        consumer: CompletionCursorConsumer,
3078        cursor: CompletionSeq,
3079        projection: Option<&meerkat_core::EpochCursorState>,
3080    ) -> Result<CompletionSeq, OpsLifecycleError> {
3081        let mut state = self.write_state()?;
3082        let input = match consumer {
3083            CompletionCursorConsumer::AgentApplied => {
3084                mm_dsl::MeerkatMachineInput::AdvanceAgentCompletionCursor { cursor }
3085            }
3086            CompletionCursorConsumer::RuntimeObserved => {
3087                mm_dsl::MeerkatMachineInput::AdvanceRuntimeObservedCompletionCursor { cursor }
3088            }
3089            CompletionCursorConsumer::RuntimeInjected => {
3090                mm_dsl::MeerkatMachineInput::AdvanceRuntimeInjectedCompletionCursor { cursor }
3091            }
3092        };
3093        let effects = state.dsl_apply_with_effects(input, "AdvanceCompletionCursor")?;
3094        let advanced = effects
3095            .iter()
3096            .find_map(|effect| match (consumer, effect) {
3097                (
3098                    CompletionCursorConsumer::AgentApplied,
3099                    mm_dsl::MeerkatMachineEffect::AgentCompletionCursorAdvanced { cursor },
3100                ) => Some(*cursor),
3101                (
3102                    CompletionCursorConsumer::RuntimeObserved,
3103                    mm_dsl::MeerkatMachineEffect::RuntimeObservedCompletionCursorAdvanced {
3104                        cursor,
3105                    },
3106                ) => Some(*cursor),
3107                (
3108                    CompletionCursorConsumer::RuntimeInjected,
3109                    mm_dsl::MeerkatMachineEffect::RuntimeInjectedCompletionCursorAdvanced {
3110                        cursor,
3111                    },
3112                ) => Some(*cursor),
3113                _ => None,
3114            })
3115            .ok_or_else(|| {
3116                OpsLifecycleError::Internal(format!(
3117                    "generated completion cursor transition emitted no feedback for {consumer:?}"
3118                ))
3119            })?;
3120        if let Some(projection) = projection {
3121            projection.project_authorized_completion_cursor(consumer, advanced);
3122        }
3123        Ok(advanced)
3124    }
3125
    /// Starts a `wait_all` barrier over `ids` on behalf of `run_id`.
    ///
    /// All registry work happens synchronously inside this call, under one
    /// write lock, before the future is returned:
    /// - `begin_wait_all_authority` chooses the plan. Its errors (and a
    ///   failure to take the lock) are returned through an already-ready
    ///   future.
    /// - `AlreadySatisfied`: outcomes are collected immediately. The returned
    ///   future is ready with them, or with the collection error.
    /// - `ActivateBarrier`: the shell records `wait_request_id` and parks a
    ///   oneshot sender in `pending_wait`. The future then waits on the
    ///   matching receiver.
    ///
    /// The write lock is released before the future is boxed. The future also
    /// carries `wait_request_id`. Presumably this lets dropping the future
    /// cancel the matching request; see the
    /// `dropping_wait_all_future_cancels_active_wait_request` test.
    fn wait_all(
        &self,
        run_id: &RunId,
        ids: &[OperationId],
    ) -> std::pin::Pin<
        Box<dyn std::future::Future<Output = Result<WaitAllResult, OpsLifecycleError>> + Send + '_>,
    > {
        let wait_request_id = WaitRequestId::new();
        let owned_ids = ids.to_vec();

        // `state` below is the future's initial state. The write guard bound
        // in the `Ok` arm is dropped when that arm ends.
        let state = match self.write_state() {
            Ok(mut state) => {
                match state.begin_wait_all_authority(run_id, &wait_request_id, &owned_ids) {
                    Ok(WaitAllAuthorityPlan::AlreadySatisfied(satisfied)) => {
                        // Every member is already terminal: resolve now,
                        // without a oneshot.
                        let outcomes =
                            state
                                .collect_wait_outcomes(&satisfied.operation_ids)
                                .map(|outcomes| WaitAllResult {
                                    outcomes,
                                    satisfied,
                                });
                        WaitAllFutureState::Ready(Some(outcomes))
                    }
                    Ok(WaitAllAuthorityPlan::ActivateBarrier) => {
                        if state.pending_wait.is_some() {
                            // Roll back the DSL barrier we just activated so the
                            // registry is not stuck in a wait-active state with
                            // no correlation oneshot to resolve. `CancelWaitAll`
                            // is the no-obligation clearer (members need not be
                            // terminal).
                            // The existing `pending_wait` entry is left as-is.
                            // The caller always gets an error here: the rollback
                            // error if the rollback failed, otherwise a generic
                            // `Internal` fault.
                            let rollback = state.dsl_apply(
                                mm_dsl::MeerkatMachineInput::CancelWaitAll,
                                "CancelWaitAll(rollback)",
                            );
                            return Box::pin(WaitAllFuture {
                                registry: self,
                                wait_request_id,
                                state: WaitAllFutureState::Ready(Some(Err(match rollback {
                                    Ok(()) => OpsLifecycleError::Internal(
                                        "wait_all started while a pending wait sender already existed"
                                            .into(),
                                    ),
                                    Err(err) => err,
                                }))),
                            });
                        }
                        // Correlate the shell-side oneshot with this request id
                        // so the barrier's resolution can be routed back to the
                        // returned future.
                        state.wait_request_id = Some(wait_request_id.clone());
                        let (sender, receiver) = tokio::sync::oneshot::channel();
                        state.pending_wait = Some(PendingWaitState {
                            wait_request_id: wait_request_id.clone(),
                            sender,
                        });
                        WaitAllFutureState::Waiting(receiver)
                    }
                    Err(err) => WaitAllFutureState::Ready(Some(Err(err))),
                }
            }
            Err(err) => WaitAllFutureState::Ready(Some(Err(err))),
        };

        Box::pin(WaitAllFuture {
            registry: self,
            wait_request_id,
            state,
        })
    }
3192}
3193
3194#[cfg(test)]
3195#[allow(clippy::unwrap_used, clippy::panic)]
3196mod tests {
3197    use super::*;
3198    use meerkat_core::comms::{PeerId, TrustedPeerDescriptor};
3199    use meerkat_core::lifecycle::RunId;
3200    use meerkat_core::ops_lifecycle::{OperationKind, OpsLifecycleRegistry};
3201    use meerkat_core::types::SessionId;
3202    use std::sync::atomic::Ordering;
3203    use uuid::Uuid;
3204
3205    fn test_run_id() -> RunId {
3206        RunId(Uuid::from_u128(1))
3207    }
3208
    /// Builds a `BackgroundToolOp` spec labelled `name`, with fresh operation
    /// and owner-session ids and a source label of `"test"`.
    ///
    /// The spec has no operation source and no child session, and does not
    /// expect a peer channel.
    fn background_spec(name: &str) -> OperationSpec {
        OperationSpec {
            id: OperationId::new(),
            kind: OperationKind::BackgroundToolOp,
            owner_session_id: SessionId::new(),
            display_name: name.into(),
            source_label: "test".into(),
            operation_source: None,
            child_session_id: None,
            expect_peer_channel: false,
        }
    }
3221
    /// A watcher registered after the operation has already completed still
    /// resolves, with the stored `Completed` outcome.
    #[tokio::test]
    async fn late_watchers_resolve_immediately() {
        let registry = RuntimeOpsLifecycleRegistry::new();
        let spec = background_spec("late");
        let op_id = spec.id.clone();
        registry.register_operation(spec).unwrap();
        registry.provisioning_succeeded(&op_id).unwrap();
        registry
            .complete_operation(
                &op_id,
                OperationResult {
                    id: op_id.clone(),
                    content: "done".into(),
                    is_error: false,
                    duration_ms: 1,
                    tokens_used: 0,
                },
            )
            .unwrap();

        // Registered only after completion.
        let watch = registry.register_watcher(&op_id).unwrap();
        match watch
            .await
            .expect("operation completion watch should resolve")
        {
            OperationTerminalOutcome::Completed(result) => assert_eq!(result.content, "done"),
            other => panic!("expected completed outcome, got {other:?}"),
        }
    }
3251
    /// Dropping the sender behind a completion watch surfaces as
    /// `OperationCompletionWatchError::ChannelClosed`. That is a waiter-side
    /// error, not any terminal outcome.
    #[tokio::test]
    async fn dropped_watch_sender_is_waiter_error_not_terminal_outcome() {
        let (tx, rx) = tokio::sync::oneshot::channel();
        let watch = operation_completion_watch_from_receiver(rx);
        drop(tx);

        assert_eq!(
            watch.await,
            Err(meerkat_core::ops_lifecycle::OperationCompletionWatchError::ChannelClosed)
        );
    }
3263
    /// `peer_ready` on an operation registered with `expect_peer_channel:
    /// false` (as `background_spec` does) is rejected with `PeerNotExpected`.
    #[test]
    fn peer_ready_requires_peer_expectation() {
        let registry = RuntimeOpsLifecycleRegistry::new();
        let spec = background_spec("no-peer");
        let op_id = spec.id.clone();
        registry.register_operation(spec).unwrap();
        registry.provisioning_succeeded(&op_id).unwrap();

        let result = registry.peer_ready(
            &op_id,
            OperationPeerHandle {
                peer_name: meerkat_core::comms::PeerName::new("peer").unwrap(),
                trusted_peer: TrustedPeerDescriptor::test_only_unsigned_typed(
                    "peer",
                    PeerId::new(),
                    "inproc://peer",
                )
                .unwrap(),
            },
        );
        assert!(matches!(result, Err(OpsLifecycleError::PeerNotExpected(_))));
    }
3286
    /// K8b pinning test: the terminal payload carried through the generated
    /// machine IS the typed domain outcome — no JSON codec exists in either
    /// direction. Every variant classifies to its matching discriminant, the
    /// fail-closed read returns the exact payload when the discriminant
    /// matches, and rejects any discriminant/variant disagreement.
    #[test]
    fn typed_terminal_payload_classifies_and_reads_back_each_variant() {
        let op_id = OperationId::new();
        // One entry per terminal variant. `Aborted` and `Cancelled` appear with
        // both `Some` and `None` reasons so both payload shapes are covered.
        let outcomes = vec![
            (
                OperationTerminalOutcome::Completed(OperationResult {
                    id: op_id.clone(),
                    content: "done".into(),
                    is_error: false,
                    duration_ms: 7,
                    tokens_used: 42,
                }),
                mm_dsl::OperationTerminalOutcomeKind::Completed,
            ),
            (
                OperationTerminalOutcome::Failed {
                    error: "boom".into(),
                },
                mm_dsl::OperationTerminalOutcomeKind::Failed,
            ),
            (
                OperationTerminalOutcome::Aborted {
                    reason: Some("user aborted".into()),
                },
                mm_dsl::OperationTerminalOutcomeKind::Aborted,
            ),
            (
                OperationTerminalOutcome::Aborted { reason: None },
                mm_dsl::OperationTerminalOutcomeKind::Aborted,
            ),
            (
                OperationTerminalOutcome::Cancelled {
                    reason: Some("cancelled".into()),
                },
                mm_dsl::OperationTerminalOutcomeKind::Cancelled,
            ),
            (
                OperationTerminalOutcome::Cancelled { reason: None },
                mm_dsl::OperationTerminalOutcomeKind::Cancelled,
            ),
            (
                OperationTerminalOutcome::Retired,
                mm_dsl::OperationTerminalOutcomeKind::Retired,
            ),
            (
                OperationTerminalOutcome::Terminated {
                    reason: "owner stopped".into(),
                },
                mm_dsl::OperationTerminalOutcomeKind::Terminated,
            ),
        ];

        for (outcome, expected_kind) in &outcomes {
            assert_eq!(
                mm_dsl::OperationTerminalOutcomeKind::from(outcome),
                *expected_kind,
                "typed payload {outcome:?} must classify to {expected_kind:?}"
            );
            let read = ShellState::checked_terminal_payload(
                *expected_kind,
                outcome,
                "test authority",
                "test-op",
            )
            .expect("matching discriminant must read back the exact payload");
            assert_eq!(&read, outcome);
        }

        // Discriminant/variant disagreement fails closed.
        let err = ShellState::checked_terminal_payload(
            mm_dsl::OperationTerminalOutcomeKind::Completed,
            &OperationTerminalOutcome::Retired,
            "test authority",
            "test-op",
        )
        .expect_err("variant mismatch must be rejected");
        assert!(matches!(err, OpsLifecycleError::Internal(_)));
    }
3370
    /// K8b machine-ownership pin: a terminal payload whose variant disagrees
    /// with the transition's terminal kind is rejected by the generated
    /// machine guard (`payload_variant_matches_kind`) — not by any shell
    /// decode step. Drives the DSL input directly to prove the guard owns
    /// the invariant.
    #[test]
    fn generated_guard_rejects_terminal_payload_variant_mismatch() {
        let registry = RuntimeOpsLifecycleRegistry::new();
        let spec = background_spec("variant-mismatch");
        let op_id = spec.id.clone();
        registry.register_operation(spec).unwrap();
        registry.provisioning_succeeded(&op_id).unwrap();

        let mut state = registry.write_state().unwrap();
        let err = state
            .dsl_apply(
                mm_dsl::MeerkatMachineInput::CompleteOp {
                    operation_id: mm_dsl::OperationId::from_domain(&op_id).0,
                    outcome: mm_dsl::OperationTerminalOutcomeKind::Completed,
                    // Variant mismatch: Completed kind with a Retired payload.
                    payload: OperationTerminalOutcome::Retired,
                },
                "CompleteOp",
            )
            .expect_err("machine must reject payload variant mismatch");
        // The kernel reports guard rejections without naming the guard; the
        // discriminating pin is the pair: identical input EXCEPT the payload
        // variant is guard-rejected here, then accepted below. Status
        // (`Running`) and discriminant (`Completed`) are identical in both
        // calls, so `payload_variant_matches_kind` is the only differing
        // guard.
        assert!(
            matches!(
                &err,
                OpsLifecycleError::Internal(message)
                    if message.contains("GuardRejected") && message.contains("CompleteOp")
            ),
            "expected generated guard rejection for CompleteOp, got: {err:?}"
        );
        // Release the write guard before calling back into the registry's
        // public API.
        drop(state);

        // Accepted half of the pair: the same still-`Running` op, now carrying
        // a `Completed` payload that matches the `Completed` discriminant, must
        // be accepted through the public `complete_operation` path.
        registry
            .complete_operation(
                &op_id,
                OperationResult {
                    id: op_id.clone(),
                    content: "done".into(),
                    is_error: false,
                    duration_ms: 1,
                    tokens_used: 0,
                },
            )
            .expect("matching variant must complete");
    }
3427
    /// Registering the same spec (same operation id) twice is rejected with
    /// `AlreadyRegistered`, and the error names that id.
    #[test]
    fn duplicate_registration_rejection_is_generated() {
        let registry = RuntimeOpsLifecycleRegistry::new();
        let spec = background_spec("duplicate");
        let op_id = spec.id.clone();

        registry.register_operation(spec.clone()).unwrap();
        let result = registry.register_operation(spec);

        assert!(matches!(
            result,
            Err(OpsLifecycleError::AlreadyRegistered(id)) if id == op_id
        ));
    }
3442
    /// Completing an operation that is still `Provisioning` is rejected as
    /// `InvalidTransition`. (The test never calls `provisioning_succeeded`.)
    /// The error carries the op id, the current status, and the
    /// `complete_operation` action name.
    #[test]
    fn invalid_transition_rejection_is_generated() {
        let registry = RuntimeOpsLifecycleRegistry::new();
        let spec = background_spec("invalid-transition");
        let op_id = spec.id.clone();
        registry.register_operation(spec).unwrap();

        let result = registry.complete_operation(
            &op_id,
            OperationResult {
                id: op_id.clone(),
                content: "too-early".into(),
                is_error: false,
                duration_ms: 1,
                tokens_used: 0,
            },
        );

        assert!(matches!(
            result,
            Err(OpsLifecycleError::InvalidTransition {
                id,
                status: OperationStatus::Provisioning,
                action: "complete_operation",
            }) if id == op_id
        ));
    }
3470
    /// Three watchers registered before completion each observe the same
    /// `Completed` outcome.
    #[tokio::test]
    async fn multi_listener_completion() {
        let registry = RuntimeOpsLifecycleRegistry::new();
        let spec = background_spec("multi");
        let op_id = spec.id.clone();
        registry.register_operation(spec).unwrap();
        registry.provisioning_succeeded(&op_id).unwrap();

        let watch1 = registry.register_watcher(&op_id).unwrap();
        let watch2 = registry.register_watcher(&op_id).unwrap();
        let watch3 = registry.register_watcher(&op_id).unwrap();

        registry
            .complete_operation(
                &op_id,
                OperationResult {
                    id: op_id.clone(),
                    content: "multi-done".into(),
                    is_error: false,
                    duration_ms: 1,
                    tokens_used: 0,
                },
            )
            .unwrap();

        for watch in [watch1, watch2, watch3] {
            match watch
                .await
                .expect("operation completion watch should resolve")
            {
                OperationTerminalOutcome::Completed(result) => {
                    assert_eq!(result.content, "multi-done");
                }
                other => panic!("expected completed, got {other:?}"),
            }
        }
    }
3508
    /// `wait_all` over operations that are already terminal (one completed,
    /// one failed) returns one outcome per id, in request order. It also
    /// returns a satisfaction record naming both ids.
    #[tokio::test]
    async fn wait_all_returns_all_outcomes() {
        let registry = RuntimeOpsLifecycleRegistry::new();

        let spec_a = background_spec("a");
        let id_a = spec_a.id.clone();
        registry.register_operation(spec_a).unwrap();
        registry.provisioning_succeeded(&id_a).unwrap();

        let spec_b = background_spec("b");
        let id_b = spec_b.id.clone();
        registry.register_operation(spec_b).unwrap();
        registry.provisioning_succeeded(&id_b).unwrap();

        registry
            .complete_operation(
                &id_a,
                OperationResult {
                    id: id_a.clone(),
                    content: "a-done".into(),
                    is_error: false,
                    duration_ms: 1,
                    tokens_used: 0,
                },
            )
            .unwrap();
        registry.fail_operation(&id_b, "b-error".into()).unwrap();

        let wait_result = registry
            .wait_all(&test_run_id(), &[id_a.clone(), id_b.clone()])
            .await
            .unwrap();
        assert_eq!(wait_result.outcomes.len(), 2);
        assert_eq!(wait_result.outcomes[0].0, id_a);
        assert!(matches!(
            wait_result.outcomes[0].1,
            OperationTerminalOutcome::Completed(_)
        ));
        assert_eq!(wait_result.outcomes[1].0, id_b);
        assert!(matches!(
            wait_result.outcomes[1].1,
            OperationTerminalOutcome::Failed { .. }
        ));
        // Obligation carries the awaited IDs
        assert_eq!(wait_result.satisfied.operation_ids.len(), 2);
        assert_ne!(wait_result.satisfied.wait_request_id.to_string(), "");
    }
3556
    /// Exercises the trait `wait_all` path (via `dyn OpsLifecycleRegistry`)
    /// which must submit WaitAll through the DSL for cross-machine handoff.
    /// Also checks that an already-satisfied wait leaves no active barrier and
    /// no recorded wait members behind.
    #[tokio::test]
    async fn wait_all_trait_path_submits_through_authority() {
        let registry = RuntimeOpsLifecycleRegistry::new();
        let spec = background_spec("trait-wait");
        let op_id = spec.id.clone();
        registry.register_operation(spec).unwrap();
        registry.provisioning_succeeded(&op_id).unwrap();
        registry
            .complete_operation(
                &op_id,
                OperationResult {
                    id: op_id.clone(),
                    content: "done".into(),
                    is_error: false,
                    duration_ms: 1,
                    tokens_used: 0,
                },
            )
            .unwrap();

        // Call through trait object to exercise the trait impl, not the inherent method.
        let trait_ref: &dyn OpsLifecycleRegistry = &registry;
        let wait_result = trait_ref
            .wait_all(&test_run_id(), std::slice::from_ref(&op_id))
            .await
            .unwrap();
        assert_eq!(wait_result.outcomes.len(), 1);
        assert!(matches!(
            wait_result.outcomes[0].1,
            OperationTerminalOutcome::Completed(_)
        ));
        // Obligation carries the validated ID
        assert_eq!(wait_result.satisfied.operation_ids, vec![op_id]);
        assert_ne!(wait_result.satisfied.wait_request_id.to_string(), "");
        let state = registry.read_state().unwrap();
        assert!(
            !state.wait_active(),
            "already-satisfied wait_all must be cleared by generated satisfaction authority"
        );
        assert!(state.wait_operation_ids().unwrap().is_empty());
    }
3600
    /// Listing the same operation id twice in one `wait_all` is rejected with
    /// `DuplicateWaitOperation`. The rejection leaves no active wait and no
    /// recorded wait members.
    #[tokio::test]
    async fn wait_all_duplicate_rejection_is_generated() {
        let registry = RuntimeOpsLifecycleRegistry::new();
        let spec = background_spec("duplicate-wait");
        let op_id = spec.id.clone();
        registry.register_operation(spec).unwrap();
        registry.provisioning_succeeded(&op_id).unwrap();

        let result = registry
            .wait_all(&test_run_id(), &[op_id.clone(), op_id.clone()])
            .await;

        assert!(matches!(
            result,
            Err(OpsLifecycleError::DuplicateWaitOperation(id)) if id == op_id
        ));
        let state = registry.read_state().unwrap();
        assert!(
            !state.wait_active(),
            "duplicate wait rejection must not create a shell or machine barrier"
        );
        assert!(state.wait_operation_ids().unwrap().is_empty());
    }
3624
    /// A second `wait_all` while another barrier is active is rejected with
    /// `WaitAlreadyActive`. Dropping the first (never-polled) future then
    /// clears the barrier.
    #[tokio::test]
    async fn wait_all_active_rejection_is_generated() {
        let registry = RuntimeOpsLifecycleRegistry::new();
        let spec = background_spec("active-wait");
        let op_id = spec.id.clone();
        registry.register_operation(spec).unwrap();
        registry.provisioning_succeeded(&op_id).unwrap();

        // Never polled: `wait_all` installs its barrier before returning the
        // future, so this alone makes a wait active.
        let active_wait = registry.wait_all(&test_run_id(), std::slice::from_ref(&op_id));
        let result = registry
            .wait_all(&test_run_id(), std::slice::from_ref(&op_id))
            .await;

        assert!(matches!(result, Err(OpsLifecycleError::WaitAlreadyActive)));
        drop(active_wait);
        let state = registry.read_state().unwrap();
        assert!(!state.wait_active());
        assert!(state.wait_operation_ids().unwrap().is_empty());
    }
3644
    /// `wait_all` on an id that was never registered is rejected with
    /// `NotFound` for that id, and leaves no wait state behind.
    #[tokio::test]
    async fn wait_all_unknown_operation_rejection_is_generated() {
        let registry = RuntimeOpsLifecycleRegistry::new();
        let op_id = OperationId::new();

        let result = registry
            .wait_all(&test_run_id(), std::slice::from_ref(&op_id))
            .await;

        assert!(matches!(result, Err(OpsLifecycleError::NotFound(id)) if id == op_id));
        let state = registry.read_state().unwrap();
        assert!(!state.wait_active());
        assert!(state.wait_operation_ids().unwrap().is_empty());
    }
3659
    /// A pending `wait_all` resolves once its member completes. The reported
    /// `wait_request_id` equals the one recorded in shell state while the wait
    /// was active, and that id is cleared afterwards.
    #[tokio::test]
    async fn wait_all_resolves_from_authority_owned_wait_request() {
        let registry = RuntimeOpsLifecycleRegistry::new();
        let run_id = test_run_id();

        let spec = background_spec("pending");
        let op_id = spec.id.clone();
        registry.register_operation(spec).unwrap();
        registry.provisioning_succeeded(&op_id).unwrap();

        let wait_fut = registry.wait_all(&run_id, std::slice::from_ref(&op_id));
        tokio::pin!(wait_fut);
        // The member is still running, so a short poll window must time out.
        assert!(
            tokio::time::timeout(std::time::Duration::from_millis(10), &mut wait_fut)
                .await
                .is_err()
        );

        // Capture the request id the shell recorded for the active barrier.
        let active_wait_request_id = {
            let state = registry.read_state().unwrap();
            let wait_request_id = match state.wait_request_id.clone() {
                Some(wait_request_id) => wait_request_id,
                None => panic!("wait request should be active"),
            };
            assert_eq!(
                state.wait_operation_ids().unwrap().as_slice(),
                std::slice::from_ref(&op_id)
            );
            wait_request_id
        };

        registry
            .complete_operation(
                &op_id,
                OperationResult {
                    id: op_id.clone(),
                    content: "done".into(),
                    is_error: false,
                    duration_ms: 1,
                    tokens_used: 0,
                },
            )
            .unwrap();

        let wait_result = wait_fut.await.unwrap();
        assert_eq!(
            wait_result.satisfied.wait_request_id,
            active_wait_request_id
        );
        assert_eq!(wait_result.satisfied.operation_ids, vec![op_id.clone()]);
        assert!(matches!(
            wait_result.outcomes.as_slice(),
            [(returned_id, OperationTerminalOutcome::Completed(_))] if *returned_id == op_id
        ));
        assert!(registry.read_state().unwrap().wait_request_id.is_none());
    }
3716
    /// Dropping an unresolved `wait_all` future leaves no active wait. The
    /// shell `wait_request_id`, the recorded wait members, and `wait_active`
    /// are all cleared.
    #[tokio::test]
    async fn dropping_wait_all_future_cancels_active_wait_request() {
        let registry = RuntimeOpsLifecycleRegistry::new();
        let run_id = test_run_id();

        let spec = background_spec("cancelled-wait");
        let op_id = spec.id.clone();
        registry.register_operation(spec).unwrap();
        registry.provisioning_succeeded(&op_id).unwrap();

        let wait_fut = registry.wait_all(&run_id, std::slice::from_ref(&op_id));
        drop(wait_fut);

        let state = registry.read_state().unwrap();
        assert!(state.wait_request_id.is_none());
        assert!(state.wait_operation_ids().unwrap().is_empty());
        assert!(!state.wait_active());
    }
3735
    /// id 101: an authority-invariant corruption surfaced during a terminal
    /// transition must NOT report the op terminal with a silently-hung barrier.
    /// The terminal call must return the typed `Internal` fault, AND the awaited
    /// `wait_all` future must resolve to `Err` (via the dropped-sender arm)
    /// instead of hanging forever.
    ///
    /// Statement order matters here:
    /// 1. the waiter is polled and observed pending, and the wait request is
    ///    observed active;
    /// 2. only then is the authority corrupted;
    /// 3. only then does the terminal transition run.
    #[tokio::test]
    async fn satisfy_wait_authority_fault_fails_terminal_and_unblocks_waiter() {
        let registry = RuntimeOpsLifecycleRegistry::new();
        let run_id = test_run_id();

        let spec = background_spec("corrupt-barrier");
        let op_id = spec.id.clone();
        registry.register_operation(spec).unwrap();
        registry.provisioning_succeeded(&op_id).unwrap();

        // Activate a barrier whose waiter is still pending.
        let wait_fut = registry.wait_all(&run_id, std::slice::from_ref(&op_id));
        // Pin in place so the same future can be polled via `&mut` here and
        // again after the corruption below.
        tokio::pin!(wait_fut);
        // The 10ms timeout elapsing (`Err`) means the waiter has not resolved.
        assert!(
            tokio::time::timeout(std::time::Duration::from_millis(10), &mut wait_fut)
                .await
                .is_err(),
            "barrier waiter must still be pending before corruption"
        );
        assert!(registry.read_state().unwrap().wait_request_id.is_some());

        // Corrupt the generated wait authority: an active wait_request_id with
        // no wait_run_id forces `try_satisfy_wait_all_authority` down its
        // non-GuardRejected `Internal` arm during the terminal transition.
        // The write guard is scoped to this block, so it is released before
        // `complete_operation` is called below.
        {
            let mut state = registry.write_state().unwrap();
            // Clone the generated machine state, clear `wait_run_id`, and
            // rebuild the authority from the edited state.
            let mut machine_state = state.dsl.0.state().clone();
            machine_state.wait_run_id = None;
            state.dsl = DslAuthority(Box::new(
                mm_dsl::MeerkatMachineAuthority::recover_from_state(machine_state).unwrap(),
            ));
        }

        // The terminal transition must FAIL with the typed authority fault, not
        // report the op complete.
        let err = registry
            .complete_operation(
                &op_id,
                OperationResult {
                    id: op_id.clone(),
                    content: "done".into(),
                    is_error: false,
                    duration_ms: 1,
                    tokens_used: 0,
                },
            )
            .expect_err("corrupt wait authority must fail the terminal transition");
        assert!(
            matches!(&err, OpsLifecycleError::Internal(message) if message.contains("active wait without run id")),
            "unexpected terminal error: {err:?}"
        );

        // The waiter must resolve to Err (dropped sender), not hang.
        // The outer `expect` fails the test if the 1s timeout elapses.
        let waiter_result = tokio::time::timeout(std::time::Duration::from_secs(1), &mut wait_fut)
            .await
            .expect("waiter must resolve, not hang, after authority corruption");
        match waiter_result {
            Err(OpsLifecycleError::Internal(message)) => assert!(
                message.contains("wait_all completion channel dropped"),
                "unexpected waiter error message: {message}"
            ),
            other => panic!("expected dropped-channel Internal error, got {other:?}"),
        }
    }
3805
3806    /// id 97: a poisoned registry lock must surface as a typed `Internal` fault
3807    /// from `completion_cursor`, NOT laundered into `Ok(None)` (which means "no
3808    /// generated cursor authority") or a bare `None`.
3809    #[test]
3810    fn completion_cursor_propagates_poison_not_none() {
3811        let registry = std::sync::Arc::new(RuntimeOpsLifecycleRegistry::new());
3812
3813        // Poison the registry RwLock by panicking while holding the write guard.
3814        let poison_registry = std::sync::Arc::clone(&registry);
3815        let join = std::thread::spawn(move || {
3816            let _guard = poison_registry.write_state().unwrap();
3817            panic!("intentional panic to poison ops lifecycle registry lock");
3818        });
3819        assert!(
3820            join.join().is_err(),
3821            "poisoning thread must have panicked while holding the write guard"
3822        );
3823
3824        let trait_ref: &dyn OpsLifecycleRegistry = registry.as_ref();
3825        let result = trait_ref.completion_cursor(CompletionCursorConsumer::AgentApplied);
3826        match result {
3827            Err(OpsLifecycleError::Internal(message)) => assert!(
3828                message.contains("ops lifecycle registry poisoned"),
3829                "unexpected cursor error message: {message}"
3830            ),
3831            other => panic!("poisoned registry must surface typed Internal fault, got {other:?}"),
3832        }
3833    }
3834
3835    #[test]
3836    fn terminate_owner_only_targets_non_terminal_operations() {
3837        let registry = RuntimeOpsLifecycleRegistry::new();
3838
3839        let running_spec = background_spec("running");
3840        let running_id = running_spec.id.clone();
3841        registry.register_operation(running_spec).unwrap();
3842        registry.provisioning_succeeded(&running_id).unwrap();
3843
3844        let completed_spec = background_spec("completed");
3845        let completed_id = completed_spec.id.clone();
3846        registry.register_operation(completed_spec).unwrap();
3847        registry.provisioning_succeeded(&completed_id).unwrap();
3848        registry
3849            .complete_operation(
3850                &completed_id,
3851                OperationResult {
3852                    id: completed_id.clone(),
3853                    content: "done".into(),
3854                    is_error: false,
3855                    duration_ms: 1,
3856                    tokens_used: 0,
3857                },
3858            )
3859            .unwrap();
3860
3861        registry.terminate_owner("shutdown".into()).unwrap();
3862
3863        assert!(matches!(
3864            registry.snapshot(&running_id).unwrap().unwrap().status,
3865            OperationStatus::Terminated
3866        ));
3867        assert!(matches!(
3868            registry.snapshot(&completed_id).unwrap().unwrap().status,
3869            OperationStatus::Completed
3870        ));
3871    }
3872
3873    #[test]
3874    fn collect_completed_drains_terminal_operations() {
3875        let registry = RuntimeOpsLifecycleRegistry::new();
3876
3877        let spec_a = background_spec("a");
3878        let id_a = spec_a.id.clone();
3879        registry.register_operation(spec_a).unwrap();
3880        registry.provisioning_succeeded(&id_a).unwrap();
3881        registry
3882            .complete_operation(
3883                &id_a,
3884                OperationResult {
3885                    id: id_a.clone(),
3886                    content: "done".into(),
3887                    is_error: false,
3888                    duration_ms: 1,
3889                    tokens_used: 0,
3890                },
3891            )
3892            .unwrap();
3893
3894        let spec_b = background_spec("b");
3895        let id_b = spec_b.id.clone();
3896        registry.register_operation(spec_b).unwrap();
3897
3898        let collected = registry.collect_completed().unwrap();
3899        assert_eq!(collected.len(), 1);
3900        assert_eq!(collected[0].0, id_a);
3901
3902        assert!(registry.snapshot(&id_a).unwrap().is_none());
3903        assert!(registry.snapshot(&id_b).unwrap().is_some());
3904
3905        let collected2 = registry.collect_completed().unwrap();
3906        assert!(collected2.is_empty());
3907    }
3908
3909    #[test]
3910    fn bounded_completed_retention_evicts_oldest() {
3911        let registry = RuntimeOpsLifecycleRegistry::with_config(OpsLifecycleConfig {
3912            max_completed: 3,
3913            max_concurrent: None,
3914        });
3915
3916        let mut ids = Vec::new();
3917        for i in 0..5 {
3918            let spec = background_spec(&format!("op-{i}"));
3919            let id = spec.id.clone();
3920            registry.register_operation(spec).unwrap();
3921            registry.provisioning_succeeded(&id).unwrap();
3922            registry
3923                .complete_operation(
3924                    &id,
3925                    OperationResult {
3926                        id: id.clone(),
3927                        content: format!("done-{i}"),
3928                        is_error: false,
3929                        duration_ms: 1,
3930                        tokens_used: 0,
3931                    },
3932                )
3933                .unwrap();
3934            ids.push(id);
3935        }
3936
3937        assert!(registry.snapshot(&ids[0]).unwrap().is_none());
3938        assert!(registry.snapshot(&ids[1]).unwrap().is_none());
3939        assert!(registry.snapshot(&ids[2]).unwrap().is_some());
3940        assert!(registry.snapshot(&ids[3]).unwrap().is_some());
3941        assert!(registry.snapshot(&ids[4]).unwrap().is_some());
3942    }
3943
3944    #[test]
3945    fn recovered_snapshot_retains_only_machine_accepted_terminal_records() {
3946        let registry = RuntimeOpsLifecycleRegistry::new();
3947
3948        let completed_spec = background_spec("completed");
3949        let completed_id = completed_spec.id.clone();
3950        registry.register_operation(completed_spec).unwrap();
3951        registry.provisioning_succeeded(&completed_id).unwrap();
3952        registry
3953            .complete_operation(
3954                &completed_id,
3955                OperationResult {
3956                    id: completed_id.clone(),
3957                    content: "done".into(),
3958                    is_error: false,
3959                    duration_ms: 1,
3960                    tokens_used: 0,
3961                },
3962            )
3963            .unwrap();
3964
3965        let running_spec = background_spec("running");
3966        let running_id = running_spec.id.clone();
3967        registry.register_operation(running_spec).unwrap();
3968        registry.provisioning_succeeded(&running_id).unwrap();
3969
3970        let cursor_state = meerkat_core::EpochCursorState::new();
3971        let snapshot = registry
3972            .capture_persistence_snapshot(meerkat_core::RuntimeEpochId::new(), &cursor_state)
3973            .unwrap();
3974        let recovered = RuntimeOpsLifecycleRegistry::from_recovered(snapshot).unwrap();
3975
3976        assert!(recovered.snapshot(&completed_id).unwrap().is_some());
3977        assert!(recovered.snapshot(&running_id).unwrap().is_none());
3978
3979        let collected = recovered.collect_completed().unwrap();
3980        assert_eq!(collected.len(), 1);
3981        assert_eq!(collected[0].0, completed_id);
3982    }
3983
3984    #[test]
3985    fn capacity_slot_terminal_is_not_persisted_or_recovered() {
3986        let registry = RuntimeOpsLifecycleRegistry::new();
3987
3988        let mut spec = background_spec("capacity");
3989        spec.kind = OperationKind::BackgroundToolCapacitySlot;
3990        let operation_id = spec.id.clone();
3991        registry.register_operation(spec).unwrap();
3992        registry.provisioning_succeeded(&operation_id).unwrap();
3993        registry.mark_retired(&operation_id).unwrap();
3994
3995        assert!(registry.snapshot(&operation_id).unwrap().is_none());
3996
3997        let cursor_state = meerkat_core::EpochCursorState::new();
3998        let snapshot = registry
3999            .capture_persistence_snapshot(meerkat_core::RuntimeEpochId::new(), &cursor_state)
4000            .unwrap();
4001        assert!(
4002            !snapshot
4003                .authority_state
4004                .operations
4005                .contains_key(&operation_id)
4006        );
4007        assert!(!snapshot.operation_specs.contains_key(&operation_id));
4008        assert!(snapshot.completion_entries.is_empty());
4009
4010        let recovered = RuntimeOpsLifecycleRegistry::from_recovered(snapshot).unwrap();
4011        assert!(recovered.snapshot(&operation_id).unwrap().is_none());
4012    }
4013
4014    #[test]
4015    fn recovered_snapshot_uses_authority_operation_source() {
4016        let registry = RuntimeOpsLifecycleRegistry::new();
4017        let child_session_id = SessionId::new();
4018        let operation_source = OperationSource::session_child(child_session_id.clone());
4019        let spec = OperationSpec {
4020            id: OperationId::new(),
4021            kind: OperationKind::MobMemberChild,
4022            owner_session_id: SessionId::new(),
4023            display_name: "source-recovery".into(),
4024            source_label: "test".into(),
4025            operation_source: Some(operation_source.clone()),
4026            child_session_id: Some(child_session_id),
4027            expect_peer_channel: true,
4028        };
4029        let operation_id = spec.id.clone();
4030
4031        registry.register_operation(spec).unwrap();
4032        registry.provisioning_succeeded(&operation_id).unwrap();
4033        registry.mark_retired(&operation_id).unwrap();
4034
4035        let cursor_state = meerkat_core::EpochCursorState::new();
4036        let mut snapshot = registry
4037            .capture_persistence_snapshot(meerkat_core::RuntimeEpochId::new(), &cursor_state)
4038            .unwrap();
4039        assert_eq!(
4040            snapshot
4041                .authority_state
4042                .operations
4043                .get(&operation_id)
4044                .and_then(|state| state.operation_source.as_ref()),
4045            Some(&operation_source)
4046        );
4047
4048        snapshot
4049            .operation_specs
4050            .get_mut(&operation_id)
4051            .expect("persisted spec")
4052            .operation_source = None;
4053        let recovered = RuntimeOpsLifecycleRegistry::from_recovered(snapshot).unwrap();
4054        assert_eq!(
4055            recovered
4056                .snapshot(&operation_id)
4057                .unwrap()
4058                .unwrap()
4059                .operation_source,
4060            Some(operation_source)
4061        );
4062    }
4063
4064    #[test]
4065    fn recovered_snapshot_rejects_operation_source_mirror_drift() {
4066        let registry = RuntimeOpsLifecycleRegistry::new();
4067        let child_session_id = SessionId::new();
4068        let operation_source = OperationSource::session_child(child_session_id.clone());
4069        let spec = OperationSpec {
4070            id: OperationId::new(),
4071            kind: OperationKind::MobMemberChild,
4072            owner_session_id: SessionId::new(),
4073            display_name: "source-drift".into(),
4074            source_label: "test".into(),
4075            operation_source: Some(operation_source),
4076            child_session_id: Some(child_session_id),
4077            expect_peer_channel: true,
4078        };
4079        let operation_id = spec.id.clone();
4080
4081        registry.register_operation(spec).unwrap();
4082        registry.provisioning_succeeded(&operation_id).unwrap();
4083        registry.mark_retired(&operation_id).unwrap();
4084
4085        let cursor_state = meerkat_core::EpochCursorState::new();
4086        let mut snapshot = registry
4087            .capture_persistence_snapshot(meerkat_core::RuntimeEpochId::new(), &cursor_state)
4088            .unwrap();
4089        snapshot
4090            .operation_specs
4091            .get_mut(&operation_id)
4092            .expect("persisted spec")
4093            .operation_source = Some(OperationSource::session_child(SessionId::new()));
4094
4095        let err = RuntimeOpsLifecycleRegistry::from_recovered(snapshot)
4096            .expect_err("source mirror drift must fail recovery");
4097        assert!(
4098            matches!(&err, OpsLifecycleError::Internal(message) if message.contains("operation source mirror")),
4099            "unexpected recovery error: {err:?}"
4100        );
4101    }
4102
4103    #[test]
4104    fn persisted_authority_state_serializes_explicit_no_operation_source() {
4105        let registry = RuntimeOpsLifecycleRegistry::new();
4106
4107        let spec = background_spec("explicit-no-source");
4108        let operation_id = spec.id.clone();
4109        registry.register_operation(spec).unwrap();
4110        registry.provisioning_succeeded(&operation_id).unwrap();
4111
4112        let cursor_state = meerkat_core::EpochCursorState::new();
4113        let snapshot = registry
4114            .capture_persistence_snapshot(meerkat_core::RuntimeEpochId::new(), &cursor_state)
4115            .unwrap();
4116        let value = serde_json::to_value(&snapshot).unwrap();
4117        let operations = value
4118            .get("authority_state")
4119            .and_then(|state| state.get("operations"))
4120            .and_then(serde_json::Value::as_object)
4121            .expect("serialized authority operations");
4122        let persisted_state = operations
4123            .values()
4124            .next()
4125            .and_then(serde_json::Value::as_object)
4126            .expect("serialized operation state");
4127
4128        assert!(
4129            persisted_state
4130                .get("operation_source")
4131                .is_some_and(serde_json::Value::is_null),
4132            "generated explicit no-source fact must be serialized as present null: {persisted_state:?}"
4133        );
4134
4135        let recovered_snapshot = serde_json::from_value::<PersistedOpsSnapshot>(value).unwrap();
4136        assert_eq!(
4137            recovered_snapshot
4138                .authority_state
4139                .operations
4140                .get(&operation_id)
4141                .expect("round-tripped operation")
4142                .operation_source,
4143            None
4144        );
4145    }
4146
4147    #[test]
4148    fn persisted_authority_state_rejects_missing_operation_source_fact() {
4149        let registry = RuntimeOpsLifecycleRegistry::new();
4150
4151        let spec = background_spec("missing-source-fact");
4152        registry.register_operation(spec).unwrap();
4153
4154        let cursor_state = meerkat_core::EpochCursorState::new();
4155        let snapshot = registry
4156            .capture_persistence_snapshot(meerkat_core::RuntimeEpochId::new(), &cursor_state)
4157            .unwrap();
4158        let mut value = serde_json::to_value(&snapshot).unwrap();
4159        let operations = value
4160            .get_mut("authority_state")
4161            .and_then(|state| state.get_mut("operations"))
4162            .and_then(serde_json::Value::as_object_mut)
4163            .expect("serialized authority operations");
4164        let operation_state = operations
4165            .values_mut()
4166            .next()
4167            .and_then(serde_json::Value::as_object_mut)
4168            .expect("serialized operation state");
4169        assert!(operation_state.remove("operation_source").is_some());
4170
4171        let err = serde_json::from_value::<PersistedOpsSnapshot>(value)
4172            .expect_err("missing generated source fact must fail recovery snapshot decoding");
4173        assert!(
4174            err.to_string().contains("operation_source"),
4175            "unexpected decode error: {err}"
4176        );
4177    }
4178
4179    #[test]
4180    fn persisted_authority_state_rejects_missing_completion_feed_authority() {
4181        let registry = RuntimeOpsLifecycleRegistry::new();
4182
4183        let spec = background_spec("missing-feed-authority");
4184        let operation_id = spec.id.clone();
4185        registry.register_operation(spec).unwrap();
4186        registry.provisioning_succeeded(&operation_id).unwrap();
4187        registry
4188            .complete_operation(
4189                &operation_id,
4190                OperationResult {
4191                    id: operation_id.clone(),
4192                    content: "done".into(),
4193                    is_error: false,
4194                    duration_ms: 1,
4195                    tokens_used: 0,
4196                },
4197            )
4198            .unwrap();
4199
4200        let cursor_state = meerkat_core::EpochCursorState::new();
4201        let snapshot = registry
4202            .capture_persistence_snapshot(meerkat_core::RuntimeEpochId::new(), &cursor_state)
4203            .unwrap();
4204        let mut value = serde_json::to_value(&snapshot).unwrap();
4205        let authority_state = value
4206            .get_mut("authority_state")
4207            .and_then(serde_json::Value::as_object_mut)
4208            .expect("serialized authority state");
4209        assert!(authority_state.remove("completion_feed_entries").is_some());
4210
4211        let err = serde_json::from_value::<PersistedOpsSnapshot>(value)
4212            .expect_err("missing generated feed authority must fail recovery snapshot decoding");
4213        assert!(
4214            err.to_string().contains("completion_feed_entries"),
4215            "unexpected decode error: {err}"
4216        );
4217    }
4218
4219    #[test]
4220    fn public_child_session_projection_uses_authority_operation_source() {
4221        let registry = RuntimeOpsLifecycleRegistry::new();
4222        let authority_child_session_id = SessionId::new();
4223        let stale_shell_child_session_id = SessionId::new();
4224        let operation_source = OperationSource::session_child(authority_child_session_id.clone());
4225        let spec = OperationSpec {
4226            id: OperationId::new(),
4227            kind: OperationKind::MobMemberChild,
4228            owner_session_id: SessionId::new(),
4229            display_name: "child-projection".into(),
4230            source_label: "test".into(),
4231            operation_source: Some(operation_source),
4232            child_session_id: Some(stale_shell_child_session_id),
4233            expect_peer_channel: true,
4234        };
4235        let operation_id = spec.id.clone();
4236
4237        registry.register_operation(spec).unwrap();
4238
4239        assert_eq!(
4240            registry
4241                .snapshot(&operation_id)
4242                .unwrap()
4243                .unwrap()
4244                .child_session_id,
4245            Some(authority_child_session_id.clone())
4246        );
4247
4248        let cursor_state = meerkat_core::EpochCursorState::new();
4249        let snapshot = registry
4250            .capture_persistence_snapshot(meerkat_core::RuntimeEpochId::new(), &cursor_state)
4251            .unwrap();
4252        assert_eq!(
4253            snapshot
4254                .operation_specs
4255                .get(&operation_id)
4256                .expect("persisted spec")
4257                .child_session_id,
4258            Some(authority_child_session_id)
4259        );
4260    }
4261
4262    #[test]
4263    fn generated_terminal_payload_projection_fails_closed() {
4264        let registry = RuntimeOpsLifecycleRegistry::new();
4265
4266        let spec = background_spec("terminal-payload-drift");
4267        let operation_id = spec.id.clone();
4268        registry.register_operation(spec).unwrap();
4269        registry.provisioning_succeeded(&operation_id).unwrap();
4270        registry
4271            .complete_operation(
4272                &operation_id,
4273                OperationResult {
4274                    id: operation_id.clone(),
4275                    content: "done".into(),
4276                    is_error: false,
4277                    duration_ms: 1,
4278                    tokens_used: 0,
4279                },
4280            )
4281            .unwrap();
4282
4283        {
4284            let mut state = registry.write_state().unwrap();
4285            let mut machine_state = state.dsl.0.state().clone();
4286            let operation_id_key = mm_dsl::OperationId::from_domain(&operation_id).0;
4287            machine_state
4288                .op_terminal_payload
4289                .insert(operation_id_key, OperationTerminalOutcome::Retired);
4290            state.dsl = DslAuthority(Box::new(
4291                mm_dsl::MeerkatMachineAuthority::recover_from_state(machine_state).unwrap(),
4292            ));
4293        }
4294
4295        let err = match registry.register_watcher(&operation_id) {
4296            Ok(_) => panic!("invalid generated terminal payload must reject watcher projection"),
4297            Err(err) => err,
4298        };
4299        assert!(
4300            matches!(&err, OpsLifecycleError::Internal(message) if message.contains("payload variant") && message.contains("does not match terminal outcome discriminant")),
4301            "unexpected watcher error: {err:?}"
4302        );
4303        let err = registry
4304            .snapshot(&operation_id)
4305            .expect_err("invalid generated terminal payload must reject public snapshot");
4306        assert!(
4307            matches!(&err, OpsLifecycleError::Internal(message) if message.contains("does not match terminal outcome discriminant")),
4308            "unexpected public snapshot error: {err:?}"
4309        );
4310
4311        let cursor_state = meerkat_core::EpochCursorState::new();
4312        let err = match registry
4313            .capture_persistence_snapshot(meerkat_core::RuntimeEpochId::new(), &cursor_state)
4314        {
4315            Ok(_) => panic!("invalid generated terminal payload must reject persistence snapshot"),
4316            Err(err) => err,
4317        };
4318        assert!(
4319            matches!(&err, OpsLifecycleError::Internal(message) if message.contains("payload variant") && message.contains("does not match terminal outcome discriminant")),
4320            "unexpected snapshot error: {err:?}"
4321        );
4322
4323        let err = match registry.collect_completed() {
4324            Ok(_) => panic!("invalid generated terminal payload must reject collection"),
4325            Err(err) => err,
4326        };
4327        assert!(
4328            matches!(&err, OpsLifecycleError::Internal(message) if message.contains("payload variant") && message.contains("does not match terminal outcome discriminant")),
4329            "unexpected collection error: {err:?}"
4330        );
4331    }
4332
4333    #[test]
4334    fn generated_terminal_payload_missing_projection_fails_closed() {
4335        let registry = RuntimeOpsLifecycleRegistry::new();
4336
4337        let spec = background_spec("terminal-payload-missing");
4338        let operation_id = spec.id.clone();
4339        registry.register_operation(spec).unwrap();
4340        registry.provisioning_succeeded(&operation_id).unwrap();
4341        registry
4342            .fail_operation(&operation_id, "boom".into())
4343            .unwrap();
4344
4345        {
4346            let mut state = registry.write_state().unwrap();
4347            let mut machine_state = state.dsl.0.state().clone();
4348            let operation_id_key = mm_dsl::OperationId::from_domain(&operation_id).0;
4349            machine_state.op_terminal_payload.remove(&operation_id_key);
4350            state.dsl = DslAuthority(Box::new(
4351                mm_dsl::MeerkatMachineAuthority::recover_from_state(machine_state).unwrap(),
4352            ));
4353        }
4354
4355        let err = match registry.register_watcher(&operation_id) {
4356            Ok(_) => panic!("missing generated terminal payload must reject watcher projection"),
4357            Err(err) => err,
4358        };
4359        assert!(
4360            matches!(&err, OpsLifecycleError::Internal(message) if message.contains("missing terminal payload")),
4361            "unexpected watcher error: {err:?}"
4362        );
4363        let err = registry
4364            .snapshot(&operation_id)
4365            .expect_err("missing generated terminal payload must reject public snapshot");
4366        assert!(
4367            matches!(&err, OpsLifecycleError::Internal(message) if message.contains("missing terminal payload")),
4368            "unexpected public snapshot error: {err:?}"
4369        );
4370    }
4371
4372    #[test]
4373    fn generated_terminal_status_without_outcome_fails_closed() {
4374        let registry = RuntimeOpsLifecycleRegistry::new();
4375
4376        let spec = background_spec("terminal-outcome-missing");
4377        let operation_id = spec.id.clone();
4378        registry.register_operation(spec).unwrap();
4379        registry.provisioning_succeeded(&operation_id).unwrap();
4380        registry
4381            .fail_operation(&operation_id, "boom".into())
4382            .unwrap();
4383
4384        {
4385            let mut state = registry.write_state().unwrap();
4386            let mut machine_state = state.dsl.0.state().clone();
4387            let operation_id_key = mm_dsl::OperationId::from_domain(&operation_id).0;
4388            machine_state.op_terminal_outcomes.remove(&operation_id_key);
4389            state.dsl = DslAuthority(Box::new(
4390                mm_dsl::MeerkatMachineAuthority::recover_from_state(machine_state).unwrap(),
4391            ));
4392        }
4393
4394        let err = registry
4395            .snapshot(&operation_id)
4396            .expect_err("terminal status without outcome must reject public snapshot");
4397        assert!(
4398            matches!(&err, OpsLifecycleError::Internal(message) if message.contains("missing terminal outcome")),
4399            "unexpected public snapshot error: {err:?}"
4400        );
4401
4402        let err = match registry.collect_completed() {
4403            Ok(_) => panic!("terminal status without outcome must reject collection"),
4404            Err(err) => err,
4405        };
4406        assert!(
4407            matches!(&err, OpsLifecycleError::Internal(message) if message.contains("missing terminal outcome")),
4408            "unexpected collection error: {err:?}"
4409        );
4410    }
4411
4412    #[test]
4413    fn generated_operation_source_projection_fails_closed() {
4414        let registry = RuntimeOpsLifecycleRegistry::new();
4415        let child_session_id = SessionId::new();
4416        let operation_source = OperationSource::session_child(child_session_id.clone());
4417        let spec = OperationSpec {
4418            id: OperationId::new(),
4419            kind: OperationKind::MobMemberChild,
4420            owner_session_id: SessionId::new(),
4421            display_name: "source-authority-drift".into(),
4422            source_label: "test".into(),
4423            operation_source: Some(operation_source),
4424            child_session_id: Some(child_session_id),
4425            expect_peer_channel: true,
4426        };
4427        let operation_id = spec.id.clone();
4428
4429        registry.register_operation(spec).unwrap();
4430
4431        {
4432            let mut state = registry.write_state().unwrap();
4433            let mut machine_state = state.dsl.0.state().clone();
4434            let operation_id_key = mm_dsl::OperationId::from_domain(&operation_id).0;
4435            machine_state
4436                .op_sources
4437                .get_mut(&operation_id_key)
4438                .expect("generated operation source")
4439                .session_id = None;
4440            state.dsl = DslAuthority(Box::new(
4441                mm_dsl::MeerkatMachineAuthority::recover_from_state(machine_state).unwrap(),
4442            ));
4443        }
4444
4445        let err = registry
4446            .snapshot(&operation_id)
4447            .expect_err("invalid generated operation source must reject public snapshot");
4448        assert!(
4449            matches!(&err, OpsLifecycleError::Internal(message) if message.contains("generated operation source authority has invalid source")),
4450            "unexpected public snapshot error: {err:?}"
4451        );
4452        let err = registry
4453            .list_operations()
4454            .expect_err("invalid generated operation source must reject public operation list");
4455        assert!(
4456            matches!(&err, OpsLifecycleError::Internal(message) if message.contains("generated operation source authority has invalid source")),
4457            "unexpected operation list error: {err:?}"
4458        );
4459
4460        let cursor_state = meerkat_core::EpochCursorState::new();
4461        let err = match registry
4462            .capture_persistence_snapshot(meerkat_core::RuntimeEpochId::new(), &cursor_state)
4463        {
4464            Ok(_) => panic!("invalid generated operation source must reject persistence snapshot"),
4465            Err(err) => err,
4466        };
4467        assert!(
4468            matches!(&err, OpsLifecycleError::Internal(message) if message.contains("generated operation source authority has invalid source")),
4469            "unexpected snapshot error: {err:?}"
4470        );
4471    }
4472
4473    #[test]
4474    fn generated_operation_id_projection_fails_closed() {
4475        let registry = RuntimeOpsLifecycleRegistry::new();
4476
4477        {
4478            let mut state = registry.write_state().unwrap();
4479            let mut machine_state = state.dsl.0.state().clone();
4480            machine_state.op_statuses.insert(
4481                "not-json-operation-id".into(),
4482                mm_dsl::OperationStatus::Running,
4483            );
4484            state.dsl = DslAuthority(Box::new(
4485                mm_dsl::MeerkatMachineAuthority::recover_from_state(machine_state).unwrap(),
4486            ));
4487        }
4488
4489        let err = registry
4490            .list_operations()
4491            .expect_err("invalid generated operation id must reject public operation list");
4492        assert!(
4493            matches!(&err, OpsLifecycleError::Internal(message) if message.contains("invalid operation id key")),
4494            "unexpected operation list error: {err:?}"
4495        );
4496
4497        let cursor_state = meerkat_core::EpochCursorState::new();
4498        let err = match registry
4499            .capture_persistence_snapshot(meerkat_core::RuntimeEpochId::new(), &cursor_state)
4500        {
4501            Ok(_) => panic!("invalid generated operation id must reject persistence snapshot"),
4502            Err(err) => err,
4503        };
4504        assert!(
4505            matches!(&err, OpsLifecycleError::Internal(message) if message.contains("invalid operation id key")),
4506            "unexpected persistence snapshot error: {err:?}"
4507        );
4508    }
4509
4510    #[test]
4511    fn generated_missing_kind_projection_fails_closed() {
4512        let registry = RuntimeOpsLifecycleRegistry::new();
4513        let spec = background_spec("missing-kind");
4514        let operation_id = spec.id.clone();
4515        registry.register_operation(spec).unwrap();
4516
4517        {
4518            let mut state = registry.write_state().unwrap();
4519            let mut machine_state = state.dsl.0.state().clone();
4520            let operation_id_key = mm_dsl::OperationId::from_domain(&operation_id).0;
4521            machine_state.op_kinds.remove(&operation_id_key);
4522            state.dsl = DslAuthority(Box::new(
4523                mm_dsl::MeerkatMachineAuthority::recover_from_state(machine_state).unwrap(),
4524            ));
4525        }
4526
4527        let err = registry
4528            .snapshot(&operation_id)
4529            .expect_err("missing generated kind must reject public snapshot");
4530        assert!(
4531            matches!(&err, OpsLifecycleError::Internal(message) if message.contains("missing kind")),
4532            "unexpected public snapshot error: {err:?}"
4533        );
4534        let err = registry
4535            .list_operations()
4536            .expect_err("missing generated kind must reject public list");
4537        assert!(
4538            matches!(&err, OpsLifecycleError::Internal(message) if message.contains("missing kind")),
4539            "unexpected public list error: {err:?}"
4540        );
4541
4542        let cursor_state = meerkat_core::EpochCursorState::new();
4543        let err = registry
4544            .capture_persistence_snapshot(meerkat_core::RuntimeEpochId::new(), &cursor_state)
4545            .expect_err("missing generated kind must reject persistence snapshot");
4546        assert!(
4547            matches!(&err, OpsLifecycleError::Internal(message) if message.contains("missing kind")),
4548            "unexpected persistence snapshot error: {err:?}"
4549        );
4550    }
4551
4552    #[test]
4553    fn generated_missing_status_projection_fails_closed() {
4554        let registry = RuntimeOpsLifecycleRegistry::new();
4555        let spec = background_spec("missing-status");
4556        let operation_id = spec.id.clone();
4557        registry.register_operation(spec).unwrap();
4558
4559        {
4560            let mut state = registry.write_state().unwrap();
4561            let mut machine_state = state.dsl.0.state().clone();
4562            let operation_id_key = mm_dsl::OperationId::from_domain(&operation_id).0;
4563            machine_state.op_statuses.remove(&operation_id_key);
4564            state.dsl = DslAuthority(Box::new(
4565                mm_dsl::MeerkatMachineAuthority::recover_from_state(machine_state).unwrap(),
4566            ));
4567        }
4568
4569        let err = registry
4570            .snapshot(&operation_id)
4571            .expect_err("missing generated status must reject public snapshot");
4572        assert!(
4573            matches!(&err, OpsLifecycleError::Internal(message) if message.contains("missing status")),
4574            "unexpected public snapshot error: {err:?}"
4575        );
4576        let err = registry
4577            .list_operations()
4578            .expect_err("missing generated status must reject public list");
4579        assert!(
4580            matches!(&err, OpsLifecycleError::Internal(message) if message.contains("missing status")),
4581            "unexpected public list error: {err:?}"
4582        );
4583        let err = registry
4584            .classify_operation_public_result(&operation_id)
4585            .expect_err("missing generated status must reject public-result classification");
4586        assert!(
4587            matches!(&err, OpsLifecycleError::Internal(message) if message.contains("missing status")),
4588            "unexpected public-result error: {err:?}"
4589        );
4590    }
4591
4592    #[test]
4593    fn generated_retiring_public_result_remains_running_until_terminal() {
4594        let registry = RuntimeOpsLifecycleRegistry::new();
4595        let spec = background_spec("retiring-public-result");
4596        let operation_id = spec.id.clone();
4597        registry.register_operation(spec).unwrap();
4598        registry.provisioning_succeeded(&operation_id).unwrap();
4599        registry.request_retire(&operation_id).unwrap();
4600
4601        let snapshot = registry.snapshot(&operation_id).unwrap().unwrap();
4602        assert_eq!(snapshot.status, OperationStatus::Retiring);
4603        assert!(snapshot.terminal_outcome.is_none());
4604        assert!(!snapshot.terminal);
4605        assert_eq!(
4606            snapshot.public_result_class,
4607            OperationPublicResultClass::Running
4608        );
4609        assert_eq!(
4610            registry
4611                .classify_operation_public_result(&operation_id)
4612                .unwrap(),
4613            OperationPublicResultClass::Running
4614        );
4615    }
4616
4617    #[test]
4618    fn generated_missing_peer_ready_projection_fails_closed() {
4619        let registry = RuntimeOpsLifecycleRegistry::new();
4620        let spec = background_spec("missing-peer-ready");
4621        let operation_id = spec.id.clone();
4622        registry.register_operation(spec).unwrap();
4623
4624        {
4625            let mut state = registry.write_state().unwrap();
4626            let mut machine_state = state.dsl.0.state().clone();
4627            let operation_id_key = mm_dsl::OperationId::from_domain(&operation_id).0;
4628            machine_state.op_peer_ready.remove(&operation_id_key);
4629            state.dsl = DslAuthority(Box::new(
4630                mm_dsl::MeerkatMachineAuthority::recover_from_state(machine_state).unwrap(),
4631            ));
4632        }
4633
4634        let err = registry
4635            .snapshot(&operation_id)
4636            .expect_err("missing generated peer-ready fact must reject public snapshot");
4637        assert!(
4638            matches!(&err, OpsLifecycleError::Internal(message) if message.contains("missing peer-ready")),
4639            "unexpected public snapshot error: {err:?}"
4640        );
4641
4642        let cursor_state = meerkat_core::EpochCursorState::new();
4643        let err = registry
4644            .capture_persistence_snapshot(meerkat_core::RuntimeEpochId::new(), &cursor_state)
4645            .expect_err("missing generated peer-ready fact must reject persistence snapshot");
4646        assert!(
4647            matches!(&err, OpsLifecycleError::Internal(message) if message.contains("missing peer-ready")),
4648            "unexpected persistence snapshot error: {err:?}"
4649        );
4650    }
4651
4652    #[test]
4653    fn generated_missing_progress_count_projection_fails_closed() {
4654        let registry = RuntimeOpsLifecycleRegistry::new();
4655        let spec = background_spec("missing-progress-count");
4656        let operation_id = spec.id.clone();
4657        registry.register_operation(spec).unwrap();
4658
4659        {
4660            let mut state = registry.write_state().unwrap();
4661            let mut machine_state = state.dsl.0.state().clone();
4662            let operation_id_key = mm_dsl::OperationId::from_domain(&operation_id).0;
4663            machine_state.op_progress_counts.remove(&operation_id_key);
4664            state.dsl = DslAuthority(Box::new(
4665                mm_dsl::MeerkatMachineAuthority::recover_from_state(machine_state).unwrap(),
4666            ));
4667        }
4668
4669        let err = registry
4670            .snapshot(&operation_id)
4671            .expect_err("missing generated progress count must reject public snapshot");
4672        assert!(
4673            matches!(&err, OpsLifecycleError::Internal(message) if message.contains("missing progress count")),
4674            "unexpected public snapshot error: {err:?}"
4675        );
4676
4677        let cursor_state = meerkat_core::EpochCursorState::new();
4678        let err = registry
4679            .capture_persistence_snapshot(meerkat_core::RuntimeEpochId::new(), &cursor_state)
4680            .expect_err("missing generated progress count must reject persistence snapshot");
4681        assert!(
4682            matches!(&err, OpsLifecycleError::Internal(message) if message.contains("missing progress count")),
4683            "unexpected persistence snapshot error: {err:?}"
4684        );
4685    }
4686
4687    #[test]
4688    fn generated_terminal_sequence_missing_persistence_fails_closed() {
4689        let registry = RuntimeOpsLifecycleRegistry::new();
4690        let spec = background_spec("terminal-sequence-missing");
4691        let operation_id = spec.id.clone();
4692        registry.register_operation(spec).unwrap();
4693        registry.provisioning_succeeded(&operation_id).unwrap();
4694        registry
4695            .complete_operation(
4696                &operation_id,
4697                OperationResult {
4698                    id: operation_id.clone(),
4699                    content: "done".into(),
4700                    is_error: false,
4701                    duration_ms: 1,
4702                    tokens_used: 0,
4703                },
4704            )
4705            .unwrap();
4706
4707        {
4708            let mut state = registry.write_state().unwrap();
4709            let mut machine_state = state.dsl.0.state().clone();
4710            let operation_id_key = mm_dsl::OperationId::from_domain(&operation_id).0;
4711            machine_state.op_completion_seq.remove(&operation_id_key);
4712            state.dsl = DslAuthority(Box::new(
4713                mm_dsl::MeerkatMachineAuthority::recover_from_state(machine_state).unwrap(),
4714            ));
4715        }
4716
4717        let cursor_state = meerkat_core::EpochCursorState::new();
4718        let err = registry
4719            .capture_persistence_snapshot(meerkat_core::RuntimeEpochId::new(), &cursor_state)
4720            .expect_err("missing generated terminal sequence must reject persistence snapshot");
4721        assert!(
4722            matches!(&err, OpsLifecycleError::Internal(message) if message.contains("missing completion sequence")),
4723            "unexpected persistence snapshot error: {err:?}"
4724        );
4725    }
4726
4727    #[test]
4728    fn generated_record_without_shell_projection_fails_closed() {
4729        let registry = RuntimeOpsLifecycleRegistry::new();
4730        let spec = background_spec("missing-shell-record");
4731        let operation_id = spec.id.clone();
4732        registry.register_operation(spec).unwrap();
4733
4734        {
4735            let mut state = registry.write_state().unwrap();
4736            state.records.remove(&operation_id);
4737        }
4738
4739        let err = registry
4740            .snapshot(&operation_id)
4741            .expect_err("generated operation without shell record must reject public snapshot");
4742        assert!(
4743            matches!(&err, OpsLifecycleError::Internal(message) if message.contains("without shell projection record")),
4744            "unexpected public snapshot error: {err:?}"
4745        );
4746        let err = registry
4747            .list_operations()
4748            .expect_err("generated operation without shell record must reject public list");
4749        assert!(
4750            matches!(&err, OpsLifecycleError::Internal(message) if message.contains("without shell projection record")),
4751            "unexpected public list error: {err:?}"
4752        );
4753    }
4754
4755    #[test]
4756    fn recovered_snapshot_rebuilds_child_session_mirror_from_authority() {
4757        let registry = RuntimeOpsLifecycleRegistry::new();
4758        let child_session_id = SessionId::new();
4759        let operation_source = OperationSource::session_child(child_session_id.clone());
4760        let spec = OperationSpec {
4761            id: OperationId::new(),
4762            kind: OperationKind::MobMemberChild,
4763            owner_session_id: SessionId::new(),
4764            display_name: "child-drift".into(),
4765            source_label: "test".into(),
4766            operation_source: Some(operation_source),
4767            child_session_id: Some(child_session_id.clone()),
4768            expect_peer_channel: true,
4769        };
4770        let operation_id = spec.id.clone();
4771
4772        registry.register_operation(spec).unwrap();
4773        registry.provisioning_succeeded(&operation_id).unwrap();
4774        registry.mark_retired(&operation_id).unwrap();
4775
4776        let cursor_state = meerkat_core::EpochCursorState::new();
4777        let mut snapshot = registry
4778            .capture_persistence_snapshot(meerkat_core::RuntimeEpochId::new(), &cursor_state)
4779            .unwrap();
4780        snapshot
4781            .operation_specs
4782            .get_mut(&operation_id)
4783            .expect("persisted spec")
4784            .child_session_id = Some(SessionId::new());
4785
4786        let recovered = RuntimeOpsLifecycleRegistry::from_recovered(snapshot).unwrap();
4787        assert_eq!(
4788            recovered
4789                .snapshot(&operation_id)
4790                .unwrap()
4791                .unwrap()
4792                .child_session_id,
4793            Some(child_session_id)
4794        );
4795    }
4796
4797    #[test]
4798    fn completion_wake_class_is_generated_by_operation_kind() {
4799        let registry = RuntimeOpsLifecycleRegistry::new();
4800        let operation_id = OperationId::new();
4801
4802        assert_eq!(
4803            registry
4804                .classify_operation_completion_wake(&operation_id, OperationKind::BackgroundToolOp)
4805                .unwrap(),
4806            OperationCompletionWakeClass::Wake
4807        );
4808        assert_eq!(
4809            registry
4810                .classify_operation_completion_wake(&operation_id, OperationKind::MobMemberChild)
4811                .unwrap(),
4812            OperationCompletionWakeClass::Ignore
4813        );
4814        assert_eq!(
4815            registry
4816                .classify_operation_completion_wake(
4817                    &operation_id,
4818                    OperationKind::BackgroundToolCapacitySlot,
4819                )
4820                .unwrap(),
4821            OperationCompletionWakeClass::Ignore
4822        );
4823    }
4824
4825    #[test]
4826    fn recovered_snapshot_rejects_completion_feed_without_generated_record() {
4827        let registry = RuntimeOpsLifecycleRegistry::new();
4828
4829        let running_spec = background_spec("running");
4830        let running_id = running_spec.id.clone();
4831        registry.register_operation(running_spec.clone()).unwrap();
4832        registry.provisioning_succeeded(&running_id).unwrap();
4833
4834        let cursor_state = meerkat_core::EpochCursorState::new();
4835        let mut snapshot = registry
4836            .capture_persistence_snapshot(meerkat_core::RuntimeEpochId::new(), &cursor_state)
4837            .unwrap();
4838        snapshot.completion_entries.push(CompletionEntry {
4839            seq: 1,
4840            operation_id: running_id.clone(),
4841            kind: running_spec.kind,
4842            display_name: running_spec.display_name,
4843            terminal_outcome: OperationTerminalOutcome::Completed(OperationResult {
4844                id: running_id,
4845                content: "phantom".into(),
4846                is_error: false,
4847                duration_ms: 1,
4848                tokens_used: 0,
4849            }),
4850            completed_at_ms: None,
4851        });
4852
4853        let err = match RuntimeOpsLifecycleRegistry::from_recovered(snapshot) {
4854            Ok(_) => panic!("public completion feed must not recover without generated op truth"),
4855            Err(err) => err,
4856        };
4857        assert!(
4858            matches!(&err, OpsLifecycleError::Internal(message) if message.contains("no generated feed authority")),
4859            "unexpected recovery error: {err:?}"
4860        );
4861    }
4862
4863    #[test]
4864    fn recovered_snapshot_rejects_feed_authority_beyond_completion_cursor() {
4865        let registry = RuntimeOpsLifecycleRegistry::new();
4866
4867        let spec = background_spec("terminal");
4868        let operation_id = spec.id.clone();
4869        registry.register_operation(spec).unwrap();
4870        registry.provisioning_succeeded(&operation_id).unwrap();
4871        registry
4872            .complete_operation(
4873                &operation_id,
4874                OperationResult {
4875                    id: operation_id.clone(),
4876                    content: "done".into(),
4877                    is_error: false,
4878                    duration_ms: 1,
4879                    tokens_used: 0,
4880                },
4881            )
4882            .unwrap();
4883
4884        let cursor_state = meerkat_core::EpochCursorState::new();
4885        let mut snapshot = registry
4886            .capture_persistence_snapshot(meerkat_core::RuntimeEpochId::new(), &cursor_state)
4887            .unwrap();
4888        let phantom_id = OperationId::new();
4889        let phantom_result = OperationResult {
4890            id: phantom_id.clone(),
4891            content: "phantom".into(),
4892            is_error: false,
4893            duration_ms: 1,
4894            tokens_used: 0,
4895        };
4896        let phantom_entry = CompletionFeedCanonicalState {
4897            seq: snapshot.authority_state.next_completion_seq + 1,
4898            kind: OperationKind::BackgroundToolOp,
4899            terminal_outcome: OperationTerminalOutcome::Completed(phantom_result.clone()),
4900        };
4901        snapshot
4902            .authority_state
4903            .completion_feed_entries
4904            .insert(phantom_id.clone(), phantom_entry);
4905        snapshot.completion_entries.push(CompletionEntry {
4906            seq: snapshot.authority_state.next_completion_seq + 1,
4907            operation_id: phantom_id.clone(),
4908            kind: OperationKind::BackgroundToolOp,
4909            display_name: "phantom".into(),
4910            terminal_outcome: OperationTerminalOutcome::Completed(phantom_result),
4911            completed_at_ms: None,
4912        });
4913
4914        let err = match RuntimeOpsLifecycleRegistry::from_recovered(snapshot) {
4915            Ok(_) => panic!("feed authority must not advance the recovered completion cursor"),
4916            Err(err) => err,
4917        };
4918        assert!(
4919            matches!(&err, OpsLifecycleError::Internal(message) if message.contains("RecoverCompletionFeedEntry")),
4920            "unexpected recovery error: {err:?}"
4921        );
4922    }
4923
4924    #[test]
4925    fn recovered_completed_order_uses_generated_completion_sequences() {
4926        let registry = RuntimeOpsLifecycleRegistry::new();
4927
4928        let spec_a = background_spec("a");
4929        let id_a = spec_a.id.clone();
4930        registry.register_operation(spec_a).unwrap();
4931        registry.provisioning_succeeded(&id_a).unwrap();
4932        registry
4933            .complete_operation(
4934                &id_a,
4935                OperationResult {
4936                    id: id_a.clone(),
4937                    content: "a".into(),
4938                    is_error: false,
4939                    duration_ms: 1,
4940                    tokens_used: 0,
4941                },
4942            )
4943            .unwrap();
4944
4945        let spec_b = background_spec("b");
4946        let id_b = spec_b.id.clone();
4947        registry.register_operation(spec_b).unwrap();
4948        registry.provisioning_succeeded(&id_b).unwrap();
4949        registry
4950            .complete_operation(
4951                &id_b,
4952                OperationResult {
4953                    id: id_b.clone(),
4954                    content: "b".into(),
4955                    is_error: false,
4956                    duration_ms: 1,
4957                    tokens_used: 0,
4958                },
4959            )
4960            .unwrap();
4961
4962        let cursor_state = meerkat_core::EpochCursorState::new();
4963        let mut snapshot = registry
4964            .capture_persistence_snapshot(meerkat_core::RuntimeEpochId::new(), &cursor_state)
4965            .unwrap();
4966        snapshot.authority_state.completed_order = VecDeque::from([id_b.clone(), id_a.clone()]);
4967
4968        let recovered = RuntimeOpsLifecycleRegistry::from_recovered(snapshot).unwrap();
4969        let collected = recovered.collect_completed().unwrap();
4970
4971        assert_eq!(collected[0].0, id_a);
4972        assert_eq!(collected[1].0, id_b);
4973    }
4974
4975    #[test]
4976    fn max_concurrent_enforcement() {
4977        let registry = RuntimeOpsLifecycleRegistry::with_config(OpsLifecycleConfig {
4978            max_completed: DEFAULT_MAX_COMPLETED,
4979            max_concurrent: Some(2),
4980        });
4981
4982        let spec_a = background_spec("a");
4983        let id_a = spec_a.id.clone();
4984        registry.register_operation(spec_a).unwrap();
4985
4986        let spec_b = background_spec("b");
4987        registry.register_operation(spec_b).unwrap();
4988
4989        let spec_c = background_spec("c");
4990        let result = registry.register_operation(spec_c);
4991        assert!(matches!(
4992            result,
4993            Err(OpsLifecycleError::MaxConcurrentExceeded {
4994                limit: 2,
4995                active: 2,
4996            })
4997        ));
4998
4999        registry.provisioning_succeeded(&id_a).unwrap();
5000        registry
5001            .complete_operation(
5002                &id_a,
5003                OperationResult {
5004                    id: id_a.clone(),
5005                    content: "done".into(),
5006                    is_error: false,
5007                    duration_ms: 1,
5008                    tokens_used: 0,
5009                },
5010            )
5011            .unwrap();
5012
5013        let spec_d = background_spec("d");
5014        assert!(registry.register_operation(spec_d).is_ok());
5015    }
5016
5017    #[test]
5018    fn snapshot_includes_timestamps() {
5019        let registry = RuntimeOpsLifecycleRegistry::new();
5020        let spec = background_spec("timed");
5021        let op_id = spec.id.clone();
5022        registry.register_operation(spec).unwrap();
5023
5024        let snap1 = registry.snapshot(&op_id).unwrap().unwrap();
5025        assert!(snap1.created_at_ms > 0);
5026        assert!(snap1.started_at_ms.is_none());
5027        assert!(snap1.completed_at_ms.is_none());
5028        assert!(snap1.elapsed_ms.is_none());
5029
5030        registry.provisioning_succeeded(&op_id).unwrap();
5031        let snap2 = registry.snapshot(&op_id).unwrap().unwrap();
5032        assert!(snap2.started_at_ms.is_some());
5033        assert!(snap2.started_at_ms.unwrap() >= snap2.created_at_ms);
5034
5035        registry
5036            .complete_operation(
5037                &op_id,
5038                OperationResult {
5039                    id: op_id.clone(),
5040                    content: "done".into(),
5041                    is_error: false,
5042                    duration_ms: 1,
5043                    tokens_used: 0,
5044                },
5045            )
5046            .unwrap();
5047        let snap3 = registry.snapshot(&op_id).unwrap().unwrap();
5048        assert!(snap3.completed_at_ms.is_some());
5049        assert!(snap3.elapsed_ms.is_some());
5050        assert!(snap3.completed_at_ms.unwrap() >= snap3.started_at_ms.unwrap());
5051    }
5052
5053    #[test]
5054    fn snapshot_includes_peer_handle() {
5055        let registry = RuntimeOpsLifecycleRegistry::new();
5056        let child_session_id = SessionId::new();
5057        let spec = OperationSpec {
5058            id: OperationId::new(),
5059            kind: OperationKind::MobMemberChild,
5060            owner_session_id: SessionId::new(),
5061            display_name: "peer-test".into(),
5062            source_label: "test".into(),
5063            operation_source: Some(OperationSource::session_child(child_session_id.clone())),
5064            child_session_id: Some(child_session_id),
5065            expect_peer_channel: true,
5066        };
5067        let op_id = spec.id.clone();
5068        registry.register_operation(spec).unwrap();
5069        registry.provisioning_succeeded(&op_id).unwrap();
5070
5071        let snap1 = registry.snapshot(&op_id).unwrap().unwrap();
5072        assert!(snap1.peer_handle.is_none());
5073
5074        let handle = OperationPeerHandle {
5075            peer_name: meerkat_core::comms::PeerName::new("member-x").unwrap(),
5076            trusted_peer: TrustedPeerDescriptor::test_only_unsigned_typed(
5077                "member-x",
5078                PeerId::new(),
5079                "inproc://x",
5080            )
5081            .unwrap(),
5082        };
5083        registry.peer_ready(&op_id, handle).unwrap();
5084
5085        let snap2 = registry.snapshot(&op_id).unwrap().unwrap();
5086        assert_eq!(
5087            snap2.peer_handle.as_ref().unwrap().peer_name.as_str(),
5088            "member-x"
5089        );
5090    }
5091}