Skip to main content

meerkat_runtime/
ops_lifecycle.rs

1//! In-memory runtime implementation of the shared async-operation lifecycle seam.
2//!
3//! Per-operation canonical lifecycle state lives in the MeerkatMachine DSL
4//! authority (`op_statuses`, `op_terminal_outcomes`, `op_kinds`,
5//! `op_peer_ready`, `op_progress_counts`, `active_op_count`, `wait_active`,
6//! `wait_operation_ids`). This shell layer owns pure mechanics: watcher
7//! channels, timestamps, peer handles, snapshot assembly, FIFO eviction
8//! bookkeeping, the completion feed buffer, and concurrency-limit / duplicate
9//! / peer-expectation admission checks run BEFORE the DSL apply.
10//!
11//! Per-transition legality ("is `CompleteOp` legal on a `Provisioning` op?")
12//! is NOT owned by the shell — it lives in the DSL's `from_status_valid`
13//! guards on each op-lifecycle transition. The shell's only job on a
14//! `GuardRejected` rejection is to surface the pre-read status back to the
15//! caller as [`OpsLifecycleError::InvalidTransition`]; see
16//! [`classify_op_rejection`].
17
18use std::collections::{HashMap, HashSet, VecDeque};
19use std::future::Future;
20use std::sync::atomic::{AtomicU64, Ordering};
21use std::sync::{Arc, RwLock, RwLockReadGuard, RwLockWriteGuard};
22use std::task::{Context, Poll};
23
24use meerkat_core::completion_feed::{
25    CompletionBatch, CompletionEntry, CompletionFeed, CompletionSeq,
26};
27
28#[cfg(target_arch = "wasm32")]
29use crate::tokio;
30use meerkat_core::lifecycle::{RunId, WaitRequestId};
31use meerkat_core::ops_lifecycle::{
32    DEFAULT_MAX_COMPLETED, OperationCompletionWatch, OperationId, OperationKind,
33    OperationLifecycleSnapshot, OperationPeerHandle, OperationProgressUpdate, OperationResult,
34    OperationSpec, OperationStatus, OperationTerminalOutcome, OpsLifecycleError,
35    OpsLifecycleRegistry, WaitAllResult, WaitAllSatisfied,
36};
37use meerkat_core::time_compat::{Instant, SystemTime, UNIX_EPOCH};
38
39use crate::meerkat_machine::dsl as mm_dsl;
40
41// ---------------------------------------------------------------------------
42// Serde-only persisted canonical state shells
43// ---------------------------------------------------------------------------
44//
45// These structures preserve the on-disk wire format of `PersistedOpsSnapshot`
46// produced by earlier runtime versions. They are pure serde shells — no
47// methods beyond read-only field accessors, no authority behavior.
48
49/// Canonical per-operation state as captured in persisted snapshots.
50#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
51pub struct OperationCanonicalState {
52    status: OperationStatus,
53    kind: OperationKind,
54    peer_ready: bool,
55    progress_count: u32,
56    watcher_count: u32,
57    terminal_outcome: Option<OperationTerminalOutcome>,
58    terminal_buffered: bool,
59}
60
61/// Canonical registry-level state as captured in persisted snapshots.
62#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
63pub struct RegistryCanonicalState {
64    operations: HashMap<OperationId, OperationCanonicalState>,
65    completed_order: VecDeque<OperationId>,
66    max_completed: usize,
67    max_concurrent: Option<usize>,
68    active_count: usize,
69    wait_request_id: Option<WaitRequestId>,
70    wait_operation_ids: Vec<OperationId>,
71    next_completion_seq: CompletionSeq,
72}
73
74impl RegistryCanonicalState {
75    /// Maximum completed operations retained at capture time.
76    pub fn max_completed(&self) -> usize {
77        self.max_completed
78    }
79
80    /// Maximum concurrent non-terminal operations at capture time.
81    pub fn max_concurrent(&self) -> Option<usize> {
82        self.max_concurrent
83    }
84
85    /// Number of operations captured in the snapshot.
86    pub fn operation_count(&self) -> usize {
87        self.operations.len()
88    }
89}
90
91// ---------------------------------------------------------------------------
92// Serializable snapshot for persistence
93// ---------------------------------------------------------------------------
94
95/// Serializable snapshot of the ops lifecycle registry state.
96///
97/// Captured on terminal transitions for durable persistence. Contains
98/// canonical state, operation specs, persisted completion feed entries, and
99/// consumer cursor values. Wire format preserved verbatim from legacy
100/// runtime versions for backward compatibility.
101#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
102pub struct PersistedOpsSnapshot {
103    /// Epoch identity at capture time.
104    pub epoch_id: meerkat_core::RuntimeEpochId,
105    /// Canonical machine-owned state at capture time.
106    pub authority_state: RegistryCanonicalState,
107    /// Per-operation specs for shell record reconstruction.
108    pub operation_specs: HashMap<OperationId, meerkat_core::ops_lifecycle::OperationSpec>,
109    /// Persisted completion feed entries (actual contents, not reconstructed).
110    pub completion_entries: Vec<CompletionEntry>,
111    /// Consumer cursor snapshot at capture time.
112    pub cursors: meerkat_core::EpochCursorSnapshot,
113}
114
115#[derive(Debug)]
116pub struct OpsLifecyclePersistenceRequest {
117    snapshot: PersistedOpsSnapshot,
118    result_tx: std::sync::mpsc::SyncSender<Result<(), OpsLifecycleError>>,
119}
120
121impl OpsLifecyclePersistenceRequest {
122    pub fn snapshot(&self) -> &PersistedOpsSnapshot {
123        &self.snapshot
124    }
125
126    pub fn complete(self, result: Result<(), OpsLifecycleError>) {
127        let _ = self.result_tx.send(result);
128    }
129}
130
131// ---------------------------------------------------------------------------
132// Concrete completion feed buffer
133// ---------------------------------------------------------------------------
134
135/// Shared inner state of the completion feed buffer.
136///
137/// Protected by the registry's `RwLock<ShellState>` for writes, and by its
138/// own `RwLock` for reads by external consumers (agent boundary, idle wake).
139#[derive(Debug)]
140struct FeedBufferInner {
141    entries: VecDeque<CompletionEntry>,
142    watermark: CompletionSeq,
143    max_retained: usize,
144}
145
146/// Shared completion feed buffer owned by the runtime registry.
147///
148/// The registry writes entries under its own write lock. External consumers
149/// read through the [`RuntimeCompletionFeed`] handle.
150#[derive(Debug)]
151struct FeedBuffer {
152    inner: RwLock<FeedBufferInner>,
153    /// Atomic mirror of watermark for lock-free `watermark()` reads.
154    watermark_atomic: AtomicU64,
155    /// Notifies all waiters when new entries are appended.
156    notify: tokio::sync::Notify,
157}
158
159impl FeedBuffer {
160    fn new(max_retained: usize) -> Self {
161        Self {
162            inner: RwLock::new(FeedBufferInner {
163                entries: VecDeque::new(),
164                watermark: 0,
165                max_retained,
166            }),
167            watermark_atomic: AtomicU64::new(0),
168            notify: tokio::sync::Notify::new(),
169        }
170    }
171
172    fn push(&self, entry: CompletionEntry) {
173        let mut inner = self
174            .inner
175            .write()
176            .unwrap_or_else(std::sync::PoisonError::into_inner);
177        let seq = entry.seq;
178        inner.entries.push_back(entry);
179        inner.watermark = seq;
180
181        // Evict oldest if over capacity.
182        while inner.entries.len() > inner.max_retained {
183            inner.entries.pop_front();
184        }
185
186        drop(inner);
187
188        self.watermark_atomic.store(seq, Ordering::Release);
189        self.notify.notify_waiters();
190    }
191}
192
193/// Read-only handle to the runtime completion feed.
194///
195/// Implements [`CompletionFeed`] for external consumers. Obtained via
196/// [`RuntimeOpsLifecycleRegistry::completion_feed()`].
197#[derive(Debug, Clone)]
198pub struct RuntimeCompletionFeed {
199    buffer: Arc<FeedBuffer>,
200}
201
202impl CompletionFeed for RuntimeCompletionFeed {
203    fn watermark(&self) -> CompletionSeq {
204        self.buffer.watermark_atomic.load(Ordering::Acquire)
205    }
206
207    fn list_since(&self, after_seq: CompletionSeq) -> CompletionBatch {
208        let inner = self
209            .buffer
210            .inner
211            .read()
212            .unwrap_or_else(std::sync::PoisonError::into_inner);
213        let entries: Vec<CompletionEntry> = inner
214            .entries
215            .iter()
216            .filter(|e| e.seq > after_seq)
217            .cloned()
218            .collect();
219        let watermark = inner.watermark;
220        CompletionBatch { entries, watermark }
221    }
222
223    fn wait_for_advance(
224        &self,
225        after_seq: CompletionSeq,
226    ) -> std::pin::Pin<Box<dyn Future<Output = CompletionSeq> + Send + '_>> {
227        Box::pin(async move {
228            loop {
229                // Register the waiter BEFORE reading the watermark.
230                // notify_waiters() in push() only wakes already-registered
231                // listeners — if we read first and push() lands between the
232                // read and notified().await, the wake is lost.
233                let notified = self.buffer.notify.notified();
234                let current = self.buffer.watermark_atomic.load(Ordering::Acquire);
235                if current > after_seq {
236                    return current;
237                }
238                notified.await;
239            }
240        })
241    }
242}
243
244// ---------------------------------------------------------------------------
245// Shell-only per-operation record (not part of canonical machine state)
246// ---------------------------------------------------------------------------
247
248/// Shell-owned data for a single operation. Canonical lifecycle state lives in
249/// the DSL authority; this struct holds I/O concerns that the DSL has no
250/// knowledge of.
251#[derive(Debug)]
252struct ShellRecord {
253    spec: OperationSpec,
254    peer_handle: Option<OperationPeerHandle>,
255    watchers: Vec<tokio::sync::oneshot::Sender<OperationTerminalOutcome>>,
256    // Monotonic timestamps for elapsed computation
257    created_at: Instant,
258    started_at: Option<Instant>,
259    completed_at: Option<Instant>,
260    // Wall-clock anchor captured at creation for epoch millis
261    created_at_wall: SystemTime,
262}
263
264#[derive(Debug)]
265struct PendingWaitState {
266    wait_request_id: WaitRequestId,
267    sender: tokio::sync::oneshot::Sender<WaitAllSatisfied>,
268}
269
270enum WaitAllAuthorityPlan {
271    AlreadySatisfied(WaitAllSatisfied),
272    ActivateBarrier,
273}
274
275impl ShellRecord {
276    fn new(spec: OperationSpec) -> Self {
277        Self {
278            spec,
279            peer_handle: None,
280            watchers: Vec::new(),
281            created_at: Instant::now(),
282            started_at: None,
283            completed_at: None,
284            created_at_wall: SystemTime::now(),
285        }
286    }
287
288    fn epoch_millis(wall_anchor: &SystemTime) -> u64 {
289        wall_anchor
290            .duration_since(UNIX_EPOCH)
291            .map(|d| d.as_millis() as u64)
292            .unwrap_or(0)
293    }
294
295    fn epoch_millis_for_instant(&self, instant: Instant) -> u64 {
296        // Compute wall time for a given instant using the wall-clock anchor:
297        // wall_time = created_at_wall + (instant - created_at)
298        let offset = instant.saturating_duration_since(self.created_at);
299        let wall = self.created_at_wall + offset;
300        Self::epoch_millis(&wall)
301    }
302
303    /// Notify all watchers with the given terminal outcome and drain the list.
304    fn notify_watchers(&mut self, outcome: &OperationTerminalOutcome) {
305        for watcher in std::mem::take(&mut self.watchers) {
306            let _ = watcher.send(outcome.clone());
307        }
308    }
309
310    /// Mark the completion timestamp.
311    fn mark_completed(&mut self) {
312        self.completed_at = Some(Instant::now());
313    }
314}
315
316// ---------------------------------------------------------------------------
317// Combined shell state: DSL authority + shell records
318// ---------------------------------------------------------------------------
319
320#[derive(Debug)]
321struct ShellState {
322    /// DSL authority — sole source of truth for per-op canonical state.
323    dsl: DslAuthority,
324    /// Shell-owned per-operation records (specs, watchers, timestamps, peer handles).
325    records: HashMap<OperationId, ShellRecord>,
326    /// Pending wait-all coordination (oneshot channel).
327    pending_wait: Option<PendingWaitState>,
328    /// FIFO ordering of completed operation IDs for bounded eviction.
329    completed_order: VecDeque<OperationId>,
330    /// Maximum completed operations to retain.
331    max_completed: usize,
332    /// Maximum concurrent non-terminal operations (None = unlimited).
333    max_concurrent: Option<usize>,
334    /// Oneshot correlation id for the currently-pending `wait_all` future.
335    ///
336    /// Barrier membership (`wait_operation_ids`) and activation (`wait_active`)
337    /// are DSL-owned. This field is pure transport mechanics — the identity
338    /// the oneshot sender is tagged with so `Drop` can correlate cancellation.
339    wait_request_id: Option<WaitRequestId>,
340    /// Monotonic sequence counter for completion feed entries.
341    next_completion_seq: CompletionSeq,
342    /// Shared feed buffer for completion events.
343    feed_buffer: Arc<FeedBuffer>,
344    /// Persistence channel for durable snapshot writes (set via `set_persistence_channel`).
345    persist_tx: Option<crate::tokio::sync::mpsc::UnboundedSender<OpsLifecyclePersistenceRequest>>,
346    /// Epoch ID for persistence snapshots.
347    persist_epoch_id: Option<meerkat_core::RuntimeEpochId>,
348    /// Shared cursor state for persistence snapshots.
349    persist_cursor_state: Option<Arc<meerkat_core::EpochCursorState>>,
350}
351
352/// Wrapper around the DSL authority that provides `Debug` output.
353///
354/// The generated `MeerkatMachineAuthority` does not derive `Debug`, but
355/// `ShellState` requires it. This wrapper delegates to the inner state's
356/// `Debug` impl.
357struct DslAuthority(mm_dsl::MeerkatMachineAuthority);
358
359impl std::fmt::Debug for DslAuthority {
360    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
361        f.debug_struct("DslAuthority")
362            .field("state", &self.0.state)
363            .finish()
364    }
365}
366
367/// Create a DSL authority initialized with `lifecycle_phase: Idle` and all
368/// ops-related fields at their defaults. Per-op transitions guard only on
369/// `op_statuses.contains_key(operation_id)`, so the phase stays in `Idle`
370/// permanently (they all `to Idle`).
371fn new_ops_dsl_authority() -> DslAuthority {
372    let state = mm_dsl::MeerkatMachineState {
373        lifecycle_phase: mm_dsl::MeerkatPhase::Idle,
374        ..mm_dsl::MeerkatMachineState::default()
375    };
376    DslAuthority(mm_dsl::MeerkatMachineAuthority::from_state(state))
377}
378
379impl ShellState {
380    fn new(max_completed: usize, max_concurrent: Option<usize>) -> Self {
381        Self {
382            dsl: new_ops_dsl_authority(),
383            records: HashMap::new(),
384            pending_wait: None,
385            completed_order: VecDeque::new(),
386            max_completed,
387            max_concurrent,
388            wait_request_id: None,
389            next_completion_seq: 0,
390            // Feed buffer is larger than max_completed to absorb bursts.
391            // Entries are only evicted by buffer capacity, not by consumer cursor,
392            // so the buffer must be large enough that consumers drain before
393            // the oldest entry is evicted.
394            feed_buffer: Arc::new(FeedBuffer::new(max_completed.saturating_mul(4).max(1024))),
395            persist_tx: None,
396            persist_epoch_id: None,
397            persist_cursor_state: None,
398        }
399    }
400
401    /// Apply a DSL input, mapping transition errors into
402    /// [`OpsLifecycleError::Internal`]. Callers that need to distinguish
403    /// guard rejections (legal-transition violations) from internal desync
404    /// should use [`Self::dsl_apply_raw`] and classify the error themselves;
405    /// this helper is for DSL inputs whose preconditions the caller has
406    /// already fully validated (e.g., `RequestWaitAll`, `SatisfyWaitAll`).
407    fn dsl_apply(
408        &mut self,
409        input: mm_dsl::MeerkatMachineInput,
410        context: &str,
411    ) -> Result<(), OpsLifecycleError> {
412        self.dsl_apply_raw(input).map_err(|err| {
413            OpsLifecycleError::Internal(format!("DSL rejected ops transition ({context}): {err:?}"))
414        })
415    }
416
417    /// Apply a DSL input, returning the raw kernel-level rejection so callers
418    /// can distinguish `GuardRejected` (a legitimate legality violation, e.g.,
419    /// `complete_operation` on a `Provisioning` op) from
420    /// `NoMatchingTransition` (a shell/DSL desync). Every op-lifecycle entry
421    /// point (complete/fail/abort/cancel/start/retire/terminate/progress)
422    /// uses this form and synthesises [`OpsLifecycleError::InvalidTransition`]
423    /// on `GuardRejected`.
424    fn dsl_apply_raw(
425        &mut self,
426        input: mm_dsl::MeerkatMachineInput,
427    ) -> Result<(), mm_dsl::MeerkatMachineTransitionError> {
428        mm_dsl::MeerkatMachineMutator::apply(&mut self.dsl.0, input).map(|_transition| ())
429    }
430
431    /// Split a domain terminal outcome into a `(discriminant, payload)` pair
432    /// suitable for the DSL's typed `op_terminal_outcomes` +
433    /// `op_terminal_payload` fields.
434    ///
435    /// The discriminant is a typed DSL enum mirror; the payload is the JSON
436    /// encoding of the outcome's *inner* payload (e.g., the `OperationResult`
437    /// for `Completed`, the error string for `Failed`, the optional reason
438    /// for `Aborted` / `Cancelled`, the reason string for `Terminated`). For
439    /// `Retired` the payload is empty — it carries no data.
440    ///
441    /// The shell rehydrates the typed domain outcome via
442    /// [`Self::terminal_outcome`], pairing the DSL discriminant with the
443    /// companion payload entry.
444    fn split_outcome(
445        outcome: &OperationTerminalOutcome,
446    ) -> (mm_dsl::OperationTerminalOutcomeKind, String) {
447        match outcome {
448            OperationTerminalOutcome::Completed(result) => (
449                mm_dsl::OperationTerminalOutcomeKind::Completed,
450                serde_json::to_string(result).unwrap_or_default(),
451            ),
452            OperationTerminalOutcome::Failed { error } => (
453                mm_dsl::OperationTerminalOutcomeKind::Failed,
454                serde_json::to_string(error).unwrap_or_default(),
455            ),
456            OperationTerminalOutcome::Aborted { reason } => (
457                mm_dsl::OperationTerminalOutcomeKind::Aborted,
458                serde_json::to_string(reason).unwrap_or_default(),
459            ),
460            OperationTerminalOutcome::Cancelled { reason } => (
461                mm_dsl::OperationTerminalOutcomeKind::Cancelled,
462                serde_json::to_string(reason).unwrap_or_default(),
463            ),
464            OperationTerminalOutcome::Retired => {
465                (mm_dsl::OperationTerminalOutcomeKind::Retired, String::new())
466            }
467            OperationTerminalOutcome::Terminated { reason } => (
468                mm_dsl::OperationTerminalOutcomeKind::Terminated,
469                serde_json::to_string(reason).unwrap_or_default(),
470            ),
471        }
472    }
473
474    /// Read the DSL operation status for `id`, or `None` if not registered.
475    fn status(&self, id: &OperationId) -> Option<OperationStatus> {
476        let id_key = mm_dsl::OperationId::from_domain(id).0;
477        self.dsl
478            .0
479            .state
480            .op_statuses
481            .get(&id_key)
482            .copied()
483            .map(OperationStatus::from)
484    }
485
486    /// Read the DSL operation kind for `id`, or `None` if not registered.
487    fn kind(&self, id: &OperationId) -> Option<OperationKind> {
488        let id_key = mm_dsl::OperationId::from_domain(id).0;
489        self.dsl
490            .0
491            .state
492            .op_kinds
493            .get(&id_key)
494            .copied()
495            .map(OperationKind::from)
496    }
497
498    /// Read the peer-ready flag for `id`.
499    fn peer_ready(&self, id: &OperationId) -> Option<bool> {
500        let id_key = mm_dsl::OperationId::from_domain(id).0;
501        self.dsl.0.state.op_peer_ready.get(&id_key).copied()
502    }
503
504    /// Read the progress counter for `id`.
505    fn progress_count(&self, id: &OperationId) -> Option<u32> {
506        let id_key = mm_dsl::OperationId::from_domain(id).0;
507        self.dsl
508            .0
509            .state
510            .op_progress_counts
511            .get(&id_key)
512            .map(|v| (*v).min(u32::MAX as u64) as u32)
513    }
514
515    /// Read the terminal outcome for `id` by pairing the DSL's typed
516    /// discriminant with the companion payload JSON. Returns `None` when the
517    /// op has no recorded terminal discriminant.
518    fn terminal_outcome(&self, id: &OperationId) -> Option<OperationTerminalOutcome> {
519        let id_key = mm_dsl::OperationId::from_domain(id).0;
520        let kind = self
521            .dsl
522            .0
523            .state
524            .op_terminal_outcomes
525            .get(&id_key)
526            .copied()?;
527        let payload = self
528            .dsl
529            .0
530            .state
531            .op_terminal_payload
532            .get(&id_key)
533            .map(String::as_str)
534            .unwrap_or("");
535        match kind {
536            mm_dsl::OperationTerminalOutcomeKind::Completed => {
537                let result = serde_json::from_str::<OperationResult>(payload).ok()?;
538                Some(OperationTerminalOutcome::Completed(result))
539            }
540            mm_dsl::OperationTerminalOutcomeKind::Failed => {
541                let error = serde_json::from_str::<String>(payload).unwrap_or_default();
542                Some(OperationTerminalOutcome::Failed { error })
543            }
544            mm_dsl::OperationTerminalOutcomeKind::Aborted => {
545                let reason = serde_json::from_str::<Option<String>>(payload)
546                    .ok()
547                    .flatten();
548                Some(OperationTerminalOutcome::Aborted { reason })
549            }
550            mm_dsl::OperationTerminalOutcomeKind::Cancelled => {
551                let reason = serde_json::from_str::<Option<String>>(payload)
552                    .ok()
553                    .flatten();
554                Some(OperationTerminalOutcome::Cancelled { reason })
555            }
556            mm_dsl::OperationTerminalOutcomeKind::Retired => {
557                Some(OperationTerminalOutcome::Retired)
558            }
559            mm_dsl::OperationTerminalOutcomeKind::Terminated => {
560                let reason = serde_json::from_str::<String>(payload).unwrap_or_default();
561                Some(OperationTerminalOutcome::Terminated { reason })
562            }
563        }
564    }
565
566    /// Whether the operation is currently tracked in DSL state.
567    fn contains(&self, id: &OperationId) -> bool {
568        let id_key = mm_dsl::OperationId::from_domain(id).0;
569        self.dsl.0.state.op_statuses.contains_key(&id_key)
570    }
571
572    /// Number of non-terminal operations (derived from DSL state).
573    fn active_count(&self) -> usize {
574        self.dsl.0.state.active_op_count as usize
575    }
576
577    /// Number of operations currently tracked (including terminal).
578    fn operation_count(&self) -> usize {
579        self.dsl.0.state.op_statuses.len()
580    }
581
582    /// Iterate over all tracked operation IDs (DSL keys converted to domain).
583    fn operation_ids(&self) -> Vec<OperationId> {
584        self.dsl
585            .0
586            .state
587            .op_statuses
588            .keys()
589            .filter_map(|k| serde_json::from_str::<OperationId>(k).ok())
590            .collect()
591    }
592
593    /// Assign and return the next completion sequence number.
594    fn next_seq(&mut self) -> CompletionSeq {
595        self.next_completion_seq = self.next_completion_seq.saturating_add(1);
596        self.next_completion_seq
597    }
598
599    /// Build a snapshot from DSL state + shell record.
600    fn snapshot(&self, id: &OperationId) -> Option<OperationLifecycleSnapshot> {
601        let shell = self.records.get(id)?;
602        let kind = self.kind(id)?;
603        let status = self.status(id)?;
604        let peer_ready = self.peer_ready(id).unwrap_or(false);
605        let progress_count = self.progress_count(id).unwrap_or(0);
606        let terminal_outcome = self.terminal_outcome(id);
607
608        let created_at_ms = ShellRecord::epoch_millis(&shell.created_at_wall);
609        let started_at_ms = shell.started_at.map(|i| shell.epoch_millis_for_instant(i));
610        let completed_at_ms = shell
611            .completed_at
612            .map(|i| shell.epoch_millis_for_instant(i));
613        let elapsed_ms = shell.completed_at.map(|completed| {
614            completed
615                .saturating_duration_since(shell.created_at)
616                .as_millis() as u64
617        });
618
619        Some(OperationLifecycleSnapshot {
620            id: shell.spec.id.clone(),
621            kind,
622            display_name: shell.spec.display_name.clone(),
623            status,
624            peer_ready,
625            progress_count,
626            watcher_count: shell.watchers.len() as u32,
627            terminal_outcome,
628            child_session_id: shell.spec.child_session_id.clone(),
629            peer_handle: shell.peer_handle.clone(),
630            created_at_ms,
631            started_at_ms,
632            completed_at_ms,
633            elapsed_ms,
634        })
635    }
636
637    /// Emit shell-side mechanics for a terminal transition: notify watchers,
638    /// push CompletionEntry, retain in FIFO, evict as needed. Called AFTER the
639    /// DSL transition has already persisted the terminal status + outcome.
640    fn finalize_terminal(&mut self, id: &OperationId) {
641        let outcome = match self.terminal_outcome(id) {
642            Some(o) => o,
643            None => return,
644        };
645        let kind = self.kind(id);
646
647        // Notify watchers and mark completion timestamp.
648        if let Some(shell) = self.records.get_mut(id) {
649            shell.notify_watchers(&outcome);
650            shell.mark_completed();
651        }
652
653        // Push completion feed entry.
654        let seq = self.next_seq();
655        let display_name = self
656            .records
657            .get(id)
658            .map(|r| r.spec.display_name.clone())
659            .unwrap_or_default();
660        let completed_at_ms = self
661            .records
662            .get(id)
663            .and_then(|r| r.completed_at.map(|i| r.epoch_millis_for_instant(i)));
664        let kind_for_entry = kind.unwrap_or(OperationKind::BackgroundToolOp);
665        self.feed_buffer.push(CompletionEntry {
666            seq,
667            operation_id: id.clone(),
668            kind: kind_for_entry,
669            display_name,
670            terminal_outcome: outcome,
671            completed_at_ms,
672        });
673
674        // FIFO retention + eviction.
675        self.completed_order.push_back(id.clone());
676        while self.completed_order.len() > self.max_completed {
677            if let Some(evicted) = self.completed_order.pop_front() {
678                let evicted_key = mm_dsl::OperationId::from_domain(&evicted).0;
679                self.dsl.0.state.op_statuses.remove(&evicted_key);
680                self.dsl.0.state.op_kinds.remove(&evicted_key);
681                self.dsl.0.state.op_peer_ready.remove(&evicted_key);
682                self.dsl.0.state.op_progress_counts.remove(&evicted_key);
683                self.dsl.0.state.op_terminal_outcomes.remove(&evicted_key);
684                self.dsl.0.state.op_terminal_payload.remove(&evicted_key);
685                self.dsl.0.state.op_completion_seq.remove(&evicted_key);
686                self.records.remove(&evicted);
687            }
688        }
689
690        // Satisfy a pending wait request if all its ops are now terminal.
691        self.maybe_satisfy_wait();
692    }
693
694    /// Read barrier membership from DSL state (sole owner).
695    fn wait_operation_ids(&self) -> Vec<OperationId> {
696        self.dsl
697            .0
698            .state
699            .wait_operation_ids
700            .iter()
701            .filter_map(|k| serde_json::from_str::<OperationId>(k).ok())
702            .collect()
703    }
704
705    /// Whether the DSL has a barrier wait active.
706    fn wait_active(&self) -> bool {
707        self.dsl.0.state.wait_active
708    }
709
710    fn begin_wait_all_authority(
711        &mut self,
712        wait_request_id: &WaitRequestId,
713        operation_ids: &[OperationId],
714    ) -> Result<WaitAllAuthorityPlan, OpsLifecycleError> {
715        let mut seen = HashSet::new();
716        for operation_id in operation_ids {
717            if !seen.insert(operation_id.clone()) {
718                return Err(OpsLifecycleError::DuplicateWaitOperation(
719                    operation_id.clone(),
720                ));
721            }
722        }
723
724        if self.wait_active() {
725            return Err(OpsLifecycleError::WaitAlreadyActive);
726        }
727
728        for operation_id in operation_ids {
729            if !self.contains(operation_id) {
730                return Err(OpsLifecycleError::NotFound(operation_id.clone()));
731            }
732        }
733
734        let all_terminal = operation_ids.iter().all(|operation_id| {
735            self.status(operation_id)
736                .is_some_and(OperationStatus::is_terminal)
737        });
738        if all_terminal {
739            return Ok(WaitAllAuthorityPlan::AlreadySatisfied(WaitAllSatisfied {
740                wait_request_id: wait_request_id.clone(),
741                operation_ids: operation_ids.to_vec(),
742            }));
743        }
744
745        let dsl_ids: std::collections::BTreeSet<String> = operation_ids
746            .iter()
747            .map(|id| mm_dsl::OperationId::from_domain(id).0)
748            .collect();
749        let dsl_id_tokens: std::collections::BTreeSet<mm_dsl::OperationId> = operation_ids
750            .iter()
751            .map(mm_dsl::OperationId::from_domain)
752            .collect();
753        self.dsl_apply(
754            mm_dsl::MeerkatMachineInput::RequestWaitAll {
755                wait_request_id: mm_dsl::WaitRequestId::from_domain(wait_request_id),
756                operation_ids: dsl_ids,
757                operation_id_tokens: dsl_id_tokens,
758            },
759            "RequestWaitAll",
760        )?;
761        Ok(WaitAllAuthorityPlan::ActivateBarrier)
762    }
763
764    fn owner_termination_targets(&self) -> Vec<(OperationId, OperationStatus)> {
765        self.operation_ids()
766            .into_iter()
767            .filter_map(|id| self.status(&id).map(|status| (id, status)))
768            .filter(|(_, status)| !status.is_terminal())
769            .collect()
770    }
771
772    /// Check whether a pending barrier wait is now satisfied and resolve it.
773    ///
774    /// Barrier membership and the "all members terminal" decision both live
775    /// in the DSL: `wait_operation_ids` carries the set, `wait_active`
776    /// signals a pending barrier, and `SatisfyWaitAll`'s
777    /// `all_members_terminal` guard owns the fixed-point test. The shell
778    /// echoes the DSL-owned request id and typed operation tokens into
779    /// `SatisfyWaitAll` so the transition can clear the barrier before
780    /// rendering the handoff effect. It still fires idempotently on every
781    /// terminal transition and lets the DSL guard reject early firings as a
782    /// no-op. On acceptance (transition returns `Ok`), the shell selects the
783    /// correlated oneshot and delivers the `WaitAllSatisfied` obligation
784    /// token.
785    ///
786    /// `wait_request_id` is the shell-owned oneshot correlation id that
787    /// selects which sender to notify; when the DSL barrier satisfies
788    /// without a live correlation (post-recovery, or duplicate resolution),
789    /// the oneshot simply remains pending.
790    fn maybe_satisfy_wait(&mut self) {
791        // Capture membership *before* applying — `SatisfyWaitAll` clears the
792        // DSL barrier, so a post-apply read would lose the member list carried
793        // on the obligation token.
794        let ids = self.wait_operation_ids();
795        let Some(dsl_wait_request_id) = self.dsl.0.state.wait_request_id.clone() else {
796            return;
797        };
798        let dsl_operation_id_tokens = self.dsl.0.state.wait_operation_id_tokens.clone();
799        if self
800            .dsl_apply_raw(mm_dsl::MeerkatMachineInput::SatisfyWaitAll {
801                wait_request_id: dsl_wait_request_id,
802                operation_id_tokens: dsl_operation_id_tokens,
803            })
804            .is_err()
805        {
806            // Guard rejection (barrier inactive, or members not all terminal
807            // yet) is an expected idempotent no-op on the satisfaction fixed
808            // point, not a shell/DSL desync. Swallow.
809            return;
810        }
811        let wait_id = match self.wait_request_id.take() {
812            Some(id) => id,
813            None => return,
814        };
815        if let Some(pending) = self.pending_wait.take() {
816            if pending.wait_request_id == wait_id {
817                let _ = pending.sender.send(WaitAllSatisfied {
818                    wait_request_id: wait_id,
819                    operation_ids: ids,
820                });
821            } else {
822                self.pending_wait = Some(pending);
823            }
824        }
825    }
826
827    /// Persist a terminal snapshot if a persistence channel is wired.
828    ///
829    /// Called after terminal transitions. Captures authority + entries + cursors under the write
830    /// lock (caller already holds it), submits a persistence request, and waits for the worker's
831    /// durable-store result. Returning success only means the snapshot write itself succeeded.
832    fn maybe_persist(&self) -> Result<(), OpsLifecycleError> {
833        let (tx, epoch_id, cursor_state) = match (
834            &self.persist_tx,
835            &self.persist_epoch_id,
836            &self.persist_cursor_state,
837        ) {
838            (Some(tx), Some(epoch_id), Some(cs)) => (tx, epoch_id, cs),
839            _ => return Ok(()),
840        };
841
842        let snapshot = self.capture_snapshot(epoch_id.clone(), cursor_state);
843        let (result_tx, result_rx) = std::sync::mpsc::sync_channel(1);
844        let request = OpsLifecyclePersistenceRequest {
845            snapshot,
846            result_tx,
847        };
848
849        tx.send(request).map_err(|_| {
850            OpsLifecycleError::Internal(
851                "ops lifecycle persistence channel closed before terminal snapshot could be queued"
852                    .into(),
853            )
854        })?;
855        result_rx.recv().map_err(|_| {
856            OpsLifecycleError::Internal(
857                "ops lifecycle persistence worker dropped terminal snapshot before confirming durability"
858                    .into(),
859            )
860        })?
861    }
862
863    /// Capture the full persisted snapshot for the current state.
864    fn capture_snapshot(
865        &self,
866        epoch_id: meerkat_core::RuntimeEpochId,
867        cursor_state: &meerkat_core::EpochCursorState,
868    ) -> PersistedOpsSnapshot {
869        let operation_specs: HashMap<OperationId, OperationSpec> = self
870            .records
871            .iter()
872            .map(|(id, record)| (id.clone(), record.spec.clone()))
873            .collect();
874
875        let completion_entries = {
876            let inner = self
877                .feed_buffer
878                .inner
879                .read()
880                .unwrap_or_else(std::sync::PoisonError::into_inner);
881            inner.entries.iter().cloned().collect()
882        };
883
884        let mut operations: HashMap<OperationId, OperationCanonicalState> = HashMap::new();
885        for op_id in self.operation_ids() {
886            let Some(status) = self.status(&op_id) else {
887                continue;
888            };
889            let Some(kind) = self.kind(&op_id) else {
890                continue;
891            };
892            let peer_ready = self.peer_ready(&op_id).unwrap_or(false);
893            let progress_count = self.progress_count(&op_id).unwrap_or(0);
894            let terminal_outcome = self.terminal_outcome(&op_id);
895            let terminal_buffered = terminal_outcome.is_some();
896            let watcher_count = self
897                .records
898                .get(&op_id)
899                .map(|r| r.watchers.len() as u32)
900                .unwrap_or(0);
901            operations.insert(
902                op_id,
903                OperationCanonicalState {
904                    status,
905                    kind,
906                    peer_ready,
907                    progress_count,
908                    watcher_count,
909                    terminal_outcome,
910                    terminal_buffered,
911                },
912            );
913        }
914
915        let authority_state = RegistryCanonicalState {
916            operations,
917            completed_order: self.completed_order.clone(),
918            max_completed: self.max_completed,
919            max_concurrent: self.max_concurrent,
920            active_count: self.active_count(),
921            wait_request_id: self.wait_request_id.clone(),
922            wait_operation_ids: self.wait_operation_ids(),
923            next_completion_seq: self.next_completion_seq,
924        };
925
926        PersistedOpsSnapshot {
927            epoch_id,
928            authority_state,
929            operation_specs,
930            completion_entries,
931            cursors: cursor_state.snapshot(),
932        }
933    }
934
935    fn shell_record_mut(
936        &mut self,
937        id: &OperationId,
938    ) -> Result<&mut ShellRecord, OpsLifecycleError> {
939        self.records
940            .get_mut(id)
941            .ok_or_else(|| OpsLifecycleError::NotFound(id.clone()))
942    }
943
944    fn collect_wait_outcomes(
945        &self,
946        operation_ids: &[OperationId],
947    ) -> Result<Vec<(OperationId, OperationTerminalOutcome)>, OpsLifecycleError> {
948        operation_ids
949            .iter()
950            .map(|operation_id| {
951                let outcome = self.terminal_outcome(operation_id).ok_or_else(|| {
952                    OpsLifecycleError::Internal(format!(
953                        "wait_all completed without terminal outcome for {operation_id}"
954                    ))
955                })?;
956                Ok((operation_id.clone(), outcome))
957            })
958            .collect()
959    }
960}
961
962impl Default for ShellState {
963    fn default() -> Self {
964        Self::new(DEFAULT_MAX_COMPLETED, None)
965    }
966}
967
968// ---------------------------------------------------------------------------
969// Public configuration & registry
970// ---------------------------------------------------------------------------
971
/// Configuration for [`RuntimeOpsLifecycleRegistry`].
///
/// Consumed by [`RuntimeOpsLifecycleRegistry::with_config`], which forwards
/// both fields to the shell state constructor.
#[derive(Debug, Clone)]
pub struct OpsLifecycleConfig {
    /// Maximum number of completed operations to retain
    /// (default: `DEFAULT_MAX_COMPLETED`).
    pub max_completed: usize,
    /// Maximum concurrent non-terminal operations (None = unlimited).
    pub max_concurrent: Option<usize>,
}
980
981impl Default for OpsLifecycleConfig {
982    fn default() -> Self {
983        Self {
984            max_completed: DEFAULT_MAX_COMPLETED,
985            max_concurrent: None,
986        }
987    }
988}
989
/// Per-runtime shared registry for async operation lifecycle truth.
///
/// Per-operation canonical lifecycle state is owned by the DSL authority
/// embedded in the shell. This struct manages I/O concerns: watcher
/// channels, timestamps, peer handles, snapshot assembly, FIFO eviction,
/// and the completion feed buffer.
///
/// All access goes through a single [`RwLock`] around the shell state, so
/// the methods take `&self`.
#[derive(Debug)]
pub struct RuntimeOpsLifecycleRegistry {
    // Shell state (which embeds the DSL authority) behind one lock.
    state: RwLock<ShellState>,
}
1000
/// Point-in-time diagnostic view of the registry, produced by
/// `RuntimeOpsLifecycleRegistry::diagnostic_snapshot` under a read lock.
#[derive(Debug, Clone)]
pub(crate) struct RuntimeOpsDiagnosticSnapshot {
    /// Value of the shell's `operation_count()` at capture time.
    pub operation_count: usize,
    /// Value of the shell's `active_count()` at capture time.
    pub active_count: usize,
    /// Shell-side wait correlation id, if one is set.
    pub wait_request_id: Option<WaitRequestId>,
    /// Whether a pending-wait entry (oneshot sender) is currently stored.
    pub pending_wait_present: bool,
    /// Request id carried by the pending-wait entry, if any. Reported
    /// separately from `wait_request_id` so a mismatch is visible.
    pub pending_wait_request_id: Option<WaitRequestId>,
    /// Operation ids reported by the shell's `wait_operation_ids()`.
    pub wait_operation_ids: Vec<OperationId>,
    /// Per-operation lifecycle snapshots, sorted by `display_name`.
    pub operations: Vec<OperationLifecycleSnapshot>,
}
1011
1012impl Default for RuntimeOpsLifecycleRegistry {
1013    fn default() -> Self {
1014        Self {
1015            state: RwLock::new(ShellState::default()),
1016        }
1017    }
1018}
1019
1020impl RuntimeOpsLifecycleRegistry {
1021    pub fn new() -> Self {
1022        Self::default()
1023    }
1024
1025    pub fn with_config(config: OpsLifecycleConfig) -> Self {
1026        Self {
1027            state: RwLock::new(ShellState::new(config.max_completed, config.max_concurrent)),
1028        }
1029    }
1030
1031    /// Wire a persistence channel for durable snapshot writes.
1032    ///
1033    /// After this call, terminal transitions (complete/fail/cancel/abort)
1034    /// capture a snapshot and queue it to the channel. A dedicated
1035    /// persistence task should drain the channel and write to the store.
1036    pub fn set_persistence_channel(
1037        &self,
1038        tx: crate::tokio::sync::mpsc::UnboundedSender<OpsLifecyclePersistenceRequest>,
1039        epoch_id: meerkat_core::RuntimeEpochId,
1040        cursor_state: Arc<meerkat_core::EpochCursorState>,
1041    ) {
1042        if let Ok(mut state) = self.state.write() {
1043            state.persist_tx = Some(tx);
1044            state.persist_epoch_id = Some(epoch_id);
1045            state.persist_cursor_state = Some(cursor_state);
1046        }
1047    }
1048
1049    /// Recover from a persisted snapshot.
1050    ///
1051    /// Rebuilds DSL state (stripping non-terminal ops — only terminals
1052    /// survive recovery), creates fresh shell records from specs, and seeds
1053    /// the feed buffer with persisted completion entries.
1054    pub fn from_recovered(snapshot: PersistedOpsSnapshot) -> Self {
1055        let mut shell = ShellState::new(
1056            snapshot.authority_state.max_completed,
1057            snapshot.authority_state.max_concurrent,
1058        );
1059
1060        // Only retain terminal operations in the DSL state.
1061        let mut retained_ids: HashSet<OperationId> = HashSet::new();
1062        for (op_id, op_state) in snapshot.authority_state.operations {
1063            if !op_state.status.is_terminal() {
1064                continue;
1065            }
1066            let id_key = mm_dsl::OperationId::from_domain(&op_id).0;
1067            shell.dsl.0.state.op_statuses.insert(
1068                id_key.clone(),
1069                mm_dsl::OperationStatus::from(op_state.status),
1070            );
1071            shell
1072                .dsl
1073                .0
1074                .state
1075                .op_kinds
1076                .insert(id_key.clone(), mm_dsl::OperationKind::from(op_state.kind));
1077            shell
1078                .dsl
1079                .0
1080                .state
1081                .op_peer_ready
1082                .insert(id_key.clone(), op_state.peer_ready);
1083            shell
1084                .dsl
1085                .0
1086                .state
1087                .op_progress_counts
1088                .insert(id_key.clone(), op_state.progress_count as u64);
1089            if let Some(outcome) = op_state.terminal_outcome.as_ref() {
1090                let (kind, payload) = ShellState::split_outcome(outcome);
1091                shell
1092                    .dsl
1093                    .0
1094                    .state
1095                    .op_terminal_outcomes
1096                    .insert(id_key.clone(), kind);
1097                shell
1098                    .dsl
1099                    .0
1100                    .state
1101                    .op_terminal_payload
1102                    .insert(id_key.clone(), payload);
1103            }
1104            retained_ids.insert(op_id);
1105        }
1106        // active_count is 0 — all retained ops are terminal.
1107        shell.dsl.0.state.active_op_count = 0;
1108
1109        // Rebuild completed_order keeping only retained ops.
1110        shell.completed_order = snapshot
1111            .authority_state
1112            .completed_order
1113            .into_iter()
1114            .filter(|id| retained_ids.contains(id))
1115            .collect();
1116
1117        // Re-seed completion sequence counter so new terminals keep
1118        // monotonic order relative to persisted entries.
1119        shell.next_completion_seq = snapshot.authority_state.next_completion_seq;
1120
1121        // Seed the feed buffer from persisted entries.
1122        for entry in &snapshot.completion_entries {
1123            shell.feed_buffer.push(entry.clone());
1124        }
1125
1126        // Rebuild shell records from specs (fresh timestamps, no watchers)
1127        // — only for operations still retained in the DSL state.
1128        for (op_id, spec) in snapshot.operation_specs {
1129            if retained_ids.contains(&op_id) {
1130                shell.records.insert(
1131                    op_id,
1132                    ShellRecord {
1133                        spec,
1134                        peer_handle: None,
1135                        watchers: Vec::new(),
1136                        created_at: Instant::now(),
1137                        started_at: None,
1138                        completed_at: None,
1139                        created_at_wall: SystemTime::now(),
1140                    },
1141                );
1142            }
1143        }
1144
1145        Self {
1146            state: RwLock::new(shell),
1147        }
1148    }
1149
1150    /// Capture a serializable snapshot of the current state for persistence.
1151    ///
1152    /// Includes authority state, operation specs, completion entries, and
1153    /// cursor values. Cursor values may be stale relative to the agent's
1154    /// true position (monotonic staleness, not atomicity).
1155    pub fn capture_persistence_snapshot(
1156        &self,
1157        epoch_id: meerkat_core::RuntimeEpochId,
1158        cursor_state: &meerkat_core::EpochCursorState,
1159    ) -> PersistedOpsSnapshot {
1160        let state = self
1161            .state
1162            .read()
1163            .unwrap_or_else(std::sync::PoisonError::into_inner);
1164        state.capture_snapshot(epoch_id, cursor_state)
1165    }
1166
1167    /// Return a read handle to the completion feed.
1168    pub fn completion_feed_handle(&self) -> Arc<dyn CompletionFeed> {
1169        let state = self
1170            .state
1171            .read()
1172            .unwrap_or_else(std::sync::PoisonError::into_inner);
1173        Arc::new(RuntimeCompletionFeed {
1174            buffer: Arc::clone(&state.feed_buffer),
1175        })
1176    }
1177
1178    /// Capture a stable diagnostic snapshot of the canonical ops lifecycle state.
1179    pub(crate) fn diagnostic_snapshot(&self) -> RuntimeOpsDiagnosticSnapshot {
1180        let state = self
1181            .state
1182            .read()
1183            .unwrap_or_else(std::sync::PoisonError::into_inner);
1184        let mut operations = state
1185            .operation_ids()
1186            .into_iter()
1187            .filter_map(|id| state.snapshot(&id))
1188            .collect::<Vec<_>>();
1189        operations.sort_by(|left, right| left.display_name.cmp(&right.display_name));
1190        RuntimeOpsDiagnosticSnapshot {
1191            operation_count: state.operation_count(),
1192            active_count: state.active_count(),
1193            wait_request_id: state.wait_request_id.clone(),
1194            pending_wait_present: state.pending_wait.is_some(),
1195            pending_wait_request_id: state
1196                .pending_wait
1197                .as_ref()
1198                .map(|pending_wait| pending_wait.wait_request_id.clone()),
1199            wait_operation_ids: state.wait_operation_ids(),
1200            operations,
1201        }
1202    }
1203
1204    fn read_state(&self) -> Result<RwLockReadGuard<'_, ShellState>, OpsLifecycleError> {
1205        self.state
1206            .read()
1207            .map_err(|_| OpsLifecycleError::Internal("ops lifecycle registry poisoned".into()))
1208    }
1209
1210    fn write_state(&self) -> Result<RwLockWriteGuard<'_, ShellState>, OpsLifecycleError> {
1211        self.state
1212            .write()
1213            .map_err(|_| OpsLifecycleError::Internal("ops lifecycle registry poisoned".into()))
1214    }
1215
1216    fn cancel_wait_all_internal(
1217        &self,
1218        wait_request_id: &WaitRequestId,
1219    ) -> Result<(), OpsLifecycleError> {
1220        let mut state = self.write_state()?;
1221        match state.wait_request_id.as_ref() {
1222            Some(active) if active == wait_request_id => {
1223                state.wait_request_id = None;
1224                state.pending_wait = None;
1225                // Clear the DSL barrier via the dedicated `CancelWaitAll`
1226                // transition. Unlike `SatisfyWaitAll`, it does not require
1227                // every member to be terminal (the request was dropped, not
1228                // resolved) and does not emit the `WaitAllSatisfied`
1229                // obligation. The `wait_is_active` guard keeps it an
1230                // idempotent no-op if the barrier was already cleared.
1231                let _ = state.dsl_apply(
1232                    mm_dsl::MeerkatMachineInput::CancelWaitAll,
1233                    "CancelWaitAll(cancel)",
1234                );
1235                Ok(())
1236            }
1237            _ => {
1238                state.pending_wait = None;
1239                Ok(())
1240            }
1241        }
1242    }
1243}
1244
/// Progress of a [`WaitAllFuture`].
enum WaitAllFutureState {
    /// A result is already available; it is taken and returned on the next poll.
    Ready(Option<Result<WaitAllResult, OpsLifecycleError>>),
    /// Waiting on the oneshot that delivers the `WaitAllSatisfied` token.
    Waiting(tokio::sync::oneshot::Receiver<WaitAllSatisfied>),
    /// The future has already yielded its output; further polls return an error.
    Done,
}
1250
/// Future that resolves a wait-all request against a registry.
///
/// Dropping it while it is still waiting cancels the request on the registry.
struct WaitAllFuture<'a> {
    // Registry used to read terminal outcomes and to cancel on drop.
    registry: &'a RuntimeOpsLifecycleRegistry,
    // Id passed to the registry when the wait is cancelled on drop.
    wait_request_id: WaitRequestId,
    // Operations whose terminal outcomes are collected once the wait resolves.
    operation_ids: Vec<OperationId>,
    // Current progress of the future.
    state: WaitAllFutureState,
}
1257
1258impl Future for WaitAllFuture<'_> {
1259    type Output = Result<WaitAllResult, OpsLifecycleError>;
1260
1261    fn poll(mut self: std::pin::Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
1262        match &mut self.state {
1263            WaitAllFutureState::Ready(result) => {
1264                let ready = result.take().unwrap_or_else(|| {
1265                    Err(OpsLifecycleError::Internal(
1266                        "wait_all future polled after completion".into(),
1267                    ))
1268                });
1269                self.state = WaitAllFutureState::Done;
1270                Poll::Ready(ready)
1271            }
1272            WaitAllFutureState::Waiting(receiver) => match std::pin::Pin::new(receiver).poll(cx) {
1273                Poll::Pending => Poll::Pending,
1274                Poll::Ready(Ok(satisfied)) => {
1275                    let outcomes = match self.registry.read_state() {
1276                        Ok(state) => state.collect_wait_outcomes(&self.operation_ids),
1277                        Err(err) => Err(err),
1278                    };
1279                    self.state = WaitAllFutureState::Done;
1280                    Poll::Ready(outcomes.map(|outcomes| WaitAllResult {
1281                        outcomes,
1282                        satisfied,
1283                    }))
1284                }
1285                Poll::Ready(Err(_)) => {
1286                    self.state = WaitAllFutureState::Done;
1287                    Poll::Ready(Err(OpsLifecycleError::Internal(
1288                        "wait_all completion channel dropped".into(),
1289                    )))
1290                }
1291            },
1292            WaitAllFutureState::Done => Poll::Ready(Err(OpsLifecycleError::Internal(
1293                "wait_all future polled after completion".into(),
1294            ))),
1295        }
1296    }
1297}
1298
1299impl Drop for WaitAllFuture<'_> {
1300    fn drop(&mut self) {
1301        if matches!(self.state, WaitAllFutureState::Waiting(_)) {
1302            let _ = self
1303                .registry
1304                .cancel_wait_all_internal(&self.wait_request_id);
1305        }
1306    }
1307}
1308
1309// ---------------------------------------------------------------------------
1310// Shell → DSL error classification
1311// ---------------------------------------------------------------------------
1312//
1313// Transition legality (which "from" statuses each op-lifecycle transition is
1314// legal from) is owned by the MeerkatMachine DSL's `from_status_valid` guards.
1315// The shell's only job when a DSL transition is rejected is to surface the
1316// pre-read status back to the caller as [`OpsLifecycleError::InvalidTransition`].
1317//
1318// Each op-lifecycle entry point follows the same shape:
1319//   1. Pre-read `status(id)` under the write lock. `None` → `NotFound`.
1320//   2. Fire the DSL input via `dsl_apply_raw`.
1321//   3. On `GuardRejected`, synthesise `InvalidTransition { status, action }`.
1322//   4. On `NoMatchingTransition`, surface as `Internal` (genuine desync).
1323//
1324// The `action` label is a short, human-readable name of the shell entry point
1325// — matches the names formerly passed to the deleted `require_status`.
1326
1327/// Classify a kernel rejection from an op-lifecycle DSL apply.
1328///
1329/// `GuardRejected` → `InvalidTransition { id, status, action }`. The pre-read
1330/// `status` and the DSL's guard observation come from the same canonical map
1331/// under a single write lock, so they cannot diverge.
1332/// `NoMatchingTransition` → `Internal` (genuine shell/DSL desync).
1333fn classify_op_rejection(
1334    err: mm_dsl::MeerkatMachineTransitionError,
1335    id: &OperationId,
1336    status: OperationStatus,
1337    action: &'static str,
1338) -> OpsLifecycleError {
1339    match err {
1340        mm_dsl::MeerkatMachineTransitionError::GuardRejected { .. } => {
1341            OpsLifecycleError::InvalidTransition {
1342                id: id.clone(),
1343                status,
1344                action,
1345            }
1346        }
1347        other => OpsLifecycleError::Internal(format!(
1348            "DSL rejected ops transition ({action}): {other:?}"
1349        )),
1350    }
1351}
1352
1353/// Classify a `PeerReadyOp` DSL rejection back into the public error surface.
1354///
1355/// `PeerReadyOp`'s DSL guards layer three distinct rejections onto a single
1356/// `GuardRejected` variant. The shell distinguishes them by re-reading the
1357/// canonical DSL state under the same write lock that observed the guard
1358/// failure — so the classification cannot race with a concurrent mutation.
1359///
1360/// Priority mirrors the declared guard order in the DSL transition:
1361/// 1. `kind_is_mob_member_child` → `PeerNotExpected`
1362/// 2. `not_already_peer_ready`   → `AlreadyPeerReady`
1363/// 3. `from_status_valid`        → `InvalidTransition`
1364fn classify_peer_ready_rejection(
1365    state: &ShellState,
1366    err: mm_dsl::MeerkatMachineTransitionError,
1367    id: &OperationId,
1368    status: OperationStatus,
1369) -> OpsLifecycleError {
1370    match err {
1371        mm_dsl::MeerkatMachineTransitionError::GuardRejected { .. } => {
1372            let kind = state.kind(id);
1373            if kind != Some(OperationKind::MobMemberChild) {
1374                return OpsLifecycleError::PeerNotExpected(id.clone());
1375            }
1376            if state.peer_ready(id).unwrap_or(false) {
1377                return OpsLifecycleError::AlreadyPeerReady(id.clone());
1378            }
1379            OpsLifecycleError::InvalidTransition {
1380                id: id.clone(),
1381                status,
1382                action: "peer_ready",
1383            }
1384        }
1385        other => OpsLifecycleError::Internal(format!(
1386            "DSL rejected ops transition (peer_ready): {other:?}"
1387        )),
1388    }
1389}
1390
1391impl OpsLifecycleRegistry for RuntimeOpsLifecycleRegistry {
1392    fn register_operation(&self, spec: OperationSpec) -> Result<(), OpsLifecycleError> {
1393        let mut state = self.write_state()?;
1394        let operation_id = spec.id.clone();
1395        let kind = spec.kind;
1396
1397        // Pre-check: duplicate.
1398        if state.contains(&operation_id) {
1399            return Err(OpsLifecycleError::AlreadyRegistered(operation_id));
1400        }
1401        // Pre-check: concurrency limit.
1402        if let Some(limit) = state.max_concurrent
1403            && state.active_count() >= limit
1404        {
1405            return Err(OpsLifecycleError::MaxConcurrentExceeded {
1406                limit,
1407                active: state.active_count(),
1408            });
1409        }
1410
1411        // DSL apply.
1412        state.dsl_apply(
1413            mm_dsl::MeerkatMachineInput::RegisterOp {
1414                operation_id: mm_dsl::OperationId::from_domain(&operation_id).0,
1415                kind: mm_dsl::OperationKind::from_domain(&kind),
1416            },
1417            "RegisterOp",
1418        )?;
1419
1420        // Insert shell record.
1421        state.records.insert(operation_id, ShellRecord::new(spec));
1422        Ok(())
1423    }
1424
1425    fn provisioning_succeeded(&self, id: &OperationId) -> Result<(), OpsLifecycleError> {
1426        let mut state = self.write_state()?;
1427
1428        let status = state
1429            .status(id)
1430            .ok_or_else(|| OpsLifecycleError::NotFound(id.clone()))?;
1431
1432        if let Err(err) = state.dsl_apply_raw(mm_dsl::MeerkatMachineInput::StartOp {
1433            operation_id: mm_dsl::OperationId::from_domain(id).0,
1434        }) {
1435            return Err(classify_op_rejection(
1436                err,
1437                id,
1438                status,
1439                "provisioning_succeeded",
1440            ));
1441        }
1442
1443        // Shell concern: record the started timestamp.
1444        if let Some(shell) = state.records.get_mut(id) {
1445            shell.started_at = Some(Instant::now());
1446        }
1447        Ok(())
1448    }
1449
1450    fn provisioning_failed(
1451        &self,
1452        id: &OperationId,
1453        error: String,
1454    ) -> Result<(), OpsLifecycleError> {
1455        let mut state = self.write_state()?;
1456
1457        let status = state
1458            .status(id)
1459            .ok_or_else(|| OpsLifecycleError::NotFound(id.clone()))?;
1460
1461        let terminal_outcome = OperationTerminalOutcome::Failed { error };
1462        let (outcome_kind, outcome_payload) = ShellState::split_outcome(&terminal_outcome);
1463
1464        if let Err(err) = state.dsl_apply_raw(mm_dsl::MeerkatMachineInput::FailOp {
1465            operation_id: mm_dsl::OperationId::from_domain(id).0,
1466            outcome: outcome_kind,
1467            payload: outcome_payload,
1468        }) {
1469            return Err(classify_op_rejection(err, id, status, "fail_operation"));
1470        }
1471
1472        state.finalize_terminal(id);
1473        state.maybe_persist()?;
1474        Ok(())
1475    }
1476
1477    fn peer_ready(
1478        &self,
1479        id: &OperationId,
1480        peer: OperationPeerHandle,
1481    ) -> Result<(), OpsLifecycleError> {
1482        let mut state = self.write_state()?;
1483
1484        let status = state
1485            .status(id)
1486            .ok_or_else(|| OpsLifecycleError::NotFound(id.clone()))?;
1487
1488        // The kind / not-already-peer-ready / status decisions all live in
1489        // the DSL's `PeerReadyOp` guards. The shell fires unconditionally
1490        // and classifies a guard rejection by re-reading the state under
1491        // the same write lock (so no race with the DSL's guard evaluation).
1492        if let Err(err) = state.dsl_apply_raw(mm_dsl::MeerkatMachineInput::PeerReadyOp {
1493            operation_id: mm_dsl::OperationId::from_domain(id).0,
1494        }) {
1495            return Err(classify_peer_ready_rejection(&state, err, id, status));
1496        }
1497
1498        // Shell concern: store the peer handle.
1499        if let Some(shell) = state.records.get_mut(id) {
1500            shell.peer_handle = Some(peer);
1501        }
1502        Ok(())
1503    }
1504
1505    fn register_watcher(
1506        &self,
1507        id: &OperationId,
1508    ) -> Result<OperationCompletionWatch, OpsLifecycleError> {
1509        let mut state = self.write_state()?;
1510
1511        if !state.contains(id) {
1512            return Err(OpsLifecycleError::NotFound(id.clone()));
1513        }
1514
1515        // If already terminal, return an already-resolved watch.
1516        if let Some(outcome) = state.terminal_outcome(id) {
1517            return Ok(OperationCompletionWatch::already_resolved(outcome));
1518        }
1519
1520        // Shell concern: create the channel and store the sender.
1521        let shell = state.shell_record_mut(id)?;
1522        let (tx, watch) = OperationCompletionWatch::channel();
1523        shell.watchers.push(tx);
1524        Ok(watch)
1525    }
1526
1527    fn report_progress(
1528        &self,
1529        id: &OperationId,
1530        _update: OperationProgressUpdate,
1531    ) -> Result<(), OpsLifecycleError> {
1532        let mut state = self.write_state()?;
1533
1534        let status = state
1535            .status(id)
1536            .ok_or_else(|| OpsLifecycleError::NotFound(id.clone()))?;
1537
1538        if let Err(err) = state.dsl_apply_raw(mm_dsl::MeerkatMachineInput::ProgressReportedOp {
1539            operation_id: mm_dsl::OperationId::from_domain(id).0,
1540        }) {
1541            return Err(classify_op_rejection(err, id, status, "report_progress"));
1542        }
1543        Ok(())
1544    }
1545
1546    fn complete_operation(
1547        &self,
1548        id: &OperationId,
1549        result: OperationResult,
1550    ) -> Result<(), OpsLifecycleError> {
1551        let mut state = self.write_state()?;
1552
1553        let status = state
1554            .status(id)
1555            .ok_or_else(|| OpsLifecycleError::NotFound(id.clone()))?;
1556
1557        let terminal_outcome = OperationTerminalOutcome::Completed(result);
1558        let (outcome_kind, outcome_payload) = ShellState::split_outcome(&terminal_outcome);
1559
1560        if let Err(err) = state.dsl_apply_raw(mm_dsl::MeerkatMachineInput::CompleteOp {
1561            operation_id: mm_dsl::OperationId::from_domain(id).0,
1562            outcome: outcome_kind,
1563            payload: outcome_payload,
1564        }) {
1565            return Err(classify_op_rejection(err, id, status, "complete_operation"));
1566        }
1567
1568        state.finalize_terminal(id);
1569        state.maybe_persist()?;
1570        Ok(())
1571    }
1572
1573    fn fail_operation(&self, id: &OperationId, error: String) -> Result<(), OpsLifecycleError> {
1574        let mut state = self.write_state()?;
1575
1576        let status = state
1577            .status(id)
1578            .ok_or_else(|| OpsLifecycleError::NotFound(id.clone()))?;
1579
1580        let terminal_outcome = OperationTerminalOutcome::Failed { error };
1581        let (outcome_kind, outcome_payload) = ShellState::split_outcome(&terminal_outcome);
1582
1583        if let Err(err) = state.dsl_apply_raw(mm_dsl::MeerkatMachineInput::FailOp {
1584            operation_id: mm_dsl::OperationId::from_domain(id).0,
1585            outcome: outcome_kind,
1586            payload: outcome_payload,
1587        }) {
1588            return Err(classify_op_rejection(err, id, status, "fail_operation"));
1589        }
1590
1591        state.finalize_terminal(id);
1592        state.maybe_persist()?;
1593        Ok(())
1594    }
1595
1596    fn abort_provisioning(
1597        &self,
1598        id: &OperationId,
1599        reason: Option<String>,
1600    ) -> Result<(), OpsLifecycleError> {
1601        let mut state = self.write_state()?;
1602
1603        let status = state
1604            .status(id)
1605            .ok_or_else(|| OpsLifecycleError::NotFound(id.clone()))?;
1606
1607        let terminal_outcome = OperationTerminalOutcome::Aborted { reason };
1608        let (outcome_kind, outcome_payload) = ShellState::split_outcome(&terminal_outcome);
1609
1610        if let Err(err) = state.dsl_apply_raw(mm_dsl::MeerkatMachineInput::AbortOp {
1611            operation_id: mm_dsl::OperationId::from_domain(id).0,
1612            outcome: outcome_kind,
1613            payload: outcome_payload,
1614        }) {
1615            return Err(classify_op_rejection(err, id, status, "abort_provisioning"));
1616        }
1617
1618        state.finalize_terminal(id);
1619        state.maybe_persist()?;
1620        Ok(())
1621    }
1622
1623    fn cancel_operation(
1624        &self,
1625        id: &OperationId,
1626        reason: Option<String>,
1627    ) -> Result<(), OpsLifecycleError> {
1628        let mut state = self.write_state()?;
1629
1630        let status = state
1631            .status(id)
1632            .ok_or_else(|| OpsLifecycleError::NotFound(id.clone()))?;
1633
1634        let terminal_outcome = OperationTerminalOutcome::Cancelled { reason };
1635        let (outcome_kind, outcome_payload) = ShellState::split_outcome(&terminal_outcome);
1636
1637        if let Err(err) = state.dsl_apply_raw(mm_dsl::MeerkatMachineInput::CancelOp {
1638            operation_id: mm_dsl::OperationId::from_domain(id).0,
1639            outcome: outcome_kind,
1640            payload: outcome_payload,
1641        }) {
1642            return Err(classify_op_rejection(err, id, status, "cancel_operation"));
1643        }
1644
1645        state.finalize_terminal(id);
1646        state.maybe_persist()?;
1647        Ok(())
1648    }
1649
1650    fn request_retire(&self, id: &OperationId) -> Result<(), OpsLifecycleError> {
1651        let mut state = self.write_state()?;
1652
1653        let status = state
1654            .status(id)
1655            .ok_or_else(|| OpsLifecycleError::NotFound(id.clone()))?;
1656
1657        if let Err(err) = state.dsl_apply_raw(mm_dsl::MeerkatMachineInput::RetireRequestedOp {
1658            operation_id: mm_dsl::OperationId::from_domain(id).0,
1659        }) {
1660            return Err(classify_op_rejection(err, id, status, "request_retire"));
1661        }
1662        Ok(())
1663    }
1664
1665    fn mark_retired(&self, id: &OperationId) -> Result<(), OpsLifecycleError> {
1666        let mut state = self.write_state()?;
1667
1668        let status = state
1669            .status(id)
1670            .ok_or_else(|| OpsLifecycleError::NotFound(id.clone()))?;
1671
1672        let terminal_outcome = OperationTerminalOutcome::Retired;
1673        let (outcome_kind, outcome_payload) = ShellState::split_outcome(&terminal_outcome);
1674
1675        if let Err(err) = state.dsl_apply_raw(mm_dsl::MeerkatMachineInput::RetireCompletedOp {
1676            operation_id: mm_dsl::OperationId::from_domain(id).0,
1677            outcome: outcome_kind,
1678            payload: outcome_payload,
1679        }) {
1680            return Err(classify_op_rejection(err, id, status, "mark_retired"));
1681        }
1682
1683        state.finalize_terminal(id);
1684        state.maybe_persist()?;
1685        Ok(())
1686    }
1687
1688    fn snapshot(&self, id: &OperationId) -> Option<OperationLifecycleSnapshot> {
1689        self.read_state().ok().and_then(|state| state.snapshot(id))
1690    }
1691
1692    fn list_operations(&self) -> Vec<OperationLifecycleSnapshot> {
1693        let mut snapshots = self
1694            .read_state()
1695            .map(|state| {
1696                state
1697                    .operation_ids()
1698                    .into_iter()
1699                    .filter_map(|id| state.snapshot(&id))
1700                    .collect::<Vec<_>>()
1701            })
1702            .unwrap_or_default();
1703        snapshots.sort_by(|left, right| left.display_name.cmp(&right.display_name));
1704        snapshots
1705    }
1706
1707    fn terminate_owner(&self, reason: String) -> Result<(), OpsLifecycleError> {
1708        let mut state = self.write_state()?;
1709
1710        let to_terminate = state.owner_termination_targets();
1711
1712        for (op_id, status) in &to_terminate {
1713            let terminal_outcome = OperationTerminalOutcome::Terminated {
1714                reason: reason.clone(),
1715            };
1716            let (outcome_kind, outcome_payload) = ShellState::split_outcome(&terminal_outcome);
1717
1718            if let Err(err) = state.dsl_apply_raw(mm_dsl::MeerkatMachineInput::TerminateOp {
1719                operation_id: mm_dsl::OperationId::from_domain(op_id).0,
1720                outcome: outcome_kind,
1721                payload: outcome_payload,
1722            }) {
1723                return Err(classify_op_rejection(
1724                    err,
1725                    op_id,
1726                    *status,
1727                    "terminate_owner",
1728                ));
1729            }
1730
1731            state.finalize_terminal(op_id);
1732        }
1733
1734        if !to_terminate.is_empty() {
1735            state.maybe_persist()?;
1736        }
1737        Ok(())
1738    }
1739
1740    fn collect_completed(
1741        &self,
1742    ) -> Result<Vec<(OperationId, OperationTerminalOutcome)>, OpsLifecycleError> {
1743        let mut state = self.write_state()?;
1744
1745        let ids: Vec<OperationId> = state.completed_order.drain(..).collect();
1746        let mut collected = Vec::with_capacity(ids.len());
1747        for id in ids {
1748            let outcome = state.terminal_outcome(&id);
1749            // Remove from DSL state and shell record.
1750            let id_key = mm_dsl::OperationId::from_domain(&id).0;
1751            state.dsl.0.state.op_statuses.remove(&id_key);
1752            state.dsl.0.state.op_kinds.remove(&id_key);
1753            state.dsl.0.state.op_peer_ready.remove(&id_key);
1754            state.dsl.0.state.op_progress_counts.remove(&id_key);
1755            state.dsl.0.state.op_terminal_outcomes.remove(&id_key);
1756            state.dsl.0.state.op_terminal_payload.remove(&id_key);
1757            state.dsl.0.state.op_completion_seq.remove(&id_key);
1758            state.records.remove(&id);
1759            if let Some(outcome) = outcome {
1760                collected.push((id, outcome));
1761            }
1762        }
1763        Ok(collected)
1764    }
1765
1766    fn completion_feed(&self) -> Option<Arc<dyn CompletionFeed>> {
1767        Some(self.completion_feed_handle())
1768    }
1769
1770    fn wait_all(
1771        &self,
1772        _run_id: &RunId,
1773        ids: &[OperationId],
1774    ) -> std::pin::Pin<
1775        Box<dyn std::future::Future<Output = Result<WaitAllResult, OpsLifecycleError>> + Send + '_>,
1776    > {
1777        let wait_request_id = WaitRequestId::new();
1778        let owned_ids = ids.to_vec();
1779
1780        let state = match self.write_state() {
1781            Ok(mut state) => {
1782                match state.begin_wait_all_authority(&wait_request_id, &owned_ids) {
1783                    Ok(WaitAllAuthorityPlan::AlreadySatisfied(satisfied)) => {
1784                        let outcomes =
1785                            state
1786                                .collect_wait_outcomes(&owned_ids)
1787                                .map(|outcomes| WaitAllResult {
1788                                    outcomes,
1789                                    satisfied,
1790                                });
1791                        WaitAllFutureState::Ready(Some(outcomes))
1792                    }
1793                    Ok(WaitAllAuthorityPlan::ActivateBarrier) => {
1794                        state.wait_request_id = Some(wait_request_id.clone());
1795
1796                        if state.pending_wait.is_some() {
1797                            // Roll back the DSL barrier we just activated so the
1798                            // registry is not stuck in a wait-active state with
1799                            // no correlation oneshot to resolve. `CancelWaitAll`
1800                            // is the no-obligation clearer (members need not be
1801                            // terminal).
1802                            state.wait_request_id = None;
1803                            let _ = state.dsl_apply(
1804                                mm_dsl::MeerkatMachineInput::CancelWaitAll,
1805                                "CancelWaitAll(rollback)",
1806                            );
1807                            return Box::pin(WaitAllFuture {
1808                                registry: self,
1809                                wait_request_id,
1810                                operation_ids: owned_ids,
1811                                state: WaitAllFutureState::Ready(Some(Err(
1812                                    OpsLifecycleError::Internal(
1813                                        "wait_all started while a pending wait sender already existed"
1814                                            .into(),
1815                                    ),
1816                                ))),
1817                            });
1818                        }
1819                        let (sender, receiver) = tokio::sync::oneshot::channel();
1820                        state.pending_wait = Some(PendingWaitState {
1821                            wait_request_id: wait_request_id.clone(),
1822                            sender,
1823                        });
1824                        WaitAllFutureState::Waiting(receiver)
1825                    }
1826                    Err(err) => WaitAllFutureState::Ready(Some(Err(err))),
1827                }
1828            }
1829            Err(err) => WaitAllFutureState::Ready(Some(Err(err))),
1830        };
1831
1832        Box::pin(WaitAllFuture {
1833            registry: self,
1834            wait_request_id,
1835            operation_ids: owned_ids,
1836            state,
1837        })
1838    }
1839}
1840
#[cfg(test)]
#[allow(clippy::unwrap_used, clippy::panic)]
mod tests {
    //! Unit tests for the runtime ops-lifecycle registry.
    //!
    //! The fixtures `done_result` and `start_background_op` replace the
    //! `OperationResult` literal and the register + provisioning sequence that
    //! were previously repeated in almost every test.

