// meerkat_runtime/ops_lifecycle.rs

1//! In-memory runtime implementation of the shared async-operation lifecycle seam.
2//!
3//! All canonical lifecycle state mutations are delegated to
4//! [`OpsLifecycleAuthority`] via [`OpsLifecycleMutator::apply`]. This shell
5//! layer owns I/O concerns: watcher channels, timestamps, peer handles, and
6//! snapshot assembly.
7
8use std::collections::{HashMap, VecDeque};
9use std::future::Future;
10use std::sync::atomic::{AtomicU64, Ordering};
11use std::sync::{Arc, RwLock, RwLockReadGuard, RwLockWriteGuard};
12use std::task::{Context, Poll};
13
14use meerkat_core::completion_feed::{
15    CompletionBatch, CompletionEntry, CompletionFeed, CompletionSeq,
16};
17
18#[cfg(target_arch = "wasm32")]
19use crate::tokio;
20use meerkat_core::lifecycle::{RunId, WaitRequestId};
21use meerkat_core::ops_lifecycle::{
22    DEFAULT_MAX_COMPLETED, OperationCompletionWatch, OperationId, OperationKind,
23    OperationLifecycleSnapshot, OperationPeerHandle, OperationProgressUpdate, OperationResult,
24    OperationSpec, OperationTerminalOutcome, OpsLifecycleError, OpsLifecycleRegistry,
25    WaitAllResult, WaitAllSatisfied,
26};
27use meerkat_core::time_compat::{Instant, SystemTime, UNIX_EPOCH};
28
29use crate::ops_lifecycle_authority::{
30    OpsLifecycleAuthority, OpsLifecycleEffect, OpsLifecycleInput, OpsLifecycleMutator,
31};
32
33// ---------------------------------------------------------------------------
34// Serializable snapshot for persistence
35// ---------------------------------------------------------------------------
36
/// Serializable snapshot of the ops lifecycle registry state.
///
/// Captured on terminal transitions for durable persistence. Contains
/// canonical authority state, operation specs, persisted completion feed
/// entries, and consumer cursor values.
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub struct PersistedOpsSnapshot {
    /// Epoch identity at capture time.
    pub epoch_id: meerkat_core::RuntimeEpochId,
    /// Canonical machine-owned authority state.
    pub authority_state: crate::ops_lifecycle_authority::RegistryCanonicalState,
    /// Per-operation specs for shell record reconstruction.
    ///
    /// Shell timestamps, watchers, and peer handles are NOT persisted;
    /// recovery rebuilds records from these specs with fresh timestamps.
    pub operation_specs: HashMap<OperationId, meerkat_core::ops_lifecycle::OperationSpec>,
    /// Persisted completion feed entries (actual contents, not reconstructed).
    pub completion_entries: Vec<CompletionEntry>,
    /// Consumer cursor snapshot at capture time.
    pub cursors: meerkat_core::EpochCursorSnapshot,
}
55
56// ---------------------------------------------------------------------------
57// Concrete completion feed buffer
58// ---------------------------------------------------------------------------
59
/// Shared inner state of the completion feed buffer.
///
/// Protected by the registry's `RwLock<ShellState>` for writes, and by its
/// own `RwLock` for reads by external consumers (agent boundary, idle wake).
#[derive(Debug)]
struct FeedBufferInner {
    /// Retained completion entries, oldest first.
    entries: VecDeque<CompletionEntry>,
    /// Sequence number of the most recently pushed entry (0 = none yet).
    watermark: CompletionSeq,
    /// Capacity bound; the oldest entries are evicted beyond this.
    max_retained: usize,
}
70
/// Shared completion feed buffer owned by the runtime registry.
///
/// The registry writes entries under its own write lock. External consumers
/// read through the [`RuntimeCompletionFeed`] handle.
#[derive(Debug)]
struct FeedBuffer {
    /// Entry storage plus the lock-protected canonical watermark.
    inner: RwLock<FeedBufferInner>,
    /// Atomic mirror of watermark for lock-free `watermark()` reads.
    watermark_atomic: AtomicU64,
    /// Notifies all waiters when new entries are appended.
    notify: tokio::sync::Notify,
}
83
84impl FeedBuffer {
85    fn new(max_retained: usize) -> Self {
86        Self {
87            inner: RwLock::new(FeedBufferInner {
88                entries: VecDeque::new(),
89                watermark: 0,
90                max_retained,
91            }),
92            watermark_atomic: AtomicU64::new(0),
93            notify: tokio::sync::Notify::new(),
94        }
95    }
96
97    fn push(&self, entry: CompletionEntry) {
98        let mut inner = self
99            .inner
100            .write()
101            .unwrap_or_else(std::sync::PoisonError::into_inner);
102        let seq = entry.seq;
103        inner.entries.push_back(entry);
104        inner.watermark = seq;
105
106        // Evict oldest if over capacity.
107        while inner.entries.len() > inner.max_retained {
108            inner.entries.pop_front();
109        }
110
111        drop(inner);
112
113        self.watermark_atomic.store(seq, Ordering::Release);
114        self.notify.notify_waiters();
115    }
116}
117
/// Read-only handle to the runtime completion feed.
///
/// Implements [`CompletionFeed`] for external consumers. Obtained via
/// [`RuntimeOpsLifecycleRegistry::completion_feed()`].
#[derive(Debug, Clone)]
pub struct RuntimeCompletionFeed {
    /// Shared buffer; cloning the handle shares the same feed.
    buffer: Arc<FeedBuffer>,
}
126
127impl CompletionFeed for RuntimeCompletionFeed {
128    fn watermark(&self) -> CompletionSeq {
129        self.buffer.watermark_atomic.load(Ordering::Acquire)
130    }
131
132    fn list_since(&self, after_seq: CompletionSeq) -> CompletionBatch {
133        let inner = self
134            .buffer
135            .inner
136            .read()
137            .unwrap_or_else(std::sync::PoisonError::into_inner);
138        let entries: Vec<CompletionEntry> = inner
139            .entries
140            .iter()
141            .filter(|e| e.seq > after_seq)
142            .cloned()
143            .collect();
144        let watermark = inner.watermark;
145        CompletionBatch { entries, watermark }
146    }
147
148    fn wait_for_advance(
149        &self,
150        after_seq: CompletionSeq,
151    ) -> std::pin::Pin<Box<dyn Future<Output = CompletionSeq> + Send + '_>> {
152        Box::pin(async move {
153            loop {
154                // Register the waiter BEFORE reading the watermark.
155                // notify_waiters() in push() only wakes already-registered
156                // listeners — if we read first and push() lands between the
157                // read and notified().await, the wake is lost.
158                let notified = self.buffer.notify.notified();
159                let current = self.buffer.watermark_atomic.load(Ordering::Acquire);
160                if current > after_seq {
161                    return current;
162                }
163                notified.await;
164            }
165        })
166    }
167}
168
169// ---------------------------------------------------------------------------
170// Shell-only per-operation record (not part of canonical machine state)
171// ---------------------------------------------------------------------------
172
/// Shell-owned data for a single operation. Canonical lifecycle state lives in
/// the authority; this struct holds I/O concerns that the authority has no
/// knowledge of.
#[derive(Debug)]
struct ShellRecord {
    /// The spec the operation was registered with (id, kind, display name, …).
    spec: OperationSpec,
    /// Peer handle, stored by the registry once the authority exposes a peer.
    peer_handle: Option<OperationPeerHandle>,
    /// One-shot senders resolved with the terminal outcome, then drained.
    watchers: Vec<tokio::sync::oneshot::Sender<OperationTerminalOutcome>>,
    // Monotonic timestamps for elapsed computation
    created_at: Instant,
    started_at: Option<Instant>,
    completed_at: Option<Instant>,
    // Wall-clock anchor captured at creation for epoch millis
    created_at_wall: SystemTime,
}
188
/// The single in-flight `wait_all` request, if any (`ShellState::pending_wait`).
///
/// `execute_effects` resolves `sender` when a `WaitAllSatisfied` effect with a
/// matching request id arrives; a mismatched id leaves the state in place.
#[derive(Debug)]
struct PendingWaitState {
    /// Identity used to match incoming `WaitAllSatisfied` effects.
    wait_request_id: WaitRequestId,
    /// Resolved with the satisfied-wait payload; dropped on cancellation.
    sender: tokio::sync::oneshot::Sender<WaitAllSatisfied>,
}
194
195impl ShellRecord {
196    fn new(spec: OperationSpec) -> Self {
197        Self {
198            spec,
199            peer_handle: None,
200            watchers: Vec::new(),
201            created_at: Instant::now(),
202            started_at: None,
203            completed_at: None,
204            created_at_wall: SystemTime::now(),
205        }
206    }
207
208    fn epoch_millis(wall_anchor: &SystemTime) -> u64 {
209        wall_anchor
210            .duration_since(UNIX_EPOCH)
211            .map(|d| d.as_millis() as u64)
212            .unwrap_or(0)
213    }
214
215    fn epoch_millis_for_instant(&self, instant: Instant) -> u64 {
216        // Compute wall time for a given instant using the wall-clock anchor:
217        // wall_time = created_at_wall + (instant - created_at)
218        let offset = instant.saturating_duration_since(self.created_at);
219        let wall = self.created_at_wall + offset;
220        Self::epoch_millis(&wall)
221    }
222
223    /// Notify all watchers with the given terminal outcome and drain the list.
224    fn notify_watchers(&mut self, outcome: &OperationTerminalOutcome) {
225        for watcher in std::mem::take(&mut self.watchers) {
226            let _ = watcher.send(outcome.clone());
227        }
228    }
229
230    /// Mark the completion timestamp.
231    fn mark_completed(&mut self) {
232        self.completed_at = Some(Instant::now());
233    }
234}
235
236// ---------------------------------------------------------------------------
237// Combined shell state: authority + shell records
238// ---------------------------------------------------------------------------
239
/// Combined state guarded by the registry's `RwLock`: the canonical authority
/// plus shell-only per-operation records and external wiring.
#[derive(Debug)]
struct ShellState {
    /// Canonical lifecycle state machine; all transitions go through `apply`.
    authority: OpsLifecycleAuthority,
    /// Shell-owned I/O data per operation (timestamps, watchers, peer handle).
    records: HashMap<OperationId, ShellRecord>,
    /// At most one in-flight `wait_all` request.
    pending_wait: Option<PendingWaitState>,
    /// Shared detached-op wake state. When a `BackgroundToolOp` reaches terminal,
    /// sets pending and fires the Notify so the waker task can inject a
    /// continuation into the quiescent session.
    detached_wake: Option<Arc<crate::detached_wake::DetachedWakeState>>,
    /// Shared feed buffer for completion events.
    feed_buffer: Arc<FeedBuffer>,
    /// Persistence channel for durable snapshot writes (set via `set_persistence_channel`).
    persist_tx: Option<crate::tokio::sync::mpsc::Sender<PersistedOpsSnapshot>>,
    /// Epoch ID for persistence snapshots.
    persist_epoch_id: Option<meerkat_core::RuntimeEpochId>,
    /// Shared cursor state for persistence snapshots.
    persist_cursor_state: Option<Arc<meerkat_core::EpochCursorState>>,
}
258
impl ShellState {
    /// Build fresh state with an authority sized by the given limits.
    fn new(max_completed: usize, max_concurrent: Option<usize>) -> Self {
        Self {
            authority: OpsLifecycleAuthority::new(max_completed, max_concurrent),
            records: HashMap::new(),
            pending_wait: None,
            detached_wake: None,
            // Feed buffer is larger than authority retention to absorb bursts.
            // Entries are only evicted by buffer capacity, not by consumer cursor,
            // so the buffer must be large enough that consumers drain before
            // the oldest entry is evicted.
            feed_buffer: Arc::new(FeedBuffer::new(max_completed.saturating_mul(4).max(1024))),
            persist_tx: None,
            persist_epoch_id: None,
            persist_cursor_state: None,
        }
    }

