Skip to main content

meerkat_runtime/
ops_lifecycle.rs

1//! In-memory runtime implementation of the shared async-operation lifecycle seam.
2//!
3//! All canonical lifecycle state mutations are delegated to
4//! [`OpsLifecycleAuthority`] via [`OpsLifecycleMutator::apply`]. This shell
5//! layer owns I/O concerns: watcher channels, timestamps, peer handles, and
6//! snapshot assembly.
7
8use std::collections::HashMap;
9use std::future::Future;
10use std::sync::{RwLock, RwLockReadGuard, RwLockWriteGuard};
11use std::task::{Context, Poll};
12
13#[cfg(target_arch = "wasm32")]
14use crate::tokio;
15use meerkat_core::lifecycle::{RunId, WaitRequestId};
16use meerkat_core::ops_lifecycle::{
17    DEFAULT_MAX_COMPLETED, OperationCompletionWatch, OperationId, OperationLifecycleSnapshot,
18    OperationPeerHandle, OperationProgressUpdate, OperationResult, OperationSpec,
19    OperationTerminalOutcome, OpsLifecycleError, OpsLifecycleRegistry, WaitAllResult,
20    WaitAllSatisfied,
21};
22use meerkat_core::time_compat::{Instant, SystemTime, UNIX_EPOCH};
23
24use crate::ops_lifecycle_authority::{
25    OpsLifecycleAuthority, OpsLifecycleEffect, OpsLifecycleInput, OpsLifecycleMutator,
26};
27
28// ---------------------------------------------------------------------------
29// Shell-only per-operation record (not part of canonical machine state)
30// ---------------------------------------------------------------------------
31
32/// Shell-owned data for a single operation. Canonical lifecycle state lives in
33/// the authority; this struct holds I/O concerns that the authority has no
34/// knowledge of.
35#[derive(Debug)]
36struct ShellRecord {
37    spec: OperationSpec,
38    peer_handle: Option<OperationPeerHandle>,
39    watchers: Vec<tokio::sync::oneshot::Sender<OperationTerminalOutcome>>,
40    // Monotonic timestamps for elapsed computation
41    created_at: Instant,
42    started_at: Option<Instant>,
43    completed_at: Option<Instant>,
44    // Wall-clock anchor captured at creation for epoch millis
45    created_at_wall: SystemTime,
46}
47
48#[derive(Debug)]
49struct PendingWaitState {
50    wait_request_id: WaitRequestId,
51    sender: tokio::sync::oneshot::Sender<WaitAllSatisfied>,
52}
53
54impl ShellRecord {
55    fn new(spec: OperationSpec) -> Self {
56        Self {
57            spec,
58            peer_handle: None,
59            watchers: Vec::new(),
60            created_at: Instant::now(),
61            started_at: None,
62            completed_at: None,
63            created_at_wall: SystemTime::now(),
64        }
65    }
66
67    fn epoch_millis(wall_anchor: &SystemTime) -> u64 {
68        wall_anchor
69            .duration_since(UNIX_EPOCH)
70            .map(|d| d.as_millis() as u64)
71            .unwrap_or(0)
72    }
73
74    fn epoch_millis_for_instant(&self, instant: Instant) -> u64 {
75        // Compute wall time for a given instant using the wall-clock anchor:
76        // wall_time = created_at_wall + (instant - created_at)
77        let offset = instant.saturating_duration_since(self.created_at);
78        let wall = self.created_at_wall + offset;
79        Self::epoch_millis(&wall)
80    }
81
82    /// Notify all watchers with the given terminal outcome and drain the list.
83    fn notify_watchers(&mut self, outcome: &OperationTerminalOutcome) {
84        for watcher in std::mem::take(&mut self.watchers) {
85            let _ = watcher.send(outcome.clone());
86        }
87    }
88
89    /// Mark the completion timestamp.
90    fn mark_completed(&mut self) {
91        self.completed_at = Some(Instant::now());
92    }
93}
94
95// ---------------------------------------------------------------------------
96// Combined shell state: authority + shell records
97// ---------------------------------------------------------------------------
98
99#[derive(Debug)]
100struct ShellState {
101    authority: OpsLifecycleAuthority,
102    records: HashMap<OperationId, ShellRecord>,
103    pending_wait: Option<PendingWaitState>,
104}
105
106impl ShellState {
107    fn new(max_completed: usize, max_concurrent: Option<usize>) -> Self {
108        Self {
109            authority: OpsLifecycleAuthority::new(max_completed, max_concurrent),
110            records: HashMap::new(),
111            pending_wait: None,
112        }
113    }
114
115    /// Build a snapshot by combining authority canonical state with shell data.
116    fn snapshot(&self, id: &OperationId) -> Option<OperationLifecycleSnapshot> {
117        let canonical = self.authority.operation(id)?;
118        let shell = self.records.get(id)?;
119
120        let created_at_ms = ShellRecord::epoch_millis(&shell.created_at_wall);
121        let started_at_ms = shell.started_at.map(|i| shell.epoch_millis_for_instant(i));
122        let completed_at_ms = shell
123            .completed_at
124            .map(|i| shell.epoch_millis_for_instant(i));
125        let elapsed_ms = shell.completed_at.map(|completed| {
126            completed
127                .saturating_duration_since(shell.created_at)
128                .as_millis() as u64
129        });
130
131        Some(OperationLifecycleSnapshot {
132            id: shell.spec.id.clone(),
133            kind: canonical.kind(),
134            display_name: shell.spec.display_name.clone(),
135            status: canonical.status(),
136            peer_ready: canonical.peer_ready(),
137            progress_count: canonical.progress_count(),
138            watcher_count: shell.watchers.len() as u32,
139            terminal_outcome: canonical.terminal_outcome().cloned(),
140            child_session_id: shell.spec.child_session_id.clone(),
141            peer_handle: shell.peer_handle.clone(),
142            created_at_ms,
143            started_at_ms,
144            completed_at_ms,
145            elapsed_ms,
146        })
147    }
148
149    /// Execute authority effects on shell state.
150    ///
151    /// **Important:** callers must patch the real terminal outcome on the
152    /// authority (via `patch_terminal_outcome`) *before* calling this method.
153    /// `NotifyOpWatcher` effects read the patched outcome from the authority
154    /// rather than using the placeholder embedded in the effect.
155    fn execute_effects(&mut self, effects: &[OpsLifecycleEffect]) {
156        for effect in effects {
157            match effect {
158                OpsLifecycleEffect::NotifyOpWatcher { operation_id, .. } => {
159                    // Read the real (patched) outcome from the authority.
160                    let outcome = self
161                        .authority
162                        .operation(operation_id)
163                        .and_then(|op| op.terminal_outcome().cloned());
164                    if let Some(outcome) = outcome
165                        && let Some(shell) = self.records.get_mut(operation_id)
166                    {
167                        let watcher_count = shell.watchers.len() as u32;
168                        shell.notify_watchers(&outcome);
169                        shell.mark_completed();
170                        self.authority.watchers_drained(operation_id, watcher_count);
171                    }
172                }
173                OpsLifecycleEffect::ExposeOperationPeer { .. } => {
174                    // Peer handle is stored in shell record by the calling method
175                    // after authority.apply() succeeds. Nothing else to do here.
176                }
177                OpsLifecycleEffect::RetainTerminalRecord { .. } => {
178                    // The authority handles completed_order tracking internally.
179                    // Shell record stays in place until evicted.
180                }
181                OpsLifecycleEffect::EvictCompletedRecord { operation_id } => {
182                    self.records.remove(operation_id);
183                    self.authority.remove_operation(operation_id);
184                }
185                OpsLifecycleEffect::SubmitOpEvent { .. } => {
186                    // Future: emit observability events. Currently a no-op.
187                }
188                OpsLifecycleEffect::WaitAllSatisfied {
189                    wait_request_id,
190                    operation_ids,
191                } => {
192                    if let Some(pending_wait) = self.pending_wait.take() {
193                        if pending_wait.wait_request_id == *wait_request_id {
194                            let _ = pending_wait.sender.send(WaitAllSatisfied {
195                                wait_request_id: wait_request_id.clone(),
196                                operation_ids: operation_ids.clone(),
197                            });
198                        } else {
199                            self.pending_wait = Some(pending_wait);
200                        }
201                    }
202                }
203            }
204        }
205    }
206
207    fn shell_record_mut(
208        &mut self,
209        id: &OperationId,
210    ) -> Result<&mut ShellRecord, OpsLifecycleError> {
211        self.records
212            .get_mut(id)
213            .ok_or_else(|| OpsLifecycleError::NotFound(id.clone()))
214    }
215
216    fn collect_wait_outcomes(
217        &self,
218        operation_ids: &[OperationId],
219    ) -> Result<Vec<(OperationId, OperationTerminalOutcome)>, OpsLifecycleError> {
220        operation_ids
221            .iter()
222            .map(|operation_id| {
223                let outcome = self
224                    .authority
225                    .operation(operation_id)
226                    .and_then(|op| op.terminal_outcome().cloned())
227                    .ok_or_else(|| {
228                        OpsLifecycleError::Internal(format!(
229                            "wait_all completed without terminal outcome for {operation_id}"
230                        ))
231                    })?;
232                Ok((operation_id.clone(), outcome))
233            })
234            .collect()
235    }
236}
237
238impl Default for ShellState {
239    fn default() -> Self {
240        Self::new(DEFAULT_MAX_COMPLETED, None)
241    }
242}
243
244// ---------------------------------------------------------------------------
245// Public configuration & registry
246// ---------------------------------------------------------------------------
247
/// Tunables accepted by [`RuntimeOpsLifecycleRegistry::with_config`].
#[derive(Debug, Clone)]
pub struct OpsLifecycleConfig {
    /// How many completed operations are retained; the `Default` impl uses
    /// `DEFAULT_MAX_COMPLETED`.
    pub max_completed: usize,
    /// Upper bound on concurrent non-terminal operations; `None` means
    /// unlimited.
    pub max_concurrent: Option<usize>,
}
256
257impl Default for OpsLifecycleConfig {
258    fn default() -> Self {
259        Self {
260            max_completed: DEFAULT_MAX_COMPLETED,
261            max_concurrent: None,
262        }
263    }
264}
265
266/// Per-runtime shared registry for async operation lifecycle truth.
267///
268/// All canonical lifecycle state mutations are delegated to
269/// [`OpsLifecycleAuthority`]. This shell manages I/O concerns: watcher
270/// channels, timestamps, peer handles, and snapshot assembly.
271#[derive(Debug)]
272pub struct RuntimeOpsLifecycleRegistry {
273    state: RwLock<ShellState>,
274}
275
276impl Default for RuntimeOpsLifecycleRegistry {
277    fn default() -> Self {
278        Self {
279            state: RwLock::new(ShellState::default()),
280        }
281    }
282}
283
284impl RuntimeOpsLifecycleRegistry {
285    pub fn new() -> Self {
286        Self::default()
287    }
288
289    pub fn with_config(config: OpsLifecycleConfig) -> Self {
290        Self {
291            state: RwLock::new(ShellState::new(config.max_completed, config.max_concurrent)),
292        }
293    }
294
295    fn read_state(&self) -> Result<RwLockReadGuard<'_, ShellState>, OpsLifecycleError> {
296        self.state
297            .read()
298            .map_err(|_| OpsLifecycleError::Internal("ops lifecycle registry poisoned".into()))
299    }
300
301    fn write_state(&self) -> Result<RwLockWriteGuard<'_, ShellState>, OpsLifecycleError> {
302        self.state
303            .write()
304            .map_err(|_| OpsLifecycleError::Internal("ops lifecycle registry poisoned".into()))
305    }
306
307    fn cancel_wait_all_internal(
308        &self,
309        wait_request_id: &WaitRequestId,
310    ) -> Result<(), OpsLifecycleError> {
311        let mut state = self.write_state()?;
312        match state.authority.apply(OpsLifecycleInput::CancelWaitAll {
313            wait_request_id: wait_request_id.clone(),
314        }) {
315            Ok(_) => {
316                state.pending_wait = None;
317                Ok(())
318            }
319            Err(OpsLifecycleError::WaitNotActive(_)) => {
320                state.pending_wait = None;
321                Ok(())
322            }
323            Err(err) => Err(err),
324        }
325    }
326}
327
328enum WaitAllFutureState {
329    Ready(Option<Result<WaitAllResult, OpsLifecycleError>>),
330    Waiting(tokio::sync::oneshot::Receiver<WaitAllSatisfied>),
331    Done,
332}
333
334struct WaitAllFuture<'a> {
335    registry: &'a RuntimeOpsLifecycleRegistry,
336    wait_request_id: WaitRequestId,
337    operation_ids: Vec<OperationId>,
338    state: WaitAllFutureState,
339}
340
341impl Future for WaitAllFuture<'_> {
342    type Output = Result<WaitAllResult, OpsLifecycleError>;
343
344    fn poll(mut self: std::pin::Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
345        match &mut self.state {
346            WaitAllFutureState::Ready(result) => {
347                let ready = result.take().unwrap_or_else(|| {
348                    Err(OpsLifecycleError::Internal(
349                        "wait_all future polled after completion".into(),
350                    ))
351                });
352                self.state = WaitAllFutureState::Done;
353                Poll::Ready(ready)
354            }
355            WaitAllFutureState::Waiting(receiver) => match std::pin::Pin::new(receiver).poll(cx) {
356                Poll::Pending => Poll::Pending,
357                Poll::Ready(Ok(satisfied)) => {
358                    let outcomes = match self.registry.read_state() {
359                        Ok(state) => state.collect_wait_outcomes(&self.operation_ids),
360                        Err(err) => Err(err),
361                    };
362                    self.state = WaitAllFutureState::Done;
363                    Poll::Ready(outcomes.map(|outcomes| WaitAllResult {
364                        outcomes,
365                        satisfied,
366                    }))
367                }
368                Poll::Ready(Err(_)) => {
369                    self.state = WaitAllFutureState::Done;
370                    Poll::Ready(Err(OpsLifecycleError::Internal(
371                        "wait_all completion channel dropped".into(),
372                    )))
373                }
374            },
375            WaitAllFutureState::Done => Poll::Ready(Err(OpsLifecycleError::Internal(
376                "wait_all future polled after completion".into(),
377            ))),
378        }
379    }
380}
381
382impl Drop for WaitAllFuture<'_> {
383    fn drop(&mut self) {
384        if matches!(self.state, WaitAllFutureState::Waiting(_)) {
385            let _ = self
386                .registry
387                .cancel_wait_all_internal(&self.wait_request_id);
388        }
389    }
390}
391
392impl OpsLifecycleRegistry for RuntimeOpsLifecycleRegistry {
393    fn register_operation(&self, spec: OperationSpec) -> Result<(), OpsLifecycleError> {
394        let mut state = self.write_state()?;
395        let operation_id = spec.id.clone();
396        let kind = spec.kind;
397
398        // Delegate to authority for guard checks and canonical state insertion.
399        let transition = state
400            .authority
401            .apply(OpsLifecycleInput::RegisterOperation {
402                operation_id: operation_id.clone(),
403                kind,
404            })?;
405
406        // Insert shell record.
407        state.records.insert(operation_id, ShellRecord::new(spec));
408
409        // Execute effects (none expected for register, but be correct).
410        state.execute_effects(&transition.effects);
411        Ok(())
412    }
413
414    fn provisioning_succeeded(&self, id: &OperationId) -> Result<(), OpsLifecycleError> {
415        let mut state = self.write_state()?;
416
417        let transition = state
418            .authority
419            .apply(OpsLifecycleInput::ProvisioningSucceeded {
420                operation_id: id.clone(),
421            })?;
422
423        // Shell concern: record the started timestamp.
424        if let Some(shell) = state.records.get_mut(id) {
425            shell.started_at = Some(Instant::now());
426        }
427
428        state.execute_effects(&transition.effects);
429        Ok(())
430    }
431
432    fn provisioning_failed(
433        &self,
434        id: &OperationId,
435        error: String,
436    ) -> Result<(), OpsLifecycleError> {
437        let mut state = self.write_state()?;
438
439        let transition = state
440            .authority
441            .apply(OpsLifecycleInput::ProvisioningFailed {
442                operation_id: id.clone(),
443            })?;
444
445        // Patch the real terminal outcome (authority uses placeholder).
446        state
447            .authority
448            .patch_terminal_outcome(id, OperationTerminalOutcome::Failed { error });
449
450        state.execute_effects(&transition.effects);
451        Ok(())
452    }
453
454    fn peer_ready(
455        &self,
456        id: &OperationId,
457        peer: OperationPeerHandle,
458    ) -> Result<(), OpsLifecycleError> {
459        let mut state = self.write_state()?;
460
461        let transition = state.authority.apply(OpsLifecycleInput::PeerReady {
462            operation_id: id.clone(),
463        })?;
464
465        // Shell concern: store the peer handle.
466        if let Some(shell) = state.records.get_mut(id) {
467            shell.peer_handle = Some(peer);
468        }
469
470        state.execute_effects(&transition.effects);
471        Ok(())
472    }
473
474    fn register_watcher(
475        &self,
476        id: &OperationId,
477    ) -> Result<OperationCompletionWatch, OpsLifecycleError> {
478        let mut state = self.write_state()?;
479
480        // Check authority for terminal outcome first (already-resolved path).
481        let canonical = state
482            .authority
483            .operation(id)
484            .ok_or_else(|| OpsLifecycleError::NotFound(id.clone()))?;
485
486        if let Some(outcome) = canonical.terminal_outcome() {
487            return Ok(OperationCompletionWatch::already_resolved(outcome.clone()));
488        }
489
490        // Delegate to authority for watcher_count bookkeeping.
491        let _transition = state.authority.apply(OpsLifecycleInput::RegisterWatcher {
492            operation_id: id.clone(),
493        })?;
494
495        // Shell concern: create the channel and store the sender.
496        let shell = state.shell_record_mut(id)?;
497        let (tx, watch) = OperationCompletionWatch::channel();
498        shell.watchers.push(tx);
499        Ok(watch)
500    }
501
502    fn report_progress(
503        &self,
504        id: &OperationId,
505        _update: OperationProgressUpdate,
506    ) -> Result<(), OpsLifecycleError> {
507        let mut state = self.write_state()?;
508
509        let transition = state.authority.apply(OpsLifecycleInput::ProgressReported {
510            operation_id: id.clone(),
511        })?;
512
513        state.execute_effects(&transition.effects);
514        Ok(())
515    }
516
517    fn complete_operation(
518        &self,
519        id: &OperationId,
520        result: OperationResult,
521    ) -> Result<(), OpsLifecycleError> {
522        let mut state = self.write_state()?;
523
524        let transition = state
525            .authority
526            .apply(OpsLifecycleInput::CompleteOperation {
527                operation_id: id.clone(),
528            })?;
529
530        // Patch the real terminal outcome (authority uses placeholder).
531        state
532            .authority
533            .patch_terminal_outcome(id, OperationTerminalOutcome::Completed(result));
534
535        state.execute_effects(&transition.effects);
536        Ok(())
537    }
538
539    fn fail_operation(&self, id: &OperationId, error: String) -> Result<(), OpsLifecycleError> {
540        let mut state = self.write_state()?;
541
542        let transition = state.authority.apply(OpsLifecycleInput::FailOperation {
543            operation_id: id.clone(),
544        })?;
545
546        // Patch the real terminal outcome.
547        state
548            .authority
549            .patch_terminal_outcome(id, OperationTerminalOutcome::Failed { error });
550
551        state.execute_effects(&transition.effects);
552        Ok(())
553    }
554
555    fn abort_provisioning(
556        &self,
557        id: &OperationId,
558        reason: Option<String>,
559    ) -> Result<(), OpsLifecycleError> {
560        let mut state = self.write_state()?;
561
562        let transition = state
563            .authority
564            .apply(OpsLifecycleInput::AbortProvisioning {
565                operation_id: id.clone(),
566            })?;
567
568        state
569            .authority
570            .patch_terminal_outcome(id, OperationTerminalOutcome::Aborted { reason });
571
572        state.execute_effects(&transition.effects);
573        Ok(())
574    }
575
576    fn cancel_operation(
577        &self,
578        id: &OperationId,
579        reason: Option<String>,
580    ) -> Result<(), OpsLifecycleError> {
581        let mut state = self.write_state()?;
582
583        let transition = state.authority.apply(OpsLifecycleInput::CancelOperation {
584            operation_id: id.clone(),
585        })?;
586
587        // Patch the real terminal outcome.
588        state
589            .authority
590            .patch_terminal_outcome(id, OperationTerminalOutcome::Cancelled { reason });
591
592        state.execute_effects(&transition.effects);
593        Ok(())
594    }
595
596    fn request_retire(&self, id: &OperationId) -> Result<(), OpsLifecycleError> {
597        let mut state = self.write_state()?;
598
599        let transition = state.authority.apply(OpsLifecycleInput::RetireRequested {
600            operation_id: id.clone(),
601        })?;
602
603        state.execute_effects(&transition.effects);
604        Ok(())
605    }
606
607    fn mark_retired(&self, id: &OperationId) -> Result<(), OpsLifecycleError> {
608        let mut state = self.write_state()?;
609
610        let transition = state.authority.apply(OpsLifecycleInput::RetireCompleted {
611            operation_id: id.clone(),
612        })?;
613
614        // Patch the real terminal outcome.
615        state
616            .authority
617            .patch_terminal_outcome(id, OperationTerminalOutcome::Retired);
618
619        state.execute_effects(&transition.effects);
620        Ok(())
621    }
622
623    fn snapshot(&self, id: &OperationId) -> Option<OperationLifecycleSnapshot> {
624        self.read_state().ok().and_then(|state| state.snapshot(id))
625    }
626
627    fn list_operations(&self) -> Vec<OperationLifecycleSnapshot> {
628        let mut snapshots = self
629            .read_state()
630            .map(|state| {
631                state
632                    .authority
633                    .operations()
634                    .filter_map(|(id, _)| state.snapshot(id))
635                    .collect::<Vec<_>>()
636            })
637            .unwrap_or_default();
638        snapshots.sort_by(|left, right| left.display_name.cmp(&right.display_name));
639        snapshots
640    }
641
642    fn terminate_owner(&self, reason: String) -> Result<(), OpsLifecycleError> {
643        let mut state = self.write_state()?;
644
645        let transition = state.authority.apply(OpsLifecycleInput::OwnerTerminated)?;
646
647        // Patch all terminal outcomes with the real reason.
648        // The authority set placeholder empty-string reasons; we patch the real
649        // reason into each newly-terminated operation.
650        for effect in &transition.effects {
651            if let OpsLifecycleEffect::NotifyOpWatcher { operation_id, .. } = effect {
652                state.authority.patch_terminal_outcome(
653                    operation_id,
654                    OperationTerminalOutcome::Terminated {
655                        reason: reason.clone(),
656                    },
657                );
658            }
659        }
660
661        state.execute_effects(&transition.effects);
662        Ok(())
663    }
664
665    fn collect_completed(
666        &self,
667    ) -> Result<Vec<(OperationId, OperationTerminalOutcome)>, OpsLifecycleError> {
668        let mut state = self.write_state()?;
669
670        let collected = state.authority.drain_completed();
671
672        // Remove corresponding shell records.
673        for (id, _) in &collected {
674            state.records.remove(id);
675        }
676
677        Ok(collected)
678    }
679
680    fn wait_all(
681        &self,
682        _run_id: &RunId,
683        ids: &[OperationId],
684    ) -> std::pin::Pin<
685        Box<dyn std::future::Future<Output = Result<WaitAllResult, OpsLifecycleError>> + Send + '_>,
686    > {
687        let wait_request_id = WaitRequestId::new();
688        let owned_ids = ids.to_vec();
689
690        let state = match self.write_state() {
691            Ok(mut state) => {
692                let transition = match state.authority.apply(OpsLifecycleInput::BeginWaitAll {
693                    wait_request_id: wait_request_id.clone(),
694                    operation_ids: owned_ids.clone(),
695                }) {
696                    Ok(transition) => transition,
697                    Err(err) => {
698                        return Box::pin(WaitAllFuture {
699                            registry: self,
700                            wait_request_id,
701                            operation_ids: owned_ids,
702                            state: WaitAllFutureState::Ready(Some(Err(err))),
703                        });
704                    }
705                };
706
707                let satisfied = transition.effects.iter().find_map(|effect| match effect {
708                    OpsLifecycleEffect::WaitAllSatisfied {
709                        wait_request_id,
710                        operation_ids,
711                    } => Some(WaitAllSatisfied {
712                        wait_request_id: wait_request_id.clone(),
713                        operation_ids: operation_ids.clone(),
714                    }),
715                    _ => None,
716                });
717
718                state.execute_effects(&transition.effects);
719
720                if let Some(satisfied) = satisfied {
721                    WaitAllFutureState::Ready(Some(state.collect_wait_outcomes(&owned_ids).map(
722                        |outcomes| WaitAllResult {
723                            outcomes,
724                            satisfied,
725                        },
726                    )))
727                } else {
728                    if state.pending_wait.is_some() {
729                        return Box::pin(WaitAllFuture {
730                            registry: self,
731                            wait_request_id,
732                            operation_ids: owned_ids,
733                            state: WaitAllFutureState::Ready(Some(Err(
734                                OpsLifecycleError::Internal(
735                                    "wait_all started while a pending wait sender already existed"
736                                        .into(),
737                                ),
738                            ))),
739                        });
740                    }
741                    let (sender, receiver) = tokio::sync::oneshot::channel();
742                    state.pending_wait = Some(PendingWaitState {
743                        wait_request_id: wait_request_id.clone(),
744                        sender,
745                    });
746                    WaitAllFutureState::Waiting(receiver)
747                }
748            }
749            Err(err) => WaitAllFutureState::Ready(Some(Err(err))),
750        };
751
752        Box::pin(WaitAllFuture {
753            registry: self,
754            wait_request_id,
755            operation_ids: owned_ids,
756            state,
757        })
758    }
759}
760
761#[cfg(test)]
762#[allow(clippy::unwrap_used, clippy::panic)]
763mod tests {
764    use super::*;
765    use meerkat_core::comms::TrustedPeerSpec;
766    use meerkat_core::lifecycle::RunId;
767    use meerkat_core::ops_lifecycle::{OperationKind, OpsLifecycleRegistry};
768    use meerkat_core::types::SessionId;
769    use uuid::Uuid;
770
771    fn test_run_id() -> RunId {
772        RunId(Uuid::from_u128(1))
773    }
774
775    fn background_spec(name: &str) -> OperationSpec {
776        OperationSpec {
777            id: OperationId::new(),
778            kind: OperationKind::BackgroundToolOp,
779            owner_session_id: SessionId::new(),
780            display_name: name.into(),
781            source_label: "test".into(),
782            child_session_id: None,
783            expect_peer_channel: false,
784        }
785    }
786
787    #[tokio::test]
788    async fn late_watchers_resolve_immediately() {
789        let registry = RuntimeOpsLifecycleRegistry::new();
790        let spec = background_spec("late");
791        let op_id = spec.id.clone();
792        registry.register_operation(spec).unwrap();
793        registry.provisioning_succeeded(&op_id).unwrap();
794        registry
795            .complete_operation(
796                &op_id,
797                OperationResult {
798                    id: op_id.clone(),
799                    content: "done".into(),
800                    is_error: false,
801                    duration_ms: 1,
802                    tokens_used: 0,
803                },
804            )
805            .unwrap();
806
807        let watch = registry.register_watcher(&op_id).unwrap();
808        match watch.wait().await {
809            OperationTerminalOutcome::Completed(result) => assert_eq!(result.content, "done"),
810            other => panic!("expected completed outcome, got {other:?}"),
811        }
812    }
813
814    #[test]
815    fn peer_ready_requires_peer_expectation() {
816        let registry = RuntimeOpsLifecycleRegistry::new();
817        let spec = background_spec("no-peer");
818        let op_id = spec.id.clone();
819        registry.register_operation(spec).unwrap();
820        registry.provisioning_succeeded(&op_id).unwrap();
821
822        let result = registry.peer_ready(
823            &op_id,
824            OperationPeerHandle {
825                peer_name: "peer".into(),
826                trusted_peer: TrustedPeerSpec::new("peer", "peer-id", "inproc://peer").unwrap(),
827            },
828        );
829        assert!(matches!(result, Err(OpsLifecycleError::PeerNotExpected(_))));
830    }
831
832    #[tokio::test]
833    async fn multi_listener_completion() {
834        let registry = RuntimeOpsLifecycleRegistry::new();
835        let spec = background_spec("multi");
836        let op_id = spec.id.clone();
837        registry.register_operation(spec).unwrap();
838        registry.provisioning_succeeded(&op_id).unwrap();
839
840        let watch1 = registry.register_watcher(&op_id).unwrap();
841        let watch2 = registry.register_watcher(&op_id).unwrap();
842        let watch3 = registry.register_watcher(&op_id).unwrap();
843
844        registry
845            .complete_operation(
846                &op_id,
847                OperationResult {
848                    id: op_id.clone(),
849                    content: "multi-done".into(),
850                    is_error: false,
851                    duration_ms: 1,
852                    tokens_used: 0,
853                },
854            )
855            .unwrap();
856
857        for watch in [watch1, watch2, watch3] {
858            match watch.wait().await {
859                OperationTerminalOutcome::Completed(result) => {
860                    assert_eq!(result.content, "multi-done");
861                }
862                other => panic!("expected completed, got {other:?}"),
863            }
864        }
865    }
866
867    #[tokio::test]
868    async fn wait_all_returns_all_outcomes() {
869        let registry = RuntimeOpsLifecycleRegistry::new();
870
871        let spec_a = background_spec("a");
872        let id_a = spec_a.id.clone();
873        registry.register_operation(spec_a).unwrap();
874        registry.provisioning_succeeded(&id_a).unwrap();
875
876        let spec_b = background_spec("b");
877        let id_b = spec_b.id.clone();
878        registry.register_operation(spec_b).unwrap();
879        registry.provisioning_succeeded(&id_b).unwrap();
880
881        registry
882            .complete_operation(
883                &id_a,
884                OperationResult {
885                    id: id_a.clone(),
886                    content: "a-done".into(),
887                    is_error: false,
888                    duration_ms: 1,
889                    tokens_used: 0,
890                },
891            )
892            .unwrap();
893        registry.fail_operation(&id_b, "b-error".into()).unwrap();
894
895        let wait_result = registry
896            .wait_all(&test_run_id(), &[id_a.clone(), id_b.clone()])
897            .await
898            .unwrap();
899        assert_eq!(wait_result.outcomes.len(), 2);
900        assert_eq!(wait_result.outcomes[0].0, id_a);
901        assert!(matches!(
902            wait_result.outcomes[0].1,
903            OperationTerminalOutcome::Completed(_)
904        ));
905        assert_eq!(wait_result.outcomes[1].0, id_b);
906        assert!(matches!(
907            wait_result.outcomes[1].1,
908            OperationTerminalOutcome::Failed { .. }
909        ));
910        // Authority-derived obligation carries the awaited IDs
911        assert_eq!(wait_result.satisfied.operation_ids.len(), 2);
912        assert_ne!(wait_result.satisfied.wait_request_id.to_string(), "");
913    }
914
915    /// Exercises the trait `wait_all` path (via `dyn OpsLifecycleRegistry`)
916    /// which must submit WaitAll through the authority for cross-machine handoff.
917    #[tokio::test]
918    async fn wait_all_trait_path_submits_through_authority() {
919        let registry = RuntimeOpsLifecycleRegistry::new();
920        let spec = background_spec("trait-wait");
921        let op_id = spec.id.clone();
922        registry.register_operation(spec).unwrap();
923        registry.provisioning_succeeded(&op_id).unwrap();
924        registry
925            .complete_operation(
926                &op_id,
927                OperationResult {
928                    id: op_id.clone(),
929                    content: "done".into(),
930                    is_error: false,
931                    duration_ms: 1,
932                    tokens_used: 0,
933                },
934            )
935            .unwrap();
936
937        // Call through trait object to exercise the trait impl, not the inherent method.
938        let trait_ref: &dyn OpsLifecycleRegistry = &registry;
939        let wait_result = trait_ref
940            .wait_all(&test_run_id(), std::slice::from_ref(&op_id))
941            .await
942            .unwrap();
943        assert_eq!(wait_result.outcomes.len(), 1);
944        assert!(matches!(
945            wait_result.outcomes[0].1,
946            OperationTerminalOutcome::Completed(_)
947        ));
948        // Obligation carries the validated ID
949        assert_eq!(wait_result.satisfied.operation_ids, vec![op_id]);
950        assert_ne!(wait_result.satisfied.wait_request_id.to_string(), "");
951    }
952
953    #[tokio::test]
954    async fn wait_all_resolves_from_authority_owned_wait_request() {
955        let registry = RuntimeOpsLifecycleRegistry::new();
956        let run_id = test_run_id();
957
958        let spec = background_spec("pending");
959        let op_id = spec.id.clone();
960        registry.register_operation(spec).unwrap();
961        registry.provisioning_succeeded(&op_id).unwrap();
962
963        let wait_fut = registry.wait_all(&run_id, std::slice::from_ref(&op_id));
964        tokio::pin!(wait_fut);
965        assert!(
966            tokio::time::timeout(std::time::Duration::from_millis(10), &mut wait_fut)
967                .await
968                .is_err()
969        );
970
971        let active_wait_request_id = {
972            let state = registry.read_state().unwrap();
973            let wait_request_id = match state.authority.wait_request_id().cloned() {
974                Some(wait_request_id) => wait_request_id,
975                None => panic!("wait request should be active"),
976            };
977            assert_eq!(
978                state.authority.wait_operation_ids(),
979                std::slice::from_ref(&op_id)
980            );
981            wait_request_id
982        };
983
984        registry
985            .complete_operation(
986                &op_id,
987                OperationResult {
988                    id: op_id.clone(),
989                    content: "done".into(),
990                    is_error: false,
991                    duration_ms: 1,
992                    tokens_used: 0,
993                },
994            )
995            .unwrap();
996
997        let wait_result = wait_fut.await.unwrap();
998        assert_eq!(
999            wait_result.satisfied.wait_request_id,
1000            active_wait_request_id
1001        );
1002        assert_eq!(wait_result.satisfied.operation_ids, vec![op_id.clone()]);
1003        assert!(matches!(
1004            wait_result.outcomes.as_slice(),
1005            [(returned_id, OperationTerminalOutcome::Completed(_))] if *returned_id == op_id
1006        ));
1007        assert!(
1008            registry
1009                .read_state()
1010                .unwrap()
1011                .authority
1012                .wait_request_id()
1013                .is_none()
1014        );
1015    }
1016
1017    #[tokio::test]
1018    async fn dropping_wait_all_future_cancels_active_wait_request() {
1019        let registry = RuntimeOpsLifecycleRegistry::new();
1020        let run_id = test_run_id();
1021
1022        let spec = background_spec("cancelled-wait");
1023        let op_id = spec.id.clone();
1024        registry.register_operation(spec).unwrap();
1025        registry.provisioning_succeeded(&op_id).unwrap();
1026
1027        let wait_fut = registry.wait_all(&run_id, std::slice::from_ref(&op_id));
1028        drop(wait_fut);
1029
1030        let state = registry.read_state().unwrap();
1031        assert!(state.authority.wait_request_id().is_none());
1032        assert!(state.authority.wait_operation_ids().is_empty());
1033    }
1034
1035    #[test]
1036    fn collect_completed_drains_terminal_operations() {
1037        let registry = RuntimeOpsLifecycleRegistry::new();
1038
1039        let spec_a = background_spec("a");
1040        let id_a = spec_a.id.clone();
1041        registry.register_operation(spec_a).unwrap();
1042        registry.provisioning_succeeded(&id_a).unwrap();
1043        registry
1044            .complete_operation(
1045                &id_a,
1046                OperationResult {
1047                    id: id_a.clone(),
1048                    content: "done".into(),
1049                    is_error: false,
1050                    duration_ms: 1,
1051                    tokens_used: 0,
1052                },
1053            )
1054            .unwrap();
1055
1056        let spec_b = background_spec("b");
1057        let id_b = spec_b.id.clone();
1058        registry.register_operation(spec_b).unwrap();
1059
1060        let collected = registry.collect_completed().unwrap();
1061        assert_eq!(collected.len(), 1);
1062        assert_eq!(collected[0].0, id_a);
1063
1064        assert!(registry.snapshot(&id_a).is_none());
1065        assert!(registry.snapshot(&id_b).is_some());
1066
1067        let collected2 = registry.collect_completed().unwrap();
1068        assert!(collected2.is_empty());
1069    }
1070
1071    #[test]
1072    fn bounded_completed_retention_evicts_oldest() {
1073        let registry = RuntimeOpsLifecycleRegistry::with_config(OpsLifecycleConfig {
1074            max_completed: 3,
1075            max_concurrent: None,
1076        });
1077
1078        let mut ids = Vec::new();
1079        for i in 0..5 {
1080            let spec = background_spec(&format!("op-{i}"));
1081            let id = spec.id.clone();
1082            registry.register_operation(spec).unwrap();
1083            registry.provisioning_succeeded(&id).unwrap();
1084            registry
1085                .complete_operation(
1086                    &id,
1087                    OperationResult {
1088                        id: id.clone(),
1089                        content: format!("done-{i}"),
1090                        is_error: false,
1091                        duration_ms: 1,
1092                        tokens_used: 0,
1093                    },
1094                )
1095                .unwrap();
1096            ids.push(id);
1097        }
1098
1099        assert!(registry.snapshot(&ids[0]).is_none());
1100        assert!(registry.snapshot(&ids[1]).is_none());
1101        assert!(registry.snapshot(&ids[2]).is_some());
1102        assert!(registry.snapshot(&ids[3]).is_some());
1103        assert!(registry.snapshot(&ids[4]).is_some());
1104    }
1105
1106    #[test]
1107    fn max_concurrent_enforcement() {
1108        let registry = RuntimeOpsLifecycleRegistry::with_config(OpsLifecycleConfig {
1109            max_completed: DEFAULT_MAX_COMPLETED,
1110            max_concurrent: Some(2),
1111        });
1112
1113        let spec_a = background_spec("a");
1114        let id_a = spec_a.id.clone();
1115        registry.register_operation(spec_a).unwrap();
1116
1117        let spec_b = background_spec("b");
1118        registry.register_operation(spec_b).unwrap();
1119
1120        let spec_c = background_spec("c");
1121        let result = registry.register_operation(spec_c);
1122        assert!(matches!(
1123            result,
1124            Err(OpsLifecycleError::MaxConcurrentExceeded {
1125                limit: 2,
1126                active: 2,
1127            })
1128        ));
1129
1130        registry.provisioning_succeeded(&id_a).unwrap();
1131        registry
1132            .complete_operation(
1133                &id_a,
1134                OperationResult {
1135                    id: id_a.clone(),
1136                    content: "done".into(),
1137                    is_error: false,
1138                    duration_ms: 1,
1139                    tokens_used: 0,
1140                },
1141            )
1142            .unwrap();
1143
1144        let spec_d = background_spec("d");
1145        assert!(registry.register_operation(spec_d).is_ok());
1146    }
1147
1148    #[test]
1149    fn snapshot_includes_timestamps() {
1150        let registry = RuntimeOpsLifecycleRegistry::new();
1151        let spec = background_spec("timed");
1152        let op_id = spec.id.clone();
1153        registry.register_operation(spec).unwrap();
1154
1155        let snap1 = registry.snapshot(&op_id).unwrap();
1156        assert!(snap1.created_at_ms > 0);
1157        assert!(snap1.started_at_ms.is_none());
1158        assert!(snap1.completed_at_ms.is_none());
1159        assert!(snap1.elapsed_ms.is_none());
1160
1161        registry.provisioning_succeeded(&op_id).unwrap();
1162        let snap2 = registry.snapshot(&op_id).unwrap();
1163        assert!(snap2.started_at_ms.is_some());
1164        assert!(snap2.started_at_ms.unwrap() >= snap2.created_at_ms);
1165
1166        registry
1167            .complete_operation(
1168                &op_id,
1169                OperationResult {
1170                    id: op_id.clone(),
1171                    content: "done".into(),
1172                    is_error: false,
1173                    duration_ms: 1,
1174                    tokens_used: 0,
1175                },
1176            )
1177            .unwrap();
1178        let snap3 = registry.snapshot(&op_id).unwrap();
1179        assert!(snap3.completed_at_ms.is_some());
1180        assert!(snap3.elapsed_ms.is_some());
1181        assert!(snap3.completed_at_ms.unwrap() >= snap3.started_at_ms.unwrap());
1182    }
1183
1184    #[test]
1185    fn snapshot_includes_peer_handle() {
1186        let registry = RuntimeOpsLifecycleRegistry::new();
1187        let spec = OperationSpec {
1188            id: OperationId::new(),
1189            kind: OperationKind::MobMemberChild,
1190            owner_session_id: SessionId::new(),
1191            display_name: "peer-test".into(),
1192            source_label: "test".into(),
1193            child_session_id: Some(SessionId::new()),
1194            expect_peer_channel: true,
1195        };
1196        let op_id = spec.id.clone();
1197        registry.register_operation(spec).unwrap();
1198        registry.provisioning_succeeded(&op_id).unwrap();
1199
1200        let snap1 = registry.snapshot(&op_id).unwrap();
1201        assert!(snap1.peer_handle.is_none());
1202
1203        let handle = OperationPeerHandle {
1204            peer_name: "member-x".into(),
1205            trusted_peer: TrustedPeerSpec::new("member-x", "peer-id", "inproc://x").unwrap(),
1206        };
1207        registry.peer_ready(&op_id, handle).unwrap();
1208
1209        let snap2 = registry.snapshot(&op_id).unwrap();
1210        assert_eq!(snap2.peer_handle.as_ref().unwrap().peer_name, "member-x");
1211    }
1212}