    use super::*;
    use meerkat_core::comms::{PeerId, TrustedPeerDescriptor};
    use meerkat_core::lifecycle::RunId;
    use meerkat_core::ops_lifecycle::{OperationKind, OpsLifecycleRegistry};
    use meerkat_core::types::SessionId;
    use std::sync::atomic::Ordering;
    use uuid::Uuid;

    /// Fixed run id passed to `wait_all` by the tests.
    fn test_run_id() -> RunId {
        RunId(Uuid::from_u128(1))
    }

    /// Spec for a background tool operation with a fresh id, a fresh owner
    /// session, and no peer channel expectation.
    fn background_spec(name: &str) -> OperationSpec {
        OperationSpec {
            id: OperationId::new(),
            kind: OperationKind::BackgroundToolOp,
            owner_session_id: SessionId::new(),
            display_name: name.into(),
            source_label: "test".into(),
            child_session_id: None,
            expect_peer_channel: false,
        }
    }

    /// Non-error result fixture for operation `id` carrying `content`.
    fn done_result(id: &OperationId, content: &str) -> OperationResult {
        OperationResult {
            id: id.clone(),
            content: content.into(),
            is_error: false,
            duration_ms: 1,
            tokens_used: 0,
        }
    }

    /// Registers a background operation named `name`, reports provisioning
    /// success for it, and returns its id.
    fn start_background_op(registry: &RuntimeOpsLifecycleRegistry, name: &str) -> OperationId {
        let spec = background_spec(name);
        let op_id = spec.id.clone();
        registry.register_operation(spec).unwrap();
        registry.provisioning_succeeded(&op_id).unwrap();
        op_id
    }