    /// Build a snapshot by combining authority canonical state with shell data.
    ///
    /// Returns `None` if either the canonical entry or the shell record for
    /// `id` is missing.
    fn snapshot(&self, id: &OperationId) -> Option<OperationLifecycleSnapshot> {
        let canonical = self.authority.operation(id)?;
        let shell = self.records.get(id)?;

        let created_at_ms = ShellRecord::epoch_millis(&shell.created_at_wall);
        let started_at_ms = shell.started_at.map(|i| shell.epoch_millis_for_instant(i));
        let completed_at_ms = shell
            .completed_at
            .map(|i| shell.epoch_millis_for_instant(i));
        // Elapsed is reported only for completed operations (creation →
        // completion, on the monotonic clock).
        let elapsed_ms = shell.completed_at.map(|completed| {
            completed
                .saturating_duration_since(shell.created_at)
                .as_millis() as u64
        });

        Some(OperationLifecycleSnapshot {
            id: shell.spec.id.clone(),
            kind: canonical.kind(),
            display_name: shell.spec.display_name.clone(),
            status: canonical.status(),
            peer_ready: canonical.peer_ready(),
            progress_count: canonical.progress_count(),
            watcher_count: shell.watchers.len() as u32,
            terminal_outcome: canonical.terminal_outcome().cloned(),
            child_session_id: shell.spec.child_session_id.clone(),
            peer_handle: shell.peer_handle.clone(),
            created_at_ms,
            started_at_ms,
            completed_at_ms,
            elapsed_ms,
        })
    }

    /// Execute authority effects on shell state.
    ///
    /// **Important:** callers must patch the real terminal outcome on the
    /// authority (via `patch_terminal_outcome`) *before* calling this method.
    /// `NotifyOpWatcher` effects read the patched outcome from the authority
    /// rather than using the placeholder embedded in the effect.
    fn execute_effects(&mut self, effects: &[OpsLifecycleEffect]) {
        for effect in effects {
            match effect {
                OpsLifecycleEffect::NotifyOpWatcher { operation_id, .. } => {
                    // Read the real (patched) outcome from the authority.
                    let outcome = self
                        .authority
                        .operation(operation_id)
                        .and_then(|op| op.terminal_outcome().cloned());
                    if let Some(outcome) = outcome
                        && let Some(shell) = self.records.get_mut(operation_id)
                    {
                        let watcher_count = shell.watchers.len() as u32;
                        shell.notify_watchers(&outcome);
                        shell.mark_completed();
                        // Report how many watcher channels were resolved.
                        self.authority.watchers_drained(operation_id, watcher_count);
                    }
                    // Arm + signal detached-op wake if this is a BackgroundToolOp terminal.
                    if let Some(ref wake) = self.detached_wake
                        && self
                            .authority
                            .operation(operation_id)
                            .is_some_and(|op| op.kind() == OperationKind::BackgroundToolOp)
                    {
                        wake.pending.store(true, Ordering::Release);
                        wake.notify.notify_one(); // wake the waker task directly
                    }
                }
                OpsLifecycleEffect::ExposeOperationPeer { .. } => {
                    // Peer handle is stored in shell record by the calling method
                    // after authority.apply() succeeds. Nothing else to do here.
                }
                OpsLifecycleEffect::RetainTerminalRecord { .. } => {
                    // The authority handles completed_order tracking internally.
                    // Shell record stays in place until evicted.
                }
                OpsLifecycleEffect::EvictCompletedRecord { operation_id } => {
                    // Drop both shell and canonical state for the evicted op.
                    self.records.remove(operation_id);
                    self.authority.remove_operation(operation_id);
                }
                OpsLifecycleEffect::CompletionProduced {
                    seq,
                    operation_id,
                    kind,
                } => {
                    // Build a CompletionEntry from authority + shell data and
                    // push to the shared feed buffer. Missing data degrades to
                    // placeholder values rather than dropping the entry.
                    let (display_name, terminal_outcome, completed_at_ms) =
                        if let Some(canonical) = self.authority.operation(operation_id) {
                            let outcome = canonical.terminal_outcome().cloned().unwrap_or(
                                OperationTerminalOutcome::Terminated {
                                    reason: "missing outcome".into(),
                                },
                            );
                            let completed_ms = self.records.get(operation_id).and_then(|r| {
                                r.completed_at.map(|i| r.epoch_millis_for_instant(i))
                            });
                            let name = self
                                .records
                                .get(operation_id)
                                .map(|r| r.spec.display_name.clone())
                                .unwrap_or_default();
                            (name, outcome, completed_ms)
                        } else {
                            (
                                String::new(),
                                OperationTerminalOutcome::Terminated {
                                    reason: "unknown operation".into(),
                                },
                                None,
                            )
                        };

                    self.feed_buffer.push(CompletionEntry {
                        seq: *seq,
                        operation_id: operation_id.clone(),
                        kind: *kind,
                        display_name,
                        terminal_outcome,
                        completed_at_ms,
                    });
                }
                OpsLifecycleEffect::SubmitOpEvent { .. } => {
                    // Future: emit observability events. Currently a no-op.
                }
                OpsLifecycleEffect::WaitAllSatisfied {
                    wait_request_id,
                    operation_ids,
                } => {
                    // Resolve the pending wait only on an id match; otherwise
                    // put it back untouched.
                    if let Some(pending_wait) = self.pending_wait.take() {
                        if pending_wait.wait_request_id == *wait_request_id {
                            let _ = pending_wait.sender.send(WaitAllSatisfied {
                                wait_request_id: wait_request_id.clone(),
                                operation_ids: operation_ids.clone(),
                            });
                        } else {
                            self.pending_wait = Some(pending_wait);
                        }
                    }
                }
            }
        }
    }

    /// Queue a persistence snapshot if a persistence channel is wired.
    ///
    /// Called after terminal transitions. Captures authority + entries + cursors
    /// under the write lock (caller already holds it) and queues to the channel.
    fn maybe_persist(&self) {
        // All three pieces of wiring must be present; otherwise this is a no-op.
        let (tx, epoch_id, cursor_state) = match (
            &self.persist_tx,
            &self.persist_epoch_id,
            &self.persist_cursor_state,
        ) {
            (Some(tx), Some(epoch_id), Some(cs)) => (tx, epoch_id, cs),
            _ => return,
        };

        let operation_specs: HashMap<OperationId, meerkat_core::ops_lifecycle::OperationSpec> =
            self.records
                .iter()
                .map(|(id, record)| (id.clone(), record.spec.clone()))
                .collect();

        let completion_entries = {
            let inner = self
                .feed_buffer
                .inner
                .read()
                .unwrap_or_else(std::sync::PoisonError::into_inner);
            inner.entries.iter().cloned().collect()
        };

        let snapshot = PersistedOpsSnapshot {
            epoch_id: epoch_id.clone(),
            authority_state: self.authority.canonical_state().clone(),
            operation_specs,
            completion_entries,
            cursors: cursor_state.snapshot(),
        };

        // Non-blocking send — bounded-loss is the acknowledged contract.
        if tx.try_send(snapshot).is_err() {
            tracing::warn!("ops lifecycle persistence channel full or closed; snapshot dropped");
        }
    }

    /// Mutable access to the shell record for `id`, or `NotFound`.
    fn shell_record_mut(
        &mut self,
        id: &OperationId,
    ) -> Result<&mut ShellRecord, OpsLifecycleError> {
        self.records
            .get_mut(id)
            .ok_or_else(|| OpsLifecycleError::NotFound(id.clone()))
    }