    #[tokio::test]
    async fn late_watchers_resolve_immediately() {
        let registry = RuntimeOpsLifecycleRegistry::new();
        let op_id = start_background_op(&registry, "late");
        registry
            .complete_operation(&op_id, done_result(&op_id, "done"))
            .unwrap();

        let watch = registry.register_watcher(&op_id).unwrap();
        match watch.wait().await {
            OperationTerminalOutcome::Completed(result) => assert_eq!(result.content, "done"),
            other => panic!("expected completed outcome, got {other:?}"),
        }
    }

    #[test]
    fn peer_ready_requires_peer_expectation() {
        let registry = RuntimeOpsLifecycleRegistry::new();
        let op_id = start_background_op(&registry, "no-peer");

        let result = registry.peer_ready(
            &op_id,
            OperationPeerHandle {
                peer_name: meerkat_core::comms::PeerName::new("peer").unwrap(),
                trusted_peer: TrustedPeerDescriptor::test_only_unsigned_typed(
                    "peer",
                    PeerId::new(),
                    "inproc://peer",
                )
                .unwrap(),
            },
        );
        assert!(matches!(result, Err(OpsLifecycleError::PeerNotExpected(_))));
    }

    #[tokio::test]
    async fn multi_listener_completion() {
        let registry = RuntimeOpsLifecycleRegistry::new();
        let op_id = start_background_op(&registry, "multi");

        let watch1 = registry.register_watcher(&op_id).unwrap();
        let watch2 = registry.register_watcher(&op_id).unwrap();
        let watch3 = registry.register_watcher(&op_id).unwrap();

        registry
            .complete_operation(&op_id, done_result(&op_id, "multi-done"))
            .unwrap();

        for watch in [watch1, watch2, watch3] {
            match watch.wait().await {
                OperationTerminalOutcome::Completed(result) => {
                    assert_eq!(result.content, "multi-done");
                }
                other => panic!("expected completed, got {other:?}"),
            }
        }
    }