    /// Gather terminal outcomes for every operation of a satisfied wait.
    ///
    /// Returns `Internal` if any operation is unexpectedly non-terminal or
    /// missing — the wait should only fire once all of them are terminal.
    fn collect_wait_outcomes(
        &self,
        operation_ids: &[OperationId],
    ) -> Result<Vec<(OperationId, OperationTerminalOutcome)>, OpsLifecycleError> {
        operation_ids
            .iter()
            .map(|operation_id| {
                let outcome = self
                    .authority
                    .operation(operation_id)
                    .and_then(|op| op.terminal_outcome().cloned())
                    .ok_or_else(|| {
                        OpsLifecycleError::Internal(format!(
                            "wait_all completed without terminal outcome for {operation_id}"
                        ))
                    })?;
                Ok((operation_id.clone(), outcome))
            })
            .collect()
    }
}
494
495impl Default for ShellState {
496    fn default() -> Self {
497        Self::new(DEFAULT_MAX_COMPLETED, None)
498    }
499}
500
501// ---------------------------------------------------------------------------
502// Public configuration & registry
503// ---------------------------------------------------------------------------
504
/// Configuration for [`RuntimeOpsLifecycleRegistry`].
///
/// Consumed by [`RuntimeOpsLifecycleRegistry::with_config`].
#[derive(Debug, Clone)]
pub struct OpsLifecycleConfig {
    /// Maximum number of completed operations to retain (default: 256).
    pub max_completed: usize,
    /// Maximum concurrent non-terminal operations (None = unlimited).
    pub max_concurrent: Option<usize>,
}
513
514impl Default for OpsLifecycleConfig {
515    fn default() -> Self {
516        Self {
517            max_completed: DEFAULT_MAX_COMPLETED,
518            max_concurrent: None,
519        }
520    }
521}
522
/// Per-runtime shared registry for async operation lifecycle truth.
///
/// All canonical lifecycle state mutations are delegated to
/// [`OpsLifecycleAuthority`]. This shell manages I/O concerns: watcher
/// channels, timestamps, peer handles, and snapshot assembly.
#[derive(Debug)]
pub struct RuntimeOpsLifecycleRegistry {
    /// All mutable state lives behind one lock: authority + shell records.
    state: RwLock<ShellState>,
}
532
533impl Default for RuntimeOpsLifecycleRegistry {
534    fn default() -> Self {
535        Self {
536            state: RwLock::new(ShellState::default()),
537        }
538    }
539}
540
541impl RuntimeOpsLifecycleRegistry {
542    pub fn new() -> Self {
543        Self::default()
544    }
545
546    pub fn with_config(config: OpsLifecycleConfig) -> Self {
547        Self {
548            state: RwLock::new(ShellState::new(config.max_completed, config.max_concurrent)),
549        }
550    }
551
552    /// Wire a persistence channel for durable snapshot writes.
553    ///
554    /// After this call, terminal transitions (complete/fail/cancel/abort)
555    /// capture a snapshot and queue it to the channel. A dedicated
556    /// persistence task should drain the channel and write to the store.
557    pub fn set_persistence_channel(
558        &self,
559        tx: crate::tokio::sync::mpsc::Sender<PersistedOpsSnapshot>,
560        epoch_id: meerkat_core::RuntimeEpochId,
561        cursor_state: Arc<meerkat_core::EpochCursorState>,
562    ) {
563        if let Ok(mut state) = self.state.write() {
564            state.persist_tx = Some(tx);
565            state.persist_epoch_id = Some(epoch_id);
566            state.persist_cursor_state = Some(cursor_state);
567        }
568    }
569
570    /// Wire the detached-wake state so that `execute_effects` arms pending
571    /// and fires the Notify when a `BackgroundToolOp` reaches terminal.
572    pub fn set_detached_wake(&self, wake: Arc<crate::detached_wake::DetachedWakeState>) {
573        if let Ok(mut state) = self.state.write() {
574            state.detached_wake = Some(wake);
575        }
576    }
577
578    /// Recover from a persisted snapshot.
579    ///
580    /// Rebuilds the authority (stripping non-terminal ops), creates fresh
581    /// shell records from specs, and seeds the feed buffer with persisted
582    /// completion entries.
583    pub fn from_recovered(snapshot: PersistedOpsSnapshot) -> Self {
584        let authority = OpsLifecycleAuthority::from_recovered(snapshot.authority_state);
585
586        // Seed the feed buffer from persisted entries
587        let max_retained = authority
588            .canonical_state()
589            .max_completed()
590            .max(256)
591            .saturating_mul(4)
592            .max(1024);
593        let feed_buffer = Arc::new(FeedBuffer::new(max_retained));
594        for entry in &snapshot.completion_entries {
595            feed_buffer.push(entry.clone());
596        }
597
598        // Rebuild shell records from specs (fresh timestamps, no watchers)
599        let mut records = HashMap::new();
600        for (op_id, spec) in &snapshot.operation_specs {
601            // Only rebuild records for operations still in the authority
602            if authority.operation(op_id).is_some() {
603                records.insert(
604                    op_id.clone(),
605                    ShellRecord {
606                        spec: spec.clone(),
607                        peer_handle: None,
608                        watchers: Vec::new(),
609                        created_at: Instant::now(),
610                        started_at: None,
611                        completed_at: None,
612                        created_at_wall: SystemTime::now(),
613                    },
614                );
615            }
616        }
617
618        let state = ShellState {
619            authority,
620            records,
621            pending_wait: None,
622            detached_wake: None,
623            feed_buffer,
624            persist_tx: None,
625            persist_epoch_id: None,
626            persist_cursor_state: None,
627        };
628
629        Self {
630            state: RwLock::new(state),
631        }
632    }
633
634    /// Capture a serializable snapshot of the current state for persistence.
635    ///
636    /// Includes authority state, operation specs, completion entries, and
637    /// cursor values. Cursor values may be stale relative to the agent's
638    /// true position (monotonic staleness, not atomicity).
639    pub fn capture_persistence_snapshot(
640        &self,
641        epoch_id: meerkat_core::RuntimeEpochId,
642        cursor_state: &meerkat_core::EpochCursorState,
643    ) -> PersistedOpsSnapshot {
644        let state = self
645            .state
646            .read()
647            .unwrap_or_else(std::sync::PoisonError::into_inner);
648
649        let operation_specs: HashMap<OperationId, meerkat_core::ops_lifecycle::OperationSpec> =
650            state
651                .records
652                .iter()
653                .map(|(id, record)| (id.clone(), record.spec.clone()))
654                .collect();
655
656        let completion_entries = {
657            let inner = state
658                .feed_buffer
659                .inner
660                .read()
661                .unwrap_or_else(std::sync::PoisonError::into_inner);
662            inner.entries.iter().cloned().collect()
663        };
664
665        let cursors = cursor_state.snapshot();
666
667        PersistedOpsSnapshot {
668            epoch_id,
669            authority_state: state.authority.canonical_state().clone(),
670            operation_specs,
671            completion_entries,
672            cursors,
673        }
674    }
675
676    /// Return a read handle to the completion feed.
677    pub fn completion_feed_handle(&self) -> Arc<dyn CompletionFeed> {
678        let state = self
679            .state
680            .read()
681            .unwrap_or_else(std::sync::PoisonError::into_inner);
682        Arc::new(RuntimeCompletionFeed {
683            buffer: Arc::clone(&state.feed_buffer),
684        })
685    }
686
687    fn read_state(&self) -> Result<RwLockReadGuard<'_, ShellState>, OpsLifecycleError> {
688        self.state
689            .read()
690            .map_err(|_| OpsLifecycleError::Internal("ops lifecycle registry poisoned".into()))
691    }
692
693    fn write_state(&self) -> Result<RwLockWriteGuard<'_, ShellState>, OpsLifecycleError> {
694        self.state
695            .write()
696            .map_err(|_| OpsLifecycleError::Internal("ops lifecycle registry poisoned".into()))
697    }
698
699    fn cancel_wait_all_internal(
700        &self,
701        wait_request_id: &WaitRequestId,
702    ) -> Result<(), OpsLifecycleError> {
703        let mut state = self.write_state()?;
704        match state.authority.apply(OpsLifecycleInput::CancelWaitAll {
705            wait_request_id: wait_request_id.clone(),
706        }) {
707            Ok(_) => {
708                state.pending_wait = None;
709                Ok(())
710            }
711            Err(OpsLifecycleError::WaitNotActive(_)) => {
712                state.pending_wait = None;
713                Ok(())
714            }
715            Err(err) => Err(err),
716        }
717    }
718}
719
/// Poll state for [`WaitAllFuture`].
enum WaitAllFutureState {
    /// Output already available before any wait; `None` once it was taken.
    Ready(Option<Result<WaitAllResult, OpsLifecycleError>>),
    /// Waiting on the registry to resolve the one-shot channel.
    Waiting(tokio::sync::oneshot::Receiver<WaitAllSatisfied>),
    /// Output already yielded; further polls return an `Internal` error.
    Done,
}
725
/// Future resolving when all of `operation_ids` reach terminal state.
///
/// Dropping it mid-wait cancels the registry-side wait (see `Drop` impl).
struct WaitAllFuture<'a> {
    /// Registry used to collect outcomes and to cancel on drop.
    registry: &'a RuntimeOpsLifecycleRegistry,
    /// Identity of this wait request within the authority.
    wait_request_id: WaitRequestId,
    /// Operations whose terminal outcomes the result reports.
    operation_ids: Vec<OperationId>,
    /// Current poll state.
    state: WaitAllFutureState,
}
732
impl Future for WaitAllFuture<'_> {
    type Output = Result<WaitAllResult, OpsLifecycleError>;

    fn poll(mut self: std::pin::Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        match &mut self.state {
            // Output was available up front: hand it out exactly once.
            WaitAllFutureState::Ready(result) => {
                let ready = result.take().unwrap_or_else(|| {
                    Err(OpsLifecycleError::Internal(
                        "wait_all future polled after completion".into(),
                    ))
                });
                self.state = WaitAllFutureState::Done;
                Poll::Ready(ready)
            }
            // Waiting for the registry to fire the one-shot sender
            // (WaitAllSatisfied effect in `execute_effects`).
            WaitAllFutureState::Waiting(receiver) => match std::pin::Pin::new(receiver).poll(cx) {
                Poll::Pending => Poll::Pending,
                Poll::Ready(Ok(satisfied)) => {
                    // Collect the terminal outcomes under the read lock;
                    // `collect_wait_outcomes` errors if any op is not terminal.
                    let outcomes = match self.registry.read_state() {
                        Ok(state) => state.collect_wait_outcomes(&self.operation_ids),
                        Err(err) => Err(err),
                    };
                    self.state = WaitAllFutureState::Done;
                    Poll::Ready(outcomes.map(|outcomes| WaitAllResult {
                        outcomes,
                        satisfied,
                    }))
                }
                // One-shot sender dropped without ever sending a value.
                Poll::Ready(Err(_)) => {
                    self.state = WaitAllFutureState::Done;
                    Poll::Ready(Err(OpsLifecycleError::Internal(
                        "wait_all completion channel dropped".into(),
                    )))
                }
            },
            WaitAllFutureState::Done => Poll::Ready(Err(OpsLifecycleError::Internal(
                "wait_all future polled after completion".into(),
            ))),
        }
    }
}
773
774impl Drop for WaitAllFuture<'_> {
775    fn drop(&mut self) {
776        if matches!(self.state, WaitAllFutureState::Waiting(_)) {
777            let _ = self
778                .registry
779                .cancel_wait_all_internal(&self.wait_request_id);
780        }
781    }
782}
783
// Trait implementation. Every mutating method follows the same shape:
//   1. take the write lock,
//   2. feed a canonical input to the authority (`apply`) — all guard checks
//      and state transitions happen there,
//   3. apply shell-side bookkeeping (timestamps, peer handles, channels),
//   4. where the authority records a placeholder terminal outcome, patch in
//      the real payload via `patch_terminal_outcome`,
//   5. execute the transition's effects,
//   6. on terminal transitions, trigger persistence via `maybe_persist`.
// The patch is performed before `execute_effects` — presumably so watcher
// notifications observe the real outcome; `execute_effects` is defined
// outside this chunk, so confirm against its body.
impl OpsLifecycleRegistry for RuntimeOpsLifecycleRegistry {
    /// Registers a new operation. Authority guards (e.g. duplicate IDs and
    /// the concurrency limit) run before the shell record is inserted.
    fn register_operation(&self, spec: OperationSpec) -> Result<(), OpsLifecycleError> {
        let mut state = self.write_state()?;
        let operation_id = spec.id.clone();
        let kind = spec.kind;

        // Delegate to authority for guard checks and canonical state insertion.
        let transition = state
            .authority
            .apply(OpsLifecycleInput::RegisterOperation {
                operation_id: operation_id.clone(),
                kind,
            })?;

        // Insert shell record.
        state.records.insert(operation_id, ShellRecord::new(spec));

        // Execute effects (none expected for register, but be correct).
        state.execute_effects(&transition.effects);
        Ok(())
    }

    /// Marks provisioning as successful and stamps the shell-side start time.
    fn provisioning_succeeded(&self, id: &OperationId) -> Result<(), OpsLifecycleError> {
        let mut state = self.write_state()?;

        let transition = state
            .authority
            .apply(OpsLifecycleInput::ProvisioningSucceeded {
                operation_id: id.clone(),
            })?;

        // Shell concern: record the started timestamp.
        if let Some(shell) = state.records.get_mut(id) {
            shell.started_at = Some(Instant::now());
        }

        state.execute_effects(&transition.effects);
        Ok(())
    }

    /// Marks provisioning as failed, carrying the caller-supplied error text
    /// into the terminal outcome.
    fn provisioning_failed(
        &self,
        id: &OperationId,
        error: String,
    ) -> Result<(), OpsLifecycleError> {
        let mut state = self.write_state()?;

        let transition = state
            .authority
            .apply(OpsLifecycleInput::ProvisioningFailed {
                operation_id: id.clone(),
            })?;

        // Patch the real terminal outcome (authority uses placeholder).
        state
            .authority
            .patch_terminal_outcome(id, OperationTerminalOutcome::Failed { error });

        state.execute_effects(&transition.effects);
        Ok(())
    }

    /// Records the peer handle for an operation that expects a peer channel.
    /// The authority rejects the transition when no peer was expected.
    fn peer_ready(
        &self,
        id: &OperationId,
        peer: OperationPeerHandle,
    ) -> Result<(), OpsLifecycleError> {
        let mut state = self.write_state()?;

        let transition = state.authority.apply(OpsLifecycleInput::PeerReady {
            operation_id: id.clone(),
        })?;

        // Shell concern: store the peer handle.
        if let Some(shell) = state.records.get_mut(id) {
            shell.peer_handle = Some(peer);
        }

        state.execute_effects(&transition.effects);
        Ok(())
    }

    /// Registers a completion watcher. Late watchers on an already-terminal
    /// operation resolve immediately without touching watcher bookkeeping.
    fn register_watcher(
        &self,
        id: &OperationId,
    ) -> Result<OperationCompletionWatch, OpsLifecycleError> {
        let mut state = self.write_state()?;

        // Check authority for terminal outcome first (already-resolved path).
        let canonical = state
            .authority
            .operation(id)
            .ok_or_else(|| OpsLifecycleError::NotFound(id.clone()))?;

        if let Some(outcome) = canonical.terminal_outcome() {
            return Ok(OperationCompletionWatch::already_resolved(outcome.clone()));
        }

        // Delegate to authority for watcher_count bookkeeping.
        let _transition = state.authority.apply(OpsLifecycleInput::RegisterWatcher {
            operation_id: id.clone(),
        })?;

        // Shell concern: create the channel and store the sender.
        let shell = state.shell_record_mut(id)?;
        let (tx, watch) = OperationCompletionWatch::channel();
        shell.watchers.push(tx);
        Ok(watch)
    }

    /// Reports progress. The update payload itself is currently unused here;
    /// only the authority-visible "progress happened" transition is applied.
    fn report_progress(
        &self,
        id: &OperationId,
        _update: OperationProgressUpdate,
    ) -> Result<(), OpsLifecycleError> {
        let mut state = self.write_state()?;

        let transition = state.authority.apply(OpsLifecycleInput::ProgressReported {
            operation_id: id.clone(),
        })?;

        state.execute_effects(&transition.effects);
        Ok(())
    }

    /// Completes an operation with its result; a terminal transition, so the
    /// state is persisted afterwards.
    fn complete_operation(
        &self,
        id: &OperationId,
        result: OperationResult,
    ) -> Result<(), OpsLifecycleError> {
        let mut state = self.write_state()?;

        let transition = state
            .authority
            .apply(OpsLifecycleInput::CompleteOperation {
                operation_id: id.clone(),
            })?;

        // Patch the real terminal outcome (authority uses placeholder).
        state
            .authority
            .patch_terminal_outcome(id, OperationTerminalOutcome::Completed(result));

        state.execute_effects(&transition.effects);
        state.maybe_persist();
        Ok(())
    }

    /// Fails an operation with an error message; terminal, hence persisted.
    fn fail_operation(&self, id: &OperationId, error: String) -> Result<(), OpsLifecycleError> {
        let mut state = self.write_state()?;

        let transition = state.authority.apply(OpsLifecycleInput::FailOperation {
            operation_id: id.clone(),
        })?;

        // Patch the real terminal outcome.
        state
            .authority
            .patch_terminal_outcome(id, OperationTerminalOutcome::Failed { error });

        state.execute_effects(&transition.effects);
        state.maybe_persist();
        Ok(())
    }

    /// Aborts an operation that is still provisioning; terminal, persisted.
    fn abort_provisioning(
        &self,
        id: &OperationId,
        reason: Option<String>,
    ) -> Result<(), OpsLifecycleError> {
        let mut state = self.write_state()?;

        let transition = state
            .authority
            .apply(OpsLifecycleInput::AbortProvisioning {
                operation_id: id.clone(),
            })?;

        // Patch the real terminal outcome (authority uses placeholder).
        state
            .authority
            .patch_terminal_outcome(id, OperationTerminalOutcome::Aborted { reason });

        state.execute_effects(&transition.effects);
        state.maybe_persist();
        Ok(())
    }

    /// Cancels a running operation; terminal, persisted.
    fn cancel_operation(
        &self,
        id: &OperationId,
        reason: Option<String>,
    ) -> Result<(), OpsLifecycleError> {
        let mut state = self.write_state()?;

        let transition = state.authority.apply(OpsLifecycleInput::CancelOperation {
            operation_id: id.clone(),
        })?;

        // Patch the real terminal outcome.
        state
            .authority
            .patch_terminal_outcome(id, OperationTerminalOutcome::Cancelled { reason });

        state.execute_effects(&transition.effects);
        state.maybe_persist();
        Ok(())
    }