    #[tokio::test]
    async fn wait_all_returns_all_outcomes() {
        let registry = RuntimeOpsLifecycleRegistry::new();

        let id_a = start_background_op(&registry, "a");
        let id_b = start_background_op(&registry, "b");

        registry
            .complete_operation(&id_a, done_result(&id_a, "a-done"))
            .unwrap();
        registry.fail_operation(&id_b, "b-error".into()).unwrap();

        let wait_result = registry
            .wait_all(&test_run_id(), &[id_a.clone(), id_b.clone()])
            .await
            .unwrap();
        assert_eq!(wait_result.outcomes.len(), 2);
        assert_eq!(wait_result.outcomes[0].0, id_a);
        assert!(matches!(
            wait_result.outcomes[0].1,
            OperationTerminalOutcome::Completed(_)
        ));
        assert_eq!(wait_result.outcomes[1].0, id_b);
        assert!(matches!(
            wait_result.outcomes[1].1,
            OperationTerminalOutcome::Failed { .. }
        ));
        // Obligation carries the awaited IDs
        assert_eq!(wait_result.satisfied.operation_ids.len(), 2);
        assert_ne!(wait_result.satisfied.wait_request_id.to_string(), "");
    }

    /// Exercises the trait `wait_all` path (via `dyn OpsLifecycleRegistry`)
    /// which must submit WaitAll through the DSL for cross-machine handoff.
    #[tokio::test]
    async fn wait_all_trait_path_submits_through_authority() {
        let registry = RuntimeOpsLifecycleRegistry::new();
        let op_id = start_background_op(&registry, "trait-wait");
        registry
            .complete_operation(&op_id, done_result(&op_id, "done"))
            .unwrap();

        // Call through trait object to exercise the trait impl, not the inherent method.
        let trait_ref: &dyn OpsLifecycleRegistry = &registry;
        let wait_result = trait_ref
            .wait_all(&test_run_id(), std::slice::from_ref(&op_id))
            .await
            .unwrap();
        assert_eq!(wait_result.outcomes.len(), 1);
        assert!(matches!(
            wait_result.outcomes[0].1,
            OperationTerminalOutcome::Completed(_)
        ));
        // Obligation carries the validated ID
        assert_eq!(wait_result.satisfied.operation_ids, vec![op_id]);
        assert_ne!(wait_result.satisfied.wait_request_id.to_string(), "");
    }

    #[tokio::test]
    async fn wait_all_resolves_from_authority_owned_wait_request() {
        let registry = RuntimeOpsLifecycleRegistry::new();
        let run_id = test_run_id();

        let op_id = start_background_op(&registry, "pending");

        // The operation is still running, so the wait must not resolve yet.
        let wait_fut = registry.wait_all(&run_id, std::slice::from_ref(&op_id));
        tokio::pin!(wait_fut);
        assert!(
            tokio::time::timeout(std::time::Duration::from_millis(10), &mut wait_fut)
                .await
                .is_err()
        );

        let active_wait_request_id = {
            let state = registry.read_state().unwrap();
            let wait_request_id = match state.wait_request_id.clone() {
                Some(wait_request_id) => wait_request_id,
                None => panic!("wait request should be active"),
            };
            assert_eq!(
                state.wait_operation_ids().as_slice(),
                std::slice::from_ref(&op_id)
            );
            wait_request_id
        };

        registry
            .complete_operation(&op_id, done_result(&op_id, "done"))
            .unwrap();

        let wait_result = wait_fut.await.unwrap();
        assert_eq!(
            wait_result.satisfied.wait_request_id,
            active_wait_request_id
        );
        assert_eq!(wait_result.satisfied.operation_ids, vec![op_id.clone()]);
        assert!(matches!(
            wait_result.outcomes.as_slice(),
            [(returned_id, OperationTerminalOutcome::Completed(_))] if *returned_id == op_id
        ));
        assert!(registry.read_state().unwrap().wait_request_id.is_none());
    }