    /// Requests retirement; not terminal by itself (see `mark_retired`).
    fn request_retire(&self, id: &OperationId) -> Result<(), OpsLifecycleError> {
        let mut state = self.write_state()?;

        let transition = state.authority.apply(OpsLifecycleInput::RetireRequested {
            operation_id: id.clone(),
        })?;

        state.execute_effects(&transition.effects);
        Ok(())
    }

    /// Finalizes retirement with the `Retired` terminal outcome.
    /// Note: unlike the other terminal transitions above, this path does not
    /// call `maybe_persist` — NOTE(review): confirm that is intentional.
    fn mark_retired(&self, id: &OperationId) -> Result<(), OpsLifecycleError> {
        let mut state = self.write_state()?;

        let transition = state.authority.apply(OpsLifecycleInput::RetireCompleted {
            operation_id: id.clone(),
        })?;

        // Patch the real terminal outcome.
        state
            .authority
            .patch_terminal_outcome(id, OperationTerminalOutcome::Retired);

        state.execute_effects(&transition.effects);
        Ok(())
    }

    /// Returns a point-in-time snapshot for one operation, or `None` when the
    /// operation is unknown or the registry lock is poisoned.
    fn snapshot(&self, id: &OperationId) -> Option<OperationLifecycleSnapshot> {
        self.read_state().ok().and_then(|state| state.snapshot(id))
    }

    /// Lists snapshots for all known operations, sorted by display name for a
    /// stable, user-facing ordering. A poisoned lock yields an empty list.
    fn list_operations(&self) -> Vec<OperationLifecycleSnapshot> {
        let mut snapshots = self
            .read_state()
            .map(|state| {
                state
                    .authority
                    .operations()
                    .filter_map(|(id, _)| state.snapshot(id))
                    .collect::<Vec<_>>()
            })
            .unwrap_or_default();
        snapshots.sort_by(|left, right| left.display_name.cmp(&right.display_name));
        snapshots
    }

    /// Terminates all operations on behalf of a terminating owner, stamping
    /// the supplied reason into every newly-terminated operation.
    fn terminate_owner(&self, reason: String) -> Result<(), OpsLifecycleError> {
        let mut state = self.write_state()?;

        let transition = state.authority.apply(OpsLifecycleInput::OwnerTerminated)?;

        // Patch all terminal outcomes with the real reason.
        // The authority set placeholder empty-string reasons; we patch the real
        // reason into each newly-terminated operation.
        // The set of newly-terminated operations is derived from the
        // NotifyOpWatcher effects the authority emitted for this transition.
        for effect in &transition.effects {
            if let OpsLifecycleEffect::NotifyOpWatcher { operation_id, .. } = effect {
                state.authority.patch_terminal_outcome(
                    operation_id,
                    OperationTerminalOutcome::Terminated {
                        reason: reason.clone(),
                    },
                );
            }
        }

        state.execute_effects(&transition.effects);
        Ok(())
    }

    /// Drains all terminal operations from the authority and drops their
    /// shell records, returning the (id, outcome) pairs to the caller.
    fn collect_completed(
        &self,
    ) -> Result<Vec<(OperationId, OperationTerminalOutcome)>, OpsLifecycleError> {
        let mut state = self.write_state()?;

        let collected = state.authority.drain_completed();

        // Remove corresponding shell records.
        for (id, _) in &collected {
            state.records.remove(id);
        }

        Ok(collected)
    }

    /// Exposes the completion feed handle; always available on this runtime.
    fn completion_feed(&self) -> Option<Arc<dyn CompletionFeed>> {
        Some(self.completion_feed_handle())
    }

    /// Begins a wait-all over `ids`. The wait obligation is submitted through
    /// the authority (`BeginWaitAll`) so it can survive cross-machine handoff.
    /// If the obligation is already satisfied (all operations terminal) the
    /// returned future resolves synchronously; otherwise it waits on a
    /// oneshot fired when the last awaited operation terminates.
    fn wait_all(
        &self,
        _run_id: &RunId,
        ids: &[OperationId],
    ) -> std::pin::Pin<
        Box<dyn std::future::Future<Output = Result<WaitAllResult, OpsLifecycleError>> + Send + '_>,
    > {
        let wait_request_id = WaitRequestId::new();
        let owned_ids = ids.to_vec();

        let state = match self.write_state() {
            Ok(mut state) => {
                let transition = match state.authority.apply(OpsLifecycleInput::BeginWaitAll {
                    wait_request_id: wait_request_id.clone(),
                    operation_ids: owned_ids.clone(),
                }) {
                    Ok(transition) => transition,
                    Err(err) => {
                        // Authority rejected the wait; hand the error back via
                        // an immediately-ready future.
                        return Box::pin(WaitAllFuture {
                            registry: self,
                            wait_request_id,
                            operation_ids: owned_ids,
                            state: WaitAllFutureState::Ready(Some(Err(err))),
                        });
                    }
                };

                // A WaitAllSatisfied effect means every awaited operation is
                // already terminal — we can resolve without waiting.
                let satisfied = transition.effects.iter().find_map(|effect| match effect {
                    OpsLifecycleEffect::WaitAllSatisfied {
                        wait_request_id,
                        operation_ids,
                    } => Some(WaitAllSatisfied {
                        wait_request_id: wait_request_id.clone(),
                        operation_ids: operation_ids.clone(),
                    }),
                    _ => None,
                });

                state.execute_effects(&transition.effects);

                if let Some(satisfied) = satisfied {
                    WaitAllFutureState::Ready(Some(state.collect_wait_outcomes(&owned_ids).map(
                        |outcomes| WaitAllResult {
                            outcomes,
                            satisfied,
                        },
                    )))
                } else {
                    // Invariant: at most one pending wait sender at a time.
                    // NOTE(review): on this error path the BeginWaitAll
                    // registration accepted by the authority above is never
                    // cancelled — the Drop impl only cancels Waiting futures,
                    // and this future is returned in the Ready state. Confirm
                    // the authority self-heals or add explicit cancellation.
                    if state.pending_wait.is_some() {
                        return Box::pin(WaitAllFuture {
                            registry: self,
                            wait_request_id,
                            operation_ids: owned_ids,
                            state: WaitAllFutureState::Ready(Some(Err(
                                OpsLifecycleError::Internal(
                                    "wait_all started while a pending wait sender already existed"
                                        .into(),
                                ),
                            ))),
                        });
                    }
                    // Park the sender in registry state; the terminal
                    // transition path fires it once the obligation resolves.
                    let (sender, receiver) = tokio::sync::oneshot::channel();
                    state.pending_wait = Some(PendingWaitState {
                        wait_request_id: wait_request_id.clone(),
                        sender,
                    });
                    WaitAllFutureState::Waiting(receiver)
                }
            }
            Err(err) => WaitAllFutureState::Ready(Some(Err(err))),
        };

        Box::pin(WaitAllFuture {
            registry: self,
            wait_request_id,
            operation_ids: owned_ids,
            state,
        })
    }
}
1160
#[cfg(test)]
#[allow(clippy::unwrap_used, clippy::panic)]
mod tests {
    use super::*;
    use meerkat_core::comms::TrustedPeerSpec;
    use meerkat_core::lifecycle::RunId;
    use meerkat_core::ops_lifecycle::{OperationKind, OpsLifecycleRegistry};
    use meerkat_core::types::SessionId;
    use uuid::Uuid;

    // Deterministic run id so assertions are stable across runs.
    fn test_run_id() -> RunId {
        RunId(Uuid::from_u128(1))
    }