    #[tokio::test]
    async fn dropping_wait_all_future_cancels_active_wait_request() {
        let registry = RuntimeOpsLifecycleRegistry::new();
        let run_id = test_run_id();

        let op_id = start_background_op(&registry, "cancelled-wait");

        let wait_fut = registry.wait_all(&run_id, std::slice::from_ref(&op_id));
        drop(wait_fut);

        let state = registry.read_state().unwrap();
        assert!(state.wait_request_id.is_none());
        assert!(state.wait_operation_ids().is_empty());
        assert!(!state.wait_active());
    }

    #[test]
    fn terminate_owner_only_targets_non_terminal_operations() {
        let registry = RuntimeOpsLifecycleRegistry::new();

        let running_id = start_background_op(&registry, "running");

        let completed_id = start_background_op(&registry, "completed");
        registry
            .complete_operation(&completed_id, done_result(&completed_id, "done"))
            .unwrap();

        registry.terminate_owner("shutdown".into()).unwrap();

        assert!(matches!(
            registry.snapshot(&running_id).unwrap().status,
            OperationStatus::Terminated
        ));
        assert!(matches!(
            registry.snapshot(&completed_id).unwrap().status,
            OperationStatus::Completed
        ));
    }

    #[test]
    fn collect_completed_drains_terminal_operations() {
        let registry = RuntimeOpsLifecycleRegistry::new();

        let id_a = start_background_op(&registry, "a");
        registry
            .complete_operation(&id_a, done_result(&id_a, "done"))
            .unwrap();

        // `b` is registered only; it never reaches a terminal state.
        let spec_b = background_spec("b");
        let id_b = spec_b.id.clone();
        registry.register_operation(spec_b).unwrap();

        let collected = registry.collect_completed().unwrap();
        assert_eq!(collected.len(), 1);
        assert_eq!(collected[0].0, id_a);

        assert!(registry.snapshot(&id_a).is_none());
        assert!(registry.snapshot(&id_b).is_some());

        let collected2 = registry.collect_completed().unwrap();
        assert!(collected2.is_empty());
    }