    // Minimal background-operation spec (no child session, no peer channel).
    fn background_spec(name: &str) -> OperationSpec {
        OperationSpec {
            id: OperationId::new(),
            kind: OperationKind::BackgroundToolOp,
            owner_session_id: SessionId::new(),
            display_name: name.into(),
            source_label: "test".into(),
            child_session_id: None,
            expect_peer_channel: false,
        }
    }

    // A watcher registered after the operation completed must resolve
    // immediately with the stored terminal outcome.
    #[tokio::test]
    async fn late_watchers_resolve_immediately() {
        let registry = RuntimeOpsLifecycleRegistry::new();
        let spec = background_spec("late");
        let op_id = spec.id.clone();
        registry.register_operation(spec).unwrap();
        registry.provisioning_succeeded(&op_id).unwrap();
        registry
            .complete_operation(
                &op_id,
                OperationResult {
                    id: op_id.clone(),
                    content: "done".into(),
                    is_error: false,
                    duration_ms: 1,
                    tokens_used: 0,
                },
            )
            .unwrap();

        let watch = registry.register_watcher(&op_id).unwrap();
        match watch.wait().await {
            OperationTerminalOutcome::Completed(result) => assert_eq!(result.content, "done"),
            other => panic!("expected completed outcome, got {other:?}"),
        }
    }

    // peer_ready must be rejected for specs with expect_peer_channel = false.
    #[test]
    fn peer_ready_requires_peer_expectation() {
        let registry = RuntimeOpsLifecycleRegistry::new();
        let spec = background_spec("no-peer");
        let op_id = spec.id.clone();
        registry.register_operation(spec).unwrap();
        registry.provisioning_succeeded(&op_id).unwrap();

        let result = registry.peer_ready(
            &op_id,
            OperationPeerHandle {
                peer_name: "peer".into(),
                trusted_peer: TrustedPeerSpec::new("peer", "peer-id", "inproc://peer").unwrap(),
            },
        );
        assert!(matches!(result, Err(OpsLifecycleError::PeerNotExpected(_))));
    }

    // All watchers registered before completion observe the same outcome.
    #[tokio::test]
    async fn multi_listener_completion() {
        let registry = RuntimeOpsLifecycleRegistry::new();
        let spec = background_spec("multi");
        let op_id = spec.id.clone();
        registry.register_operation(spec).unwrap();
        registry.provisioning_succeeded(&op_id).unwrap();

        let watch1 = registry.register_watcher(&op_id).unwrap();
        let watch2 = registry.register_watcher(&op_id).unwrap();
        let watch3 = registry.register_watcher(&op_id).unwrap();

        registry
            .complete_operation(
                &op_id,
                OperationResult {
                    id: op_id.clone(),
                    content: "multi-done".into(),
                    is_error: false,
                    duration_ms: 1,
                    tokens_used: 0,
                },
            )
            .unwrap();

        for watch in [watch1, watch2, watch3] {
            match watch.wait().await {
                OperationTerminalOutcome::Completed(result) => {
                    assert_eq!(result.content, "multi-done");
                }
                other => panic!("expected completed, got {other:?}"),
            }
        }
    }

    // wait_all over already-terminal operations resolves synchronously and
    // reports the per-operation outcomes in request order.
    #[tokio::test]
    async fn wait_all_returns_all_outcomes() {
        let registry = RuntimeOpsLifecycleRegistry::new();

        let spec_a = background_spec("a");
        let id_a = spec_a.id.clone();
        registry.register_operation(spec_a).unwrap();
        registry.provisioning_succeeded(&id_a).unwrap();

        let spec_b = background_spec("b");
        let id_b = spec_b.id.clone();
        registry.register_operation(spec_b).unwrap();
        registry.provisioning_succeeded(&id_b).unwrap();

        registry
            .complete_operation(
                &id_a,
                OperationResult {
                    id: id_a.clone(),
                    content: "a-done".into(),
                    is_error: false,
                    duration_ms: 1,
                    tokens_used: 0,
                },
            )
            .unwrap();
        registry.fail_operation(&id_b, "b-error".into()).unwrap();

        let wait_result = registry
            .wait_all(&test_run_id(), &[id_a.clone(), id_b.clone()])
            .await
            .unwrap();
        assert_eq!(wait_result.outcomes.len(), 2);
        assert_eq!(wait_result.outcomes[0].0, id_a);
        assert!(matches!(
            wait_result.outcomes[0].1,
            OperationTerminalOutcome::Completed(_)
        ));
        assert_eq!(wait_result.outcomes[1].0, id_b);
        assert!(matches!(
            wait_result.outcomes[1].1,
            OperationTerminalOutcome::Failed { .. }
        ));
        // Authority-derived obligation carries the awaited IDs
        assert_eq!(wait_result.satisfied.operation_ids.len(), 2);
        assert_ne!(wait_result.satisfied.wait_request_id.to_string(), "");
    }

    /// Exercises the trait `wait_all` path (via `dyn OpsLifecycleRegistry`)
    /// which must submit WaitAll through the authority for cross-machine handoff.
    #[tokio::test]
    async fn wait_all_trait_path_submits_through_authority() {
        let registry = RuntimeOpsLifecycleRegistry::new();
        let spec = background_spec("trait-wait");
        let op_id = spec.id.clone();
        registry.register_operation(spec).unwrap();
        registry.provisioning_succeeded(&op_id).unwrap();
        registry
            .complete_operation(
                &op_id,
                OperationResult {
                    id: op_id.clone(),
                    content: "done".into(),
                    is_error: false,
                    duration_ms: 1,
                    tokens_used: 0,
                },
            )
            .unwrap();

        // Call through trait object to exercise the trait impl, not the inherent method.
        let trait_ref: &dyn OpsLifecycleRegistry = &registry;
        let wait_result = trait_ref
            .wait_all(&test_run_id(), std::slice::from_ref(&op_id))
            .await
            .unwrap();
        assert_eq!(wait_result.outcomes.len(), 1);
        assert!(matches!(
            wait_result.outcomes[0].1,
            OperationTerminalOutcome::Completed(_)
        ));
        // Obligation carries the validated ID
        assert_eq!(wait_result.satisfied.operation_ids, vec![op_id]);
        assert_ne!(wait_result.satisfied.wait_request_id.to_string(), "");
    }

    // A wait over a non-terminal operation stays pending, registers a wait
    // request with the authority, and resolves once the operation completes;
    // the authority's request is cleared afterwards.
    #[tokio::test]
    async fn wait_all_resolves_from_authority_owned_wait_request() {
        let registry = RuntimeOpsLifecycleRegistry::new();
        let run_id = test_run_id();

        let spec = background_spec("pending");
        let op_id = spec.id.clone();
        registry.register_operation(spec).unwrap();
        registry.provisioning_succeeded(&op_id).unwrap();

        let wait_fut = registry.wait_all(&run_id, std::slice::from_ref(&op_id));
        tokio::pin!(wait_fut);
        // Short timeout proves the future is genuinely pending.
        assert!(
            tokio::time::timeout(std::time::Duration::from_millis(10), &mut wait_fut)
                .await
                .is_err()
        );

        let active_wait_request_id = {
            let state = registry.read_state().unwrap();
            let wait_request_id = match state.authority.wait_request_id().cloned() {
                Some(wait_request_id) => wait_request_id,
                None => panic!("wait request should be active"),
            };
            assert_eq!(
                state.authority.wait_operation_ids(),
                std::slice::from_ref(&op_id)
            );
            wait_request_id
        };

        registry
            .complete_operation(
                &op_id,
                OperationResult {
                    id: op_id.clone(),
                    content: "done".into(),
                    is_error: false,
                    duration_ms: 1,
                    tokens_used: 0,
                },
            )
            .unwrap();

        let wait_result = wait_fut.await.unwrap();
        assert_eq!(
            wait_result.satisfied.wait_request_id,
            active_wait_request_id
        );
        assert_eq!(wait_result.satisfied.operation_ids, vec![op_id.clone()]);
        assert!(matches!(
            wait_result.outcomes.as_slice(),
            [(returned_id, OperationTerminalOutcome::Completed(_))] if *returned_id == op_id
        ));
        // Resolution must also clear the authority's active wait request.
        assert!(
            registry
                .read_state()
                .unwrap()
                .authority
                .wait_request_id()
                .is_none()
        );
    }