    #[test]
    fn bounded_completed_retention_evicts_oldest() {
        let registry = RuntimeOpsLifecycleRegistry::with_config(OpsLifecycleConfig {
            max_completed: 3,
            max_concurrent: None,
        });

        let mut ids = Vec::new();
        for i in 0..5 {
            let id = start_background_op(&registry, &format!("op-{i}"));
            registry
                .complete_operation(&id, done_result(&id, &format!("done-{i}")))
                .unwrap();
            ids.push(id);
        }

        // Five completions against a retention of three: the two oldest are gone.
        assert!(registry.snapshot(&ids[0]).is_none());
        assert!(registry.snapshot(&ids[1]).is_none());
        assert!(registry.snapshot(&ids[2]).is_some());
        assert!(registry.snapshot(&ids[3]).is_some());
        assert!(registry.snapshot(&ids[4]).is_some());
    }

    #[test]
    fn max_concurrent_enforcement() {
        let registry = RuntimeOpsLifecycleRegistry::with_config(OpsLifecycleConfig {
            max_completed: DEFAULT_MAX_COMPLETED,
            max_concurrent: Some(2),
        });

        let spec_a = background_spec("a");
        let id_a = spec_a.id.clone();
        registry.register_operation(spec_a).unwrap();

        let spec_b = background_spec("b");
        registry.register_operation(spec_b).unwrap();

        let spec_c = background_spec("c");
        let result = registry.register_operation(spec_c);
        assert!(matches!(
            result,
            Err(OpsLifecycleError::MaxConcurrentExceeded {
                limit: 2,
                active: 2,
            })
        ));

        // Completing `a` frees a slot for a further registration.
        registry.provisioning_succeeded(&id_a).unwrap();
        registry
            .complete_operation(&id_a, done_result(&id_a, "done"))
            .unwrap();

        let spec_d = background_spec("d");
        assert!(registry.register_operation(spec_d).is_ok());
    }