    // Dropping a still-waiting future must cancel the authority's wait
    // registration (exercises the Drop impl of WaitAllFuture).
    #[tokio::test]
    async fn dropping_wait_all_future_cancels_active_wait_request() {
        let registry = RuntimeOpsLifecycleRegistry::new();
        let run_id = test_run_id();

        let spec = background_spec("cancelled-wait");
        let op_id = spec.id.clone();
        registry.register_operation(spec).unwrap();
        registry.provisioning_succeeded(&op_id).unwrap();

        let wait_fut = registry.wait_all(&run_id, std::slice::from_ref(&op_id));
        drop(wait_fut);

        let state = registry.read_state().unwrap();
        assert!(state.authority.wait_request_id().is_none());
        assert!(state.authority.wait_operation_ids().is_empty());
    }

    // collect_completed drains only terminal operations and is idempotent
    // once drained; non-terminal operations are untouched.
    #[test]
    fn collect_completed_drains_terminal_operations() {
        let registry = RuntimeOpsLifecycleRegistry::new();

        let spec_a = background_spec("a");
        let id_a = spec_a.id.clone();
        registry.register_operation(spec_a).unwrap();
        registry.provisioning_succeeded(&id_a).unwrap();
        registry
            .complete_operation(
                &id_a,
                OperationResult {
                    id: id_a.clone(),
                    content: "done".into(),
                    is_error: false,
                    duration_ms: 1,
                    tokens_used: 0,
                },
            )
            .unwrap();

        let spec_b = background_spec("b");
        let id_b = spec_b.id.clone();
        registry.register_operation(spec_b).unwrap();

        let collected = registry.collect_completed().unwrap();
        assert_eq!(collected.len(), 1);
        assert_eq!(collected[0].0, id_a);

        assert!(registry.snapshot(&id_a).is_none());
        assert!(registry.snapshot(&id_b).is_some());

        let collected2 = registry.collect_completed().unwrap();
        assert!(collected2.is_empty());
    }

    // With max_completed = 3, completing 5 operations evicts the 2 oldest.
    #[test]
    fn bounded_completed_retention_evicts_oldest() {
        let registry = RuntimeOpsLifecycleRegistry::with_config(OpsLifecycleConfig {
            max_completed: 3,
            max_concurrent: None,
        });

        let mut ids = Vec::new();
        for i in 0..5 {
            let spec = background_spec(&format!("op-{i}"));
            let id = spec.id.clone();
            registry.register_operation(spec).unwrap();
            registry.provisioning_succeeded(&id).unwrap();
            registry
                .complete_operation(
                    &id,
                    OperationResult {
                        id: id.clone(),
                        content: format!("done-{i}"),
                        is_error: false,
                        duration_ms: 1,
                        tokens_used: 0,
                    },
                )
                .unwrap();
            ids.push(id);
        }

        assert!(registry.snapshot(&ids[0]).is_none());
        assert!(registry.snapshot(&ids[1]).is_none());
        assert!(registry.snapshot(&ids[2]).is_some());
        assert!(registry.snapshot(&ids[3]).is_some());
        assert!(registry.snapshot(&ids[4]).is_some());
    }

    // Registration beyond max_concurrent fails with MaxConcurrentExceeded;
    // completing an operation frees a slot for new registrations.
    #[test]
    fn max_concurrent_enforcement() {
        let registry = RuntimeOpsLifecycleRegistry::with_config(OpsLifecycleConfig {
            max_completed: DEFAULT_MAX_COMPLETED,
            max_concurrent: Some(2),
        });

        let spec_a = background_spec("a");
        let id_a = spec_a.id.clone();
        registry.register_operation(spec_a).unwrap();

        let spec_b = background_spec("b");
        registry.register_operation(spec_b).unwrap();

        let spec_c = background_spec("c");
        let result = registry.register_operation(spec_c);
        assert!(matches!(
            result,
            Err(OpsLifecycleError::MaxConcurrentExceeded {
                limit: 2,
                active: 2,
            })
        ));

        registry.provisioning_succeeded(&id_a).unwrap();
        registry
            .complete_operation(
                &id_a,
                OperationResult {
                    id: id_a.clone(),
                    content: "done".into(),
                    is_error: false,
                    duration_ms: 1,
                    tokens_used: 0,
                },
            )
            .unwrap();

        let spec_d = background_spec("d");
        assert!(registry.register_operation(spec_d).is_ok());
    }

    // Snapshot timestamps appear monotonically as the lifecycle progresses:
    // created -> started -> completed, with elapsed derived at the end.
    #[test]
    fn snapshot_includes_timestamps() {
        let registry = RuntimeOpsLifecycleRegistry::new();
        let spec = background_spec("timed");
        let op_id = spec.id.clone();
        registry.register_operation(spec).unwrap();

        let snap1 = registry.snapshot(&op_id).unwrap();
        assert!(snap1.created_at_ms > 0);
        assert!(snap1.started_at_ms.is_none());
        assert!(snap1.completed_at_ms.is_none());
        assert!(snap1.elapsed_ms.is_none());

        registry.provisioning_succeeded(&op_id).unwrap();
        let snap2 = registry.snapshot(&op_id).unwrap();
        assert!(snap2.started_at_ms.is_some());
        assert!(snap2.started_at_ms.unwrap() >= snap2.created_at_ms);

        registry
            .complete_operation(
                &op_id,
                OperationResult {
                    id: op_id.clone(),
                    content: "done".into(),
                    is_error: false,
                    duration_ms: 1,
                    tokens_used: 0,
                },
            )
            .unwrap();
        let snap3 = registry.snapshot(&op_id).unwrap();
        assert!(snap3.completed_at_ms.is_some());
        assert!(snap3.elapsed_ms.is_some());
        assert!(snap3.completed_at_ms.unwrap() >= snap3.started_at_ms.unwrap());
    }

    // Peer handle set via peer_ready surfaces in subsequent snapshots.
    #[test]
    fn snapshot_includes_peer_handle() {
        let registry = RuntimeOpsLifecycleRegistry::new();
        let spec = OperationSpec {
            id: OperationId::new(),
            kind: OperationKind::MobMemberChild,
            owner_session_id: SessionId::new(),
            display_name: "peer-test".into(),
            source_label: "test".into(),
            child_session_id: Some(SessionId::new()),
            expect_peer_channel: true,
        };
        let op_id = spec.id.clone();
        registry.register_operation(spec).unwrap();
        registry.provisioning_succeeded(&op_id).unwrap();

        let snap1 = registry.snapshot(&op_id).unwrap();
        assert!(snap1.peer_handle.is_none());

        let handle = OperationPeerHandle {
            peer_name: "member-x".into(),
            trusted_peer: TrustedPeerSpec::new("member-x", "peer-id", "inproc://x").unwrap(),
        };
        registry.peer_ready(&op_id, handle).unwrap();

        let snap2 = registry.snapshot(&op_id).unwrap();
        assert_eq!(snap2.peer_handle.as_ref().unwrap().peer_name, "member-x");
    }
}