    #[test]
    fn snapshot_includes_timestamps() {
        let registry = RuntimeOpsLifecycleRegistry::new();
        let spec = background_spec("timed");
        let op_id = spec.id.clone();
        registry.register_operation(spec).unwrap();

        let snap1 = registry.snapshot(&op_id).unwrap();
        assert!(snap1.created_at_ms > 0);
        assert!(snap1.started_at_ms.is_none());
        assert!(snap1.completed_at_ms.is_none());
        assert!(snap1.elapsed_ms.is_none());

        registry.provisioning_succeeded(&op_id).unwrap();
        let snap2 = registry.snapshot(&op_id).unwrap();
        assert!(snap2.started_at_ms.is_some());
        assert!(snap2.started_at_ms.unwrap() >= snap2.created_at_ms);

        registry
            .complete_operation(&op_id, done_result(&op_id, "done"))
            .unwrap();
        let snap3 = registry.snapshot(&op_id).unwrap();
        assert!(snap3.completed_at_ms.is_some());
        assert!(snap3.elapsed_ms.is_some());
        assert!(snap3.completed_at_ms.unwrap() >= snap3.started_at_ms.unwrap());
    }

    #[test]
    fn snapshot_includes_peer_handle() {
        let registry = RuntimeOpsLifecycleRegistry::new();
        let spec = OperationSpec {
            id: OperationId::new(),
            kind: OperationKind::MobMemberChild,
            owner_session_id: SessionId::new(),
            display_name: "peer-test".into(),
            source_label: "test".into(),
            child_session_id: Some(SessionId::new()),
            expect_peer_channel: true,
        };
        let op_id = spec.id.clone();
        registry.register_operation(spec).unwrap();
        registry.provisioning_succeeded(&op_id).unwrap();

        let snap1 = registry.snapshot(&op_id).unwrap();
        assert!(snap1.peer_handle.is_none());

        let handle = OperationPeerHandle {
            peer_name: meerkat_core::comms::PeerName::new("member-x").unwrap(),
            trusted_peer: TrustedPeerDescriptor::test_only_unsigned_typed(
                "member-x",
                PeerId::new(),
                "inproc://x",
            )
            .unwrap(),
        };
        registry.peer_ready(&op_id, handle).unwrap();

        let snap2 = registry.snapshot(&op_id).unwrap();
        assert_eq!(
            snap2.peer_handle.as_ref().unwrap().peer_name.as_str(),
            "member-x"
        );
    }
}