1use std::collections::{HashMap, VecDeque};
9use std::future::Future;
10use std::sync::atomic::{AtomicU64, Ordering};
11use std::sync::{Arc, RwLock, RwLockReadGuard, RwLockWriteGuard};
12use std::task::{Context, Poll};
13
14use meerkat_core::completion_feed::{
15 CompletionBatch, CompletionEntry, CompletionFeed, CompletionSeq,
16};
17
18#[cfg(target_arch = "wasm32")]
19use crate::tokio;
20use meerkat_core::lifecycle::{RunId, WaitRequestId};
21use meerkat_core::ops_lifecycle::{
22 DEFAULT_MAX_COMPLETED, OperationCompletionWatch, OperationId, OperationKind,
23 OperationLifecycleSnapshot, OperationPeerHandle, OperationProgressUpdate, OperationResult,
24 OperationSpec, OperationTerminalOutcome, OpsLifecycleError, OpsLifecycleRegistry,
25 WaitAllResult, WaitAllSatisfied,
26};
27use meerkat_core::time_compat::{Instant, SystemTime, UNIX_EPOCH};
28
29use crate::ops_lifecycle_authority::{
30 OpsLifecycleAuthority, OpsLifecycleEffect, OpsLifecycleInput, OpsLifecycleMutator,
31};
32
/// Serializable snapshot of the registry's state.
///
/// Produced by `ShellState::maybe_persist` and
/// `RuntimeOpsLifecycleRegistry::capture_persistence_snapshot`, and consumed
/// by `RuntimeOpsLifecycleRegistry::from_recovered`.
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub struct PersistedOpsSnapshot {
    /// Runtime epoch the snapshot was captured under.
    pub epoch_id: meerkat_core::RuntimeEpochId,
    /// Clone of the lifecycle authority's canonical state.
    pub authority_state: crate::ops_lifecycle_authority::RegistryCanonicalState,
    /// Spec of every operation that had a shell record at capture time.
    pub operation_specs: HashMap<OperationId, meerkat_core::ops_lifecycle::OperationSpec>,
    /// Completion entries retained in the feed buffer at capture time.
    pub completion_entries: Vec<CompletionEntry>,
    /// Snapshot of the epoch cursor state taken at capture time.
    pub cursors: meerkat_core::EpochCursorSnapshot,
}
55
/// Lock-protected contents of a [`FeedBuffer`].
#[derive(Debug)]
struct FeedBufferInner {
    /// Retained completion entries; new entries are pushed at the back and
    /// the oldest are popped from the front.
    entries: VecDeque<CompletionEntry>,
    /// Sequence number of the most recently pushed entry (0 before any push).
    watermark: CompletionSeq,
    /// Upper bound on `entries.len()` enforced after every push.
    max_retained: usize,
}
70
/// Bounded buffer of completion entries shared between the registry (writer)
/// and [`RuntimeCompletionFeed`] handles (readers).
#[derive(Debug)]
struct FeedBuffer {
    /// Entries and watermark, guarded by a read/write lock.
    inner: RwLock<FeedBufferInner>,
    /// Copy of the watermark that can be read without taking `inner`;
    /// updated by `push` after the lock is released.
    watermark_atomic: AtomicU64,
    /// Signalled with `notify_waiters` after every push.
    notify: tokio::sync::Notify,
}
83
84impl FeedBuffer {
85 fn new(max_retained: usize) -> Self {
86 Self {
87 inner: RwLock::new(FeedBufferInner {
88 entries: VecDeque::new(),
89 watermark: 0,
90 max_retained,
91 }),
92 watermark_atomic: AtomicU64::new(0),
93 notify: tokio::sync::Notify::new(),
94 }
95 }
96
97 fn push(&self, entry: CompletionEntry) {
98 let mut inner = self
99 .inner
100 .write()
101 .unwrap_or_else(std::sync::PoisonError::into_inner);
102 let seq = entry.seq;
103 inner.entries.push_back(entry);
104 inner.watermark = seq;
105
106 while inner.entries.len() > inner.max_retained {
108 inner.entries.pop_front();
109 }
110
111 drop(inner);
112
113 self.watermark_atomic.store(seq, Ordering::Release);
114 self.notify.notify_waiters();
115 }
116}
117
/// [`CompletionFeed`] implementation backed by the registry's shared
/// [`FeedBuffer`]. Cloning the handle clones the `Arc`, not the buffer.
#[derive(Debug, Clone)]
pub struct RuntimeCompletionFeed {
    /// Buffer shared with the registry that pushes into it.
    buffer: Arc<FeedBuffer>,
}
126
127impl CompletionFeed for RuntimeCompletionFeed {
128 fn watermark(&self) -> CompletionSeq {
129 self.buffer.watermark_atomic.load(Ordering::Acquire)
130 }
131
132 fn list_since(&self, after_seq: CompletionSeq) -> CompletionBatch {
133 let inner = self
134 .buffer
135 .inner
136 .read()
137 .unwrap_or_else(std::sync::PoisonError::into_inner);
138 let entries: Vec<CompletionEntry> = inner
139 .entries
140 .iter()
141 .filter(|e| e.seq > after_seq)
142 .cloned()
143 .collect();
144 let watermark = inner.watermark;
145 CompletionBatch { entries, watermark }
146 }
147
148 fn wait_for_advance(
149 &self,
150 after_seq: CompletionSeq,
151 ) -> std::pin::Pin<Box<dyn Future<Output = CompletionSeq> + Send + '_>> {
152 Box::pin(async move {
153 loop {
154 let notified = self.buffer.notify.notified();
159 let current = self.buffer.watermark_atomic.load(Ordering::Acquire);
160 if current > after_seq {
161 return current;
162 }
163 notified.await;
164 }
165 })
166 }
167}
168
/// Shell-side bookkeeping kept per operation alongside the authority's
/// canonical record.
#[derive(Debug)]
struct ShellRecord {
    /// Spec the operation was registered with.
    spec: OperationSpec,
    /// Peer handle stored by `peer_ready`; `None` until then.
    peer_handle: Option<OperationPeerHandle>,
    /// One-shot senders resolved with the terminal outcome.
    watchers: Vec<tokio::sync::oneshot::Sender<OperationTerminalOutcome>>,
    /// Monotonic instant at which this record was created.
    created_at: Instant,
    /// Set when provisioning succeeds.
    started_at: Option<Instant>,
    /// Set by `mark_completed`.
    completed_at: Option<Instant>,
    /// Wall-clock time captured together with `created_at`; used as the
    /// anchor for converting instants to epoch milliseconds.
    created_at_wall: SystemTime,
}
188
/// Sender half of the wait-all request currently awaiting satisfaction.
#[derive(Debug)]
struct PendingWaitState {
    /// Id of the wait request this sender belongs to.
    wait_request_id: WaitRequestId,
    /// Resolved when a matching `WaitAllSatisfied` effect is executed.
    sender: tokio::sync::oneshot::Sender<WaitAllSatisfied>,
}
194
195impl ShellRecord {
196 fn new(spec: OperationSpec) -> Self {
197 Self {
198 spec,
199 peer_handle: None,
200 watchers: Vec::new(),
201 created_at: Instant::now(),
202 started_at: None,
203 completed_at: None,
204 created_at_wall: SystemTime::now(),
205 }
206 }
207
208 fn epoch_millis(wall_anchor: &SystemTime) -> u64 {
209 wall_anchor
210 .duration_since(UNIX_EPOCH)
211 .map(|d| d.as_millis() as u64)
212 .unwrap_or(0)
213 }
214
215 fn epoch_millis_for_instant(&self, instant: Instant) -> u64 {
216 let offset = instant.saturating_duration_since(self.created_at);
219 let wall = self.created_at_wall + offset;
220 Self::epoch_millis(&wall)
221 }
222
223 fn notify_watchers(&mut self, outcome: &OperationTerminalOutcome) {
225 for watcher in std::mem::take(&mut self.watchers) {
226 let _ = watcher.send(outcome.clone());
227 }
228 }
229
230 fn mark_completed(&mut self) {
232 self.completed_at = Some(Instant::now());
233 }
234}
235
/// Everything the registry keeps behind its lock: the lifecycle authority
/// plus the shell-side resources (channels, timestamps, persistence wiring).
#[derive(Debug)]
struct ShellState {
    /// State machine that validates inputs and emits effects.
    authority: OpsLifecycleAuthority,
    /// Shell bookkeeping per operation.
    records: HashMap<OperationId, ShellRecord>,
    /// Sender for the wait-all request currently in flight, if any.
    pending_wait: Option<PendingWaitState>,
    /// When set, flagged and notified each time a `BackgroundToolOp`'s
    /// watchers are notified.
    detached_wake: Option<Arc<crate::detached_wake::DetachedWakeState>>,
    /// Completion feed buffer shared with `RuntimeCompletionFeed` handles.
    feed_buffer: Arc<FeedBuffer>,
    /// Channel snapshots are sent on by `maybe_persist`.
    persist_tx: Option<crate::tokio::sync::mpsc::Sender<PersistedOpsSnapshot>>,
    /// Epoch id stamped onto persisted snapshots.
    persist_epoch_id: Option<meerkat_core::RuntimeEpochId>,
    /// Cursor state snapshotted into persisted snapshots.
    persist_cursor_state: Option<Arc<meerkat_core::EpochCursorState>>,
}
258
259impl ShellState {
260 fn new(max_completed: usize, max_concurrent: Option<usize>) -> Self {
261 Self {
262 authority: OpsLifecycleAuthority::new(max_completed, max_concurrent),
263 records: HashMap::new(),
264 pending_wait: None,
265 detached_wake: None,
266 feed_buffer: Arc::new(FeedBuffer::new(max_completed.saturating_mul(4).max(1024))),
271 persist_tx: None,
272 persist_epoch_id: None,
273 persist_cursor_state: None,
274 }
275 }
276
277 fn snapshot(&self, id: &OperationId) -> Option<OperationLifecycleSnapshot> {
279 let canonical = self.authority.operation(id)?;
280 let shell = self.records.get(id)?;
281
282 let created_at_ms = ShellRecord::epoch_millis(&shell.created_at_wall);
283 let started_at_ms = shell.started_at.map(|i| shell.epoch_millis_for_instant(i));
284 let completed_at_ms = shell
285 .completed_at
286 .map(|i| shell.epoch_millis_for_instant(i));
287 let elapsed_ms = shell.completed_at.map(|completed| {
288 completed
289 .saturating_duration_since(shell.created_at)
290 .as_millis() as u64
291 });
292
293 Some(OperationLifecycleSnapshot {
294 id: shell.spec.id.clone(),
295 kind: canonical.kind(),
296 display_name: shell.spec.display_name.clone(),
297 status: canonical.status(),
298 peer_ready: canonical.peer_ready(),
299 progress_count: canonical.progress_count(),
300 watcher_count: shell.watchers.len() as u32,
301 terminal_outcome: canonical.terminal_outcome().cloned(),
302 child_session_id: shell.spec.child_session_id.clone(),
303 peer_handle: shell.peer_handle.clone(),
304 created_at_ms,
305 started_at_ms,
306 completed_at_ms,
307 elapsed_ms,
308 })
309 }
310
    /// Carries out, in order, the shell-side work for each effect emitted by
    /// an authority transition.
    fn execute_effects(&mut self, effects: &[OpsLifecycleEffect]) {
        for effect in effects {
            match effect {
                OpsLifecycleEffect::NotifyOpWatcher { operation_id, .. } => {
                    // The outcome is read back from the authority, so callers
                    // must have patched it in before executing effects.
                    let outcome = self
                        .authority
                        .operation(operation_id)
                        .and_then(|op| op.terminal_outcome().cloned());
                    if let Some(outcome) = outcome
                        && let Some(shell) = self.records.get_mut(operation_id)
                    {
                        // Capture the count first: notify_watchers empties the list.
                        let watcher_count = shell.watchers.len() as u32;
                        shell.notify_watchers(&outcome);
                        shell.mark_completed();
                        self.authority.watchers_drained(operation_id, watcher_count);
                    }
                    // Flag and wake the detached-wake listener, but only for
                    // background tool operations.
                    if let Some(ref wake) = self.detached_wake
                        && self
                            .authority
                            .operation(operation_id)
                            .is_some_and(|op| op.kind() == OperationKind::BackgroundToolOp)
                    {
                        wake.pending.store(true, Ordering::Release);
                        wake.notify.notify_one();
                    }
                }
                OpsLifecycleEffect::ExposeOperationPeer { .. } => {
                    // No shell-side work is performed for this effect here.
                }
                OpsLifecycleEffect::RetainTerminalRecord { .. } => {
                    // No shell-side work is performed for this effect here.
                }
                OpsLifecycleEffect::EvictCompletedRecord { operation_id } => {
                    // Drop both the shell record and the authority's record.
                    self.records.remove(operation_id);
                    self.authority.remove_operation(operation_id);
                }
                OpsLifecycleEffect::CompletionProduced {
                    seq,
                    operation_id,
                    kind,
                } => {
                    // Gather the entry's fields; fall back to placeholder
                    // `Terminated` outcomes when the authority has no outcome
                    // or no record for the operation.
                    let (display_name, terminal_outcome, completed_at_ms) =
                        if let Some(canonical) = self.authority.operation(operation_id) {
                            let outcome = canonical.terminal_outcome().cloned().unwrap_or(
                                OperationTerminalOutcome::Terminated {
                                    reason: "missing outcome".into(),
                                },
                            );
                            // `None` when the shell record is absent or has not
                            // been marked completed yet.
                            let completed_ms = self.records.get(operation_id).and_then(|r| {
                                r.completed_at.map(|i| r.epoch_millis_for_instant(i))
                            });
                            let name = self
                                .records
                                .get(operation_id)
                                .map(|r| r.spec.display_name.clone())
                                .unwrap_or_default();
                            (name, outcome, completed_ms)
                        } else {
                            (
                                String::new(),
                                OperationTerminalOutcome::Terminated {
                                    reason: "unknown operation".into(),
                                },
                                None,
                            )
                        };

                    // Publish to the completion feed (also wakes feed waiters).
                    self.feed_buffer.push(CompletionEntry {
                        seq: *seq,
                        operation_id: operation_id.clone(),
                        kind: *kind,
                        display_name,
                        terminal_outcome,
                        completed_at_ms,
                    });
                }
                OpsLifecycleEffect::SubmitOpEvent { .. } => {
                    // No shell-side work is performed for this effect here.
                }
                OpsLifecycleEffect::WaitAllSatisfied {
                    wait_request_id,
                    operation_ids,
                } => {
                    // Resolve the pending wait only if it belongs to this
                    // request; otherwise put it back untouched.
                    if let Some(pending_wait) = self.pending_wait.take() {
                        if pending_wait.wait_request_id == *wait_request_id {
                            let _ = pending_wait.sender.send(WaitAllSatisfied {
                                wait_request_id: wait_request_id.clone(),
                                operation_ids: operation_ids.clone(),
                            });
                        } else {
                            self.pending_wait = Some(pending_wait);
                        }
                    }
                }
            }
        }
    }
420
421 fn maybe_persist(&self) {
426 let (tx, epoch_id, cursor_state) = match (
427 &self.persist_tx,
428 &self.persist_epoch_id,
429 &self.persist_cursor_state,
430 ) {
431 (Some(tx), Some(epoch_id), Some(cs)) => (tx, epoch_id, cs),
432 _ => return,
433 };
434
435 let operation_specs: HashMap<OperationId, meerkat_core::ops_lifecycle::OperationSpec> =
436 self.records
437 .iter()
438 .map(|(id, record)| (id.clone(), record.spec.clone()))
439 .collect();
440
441 let completion_entries = {
442 let inner = self
443 .feed_buffer
444 .inner
445 .read()
446 .unwrap_or_else(std::sync::PoisonError::into_inner);
447 inner.entries.iter().cloned().collect()
448 };
449
450 let snapshot = PersistedOpsSnapshot {
451 epoch_id: epoch_id.clone(),
452 authority_state: self.authority.canonical_state().clone(),
453 operation_specs,
454 completion_entries,
455 cursors: cursor_state.snapshot(),
456 };
457
458 if tx.try_send(snapshot).is_err() {
460 tracing::warn!("ops lifecycle persistence channel full or closed; snapshot dropped");
461 }
462 }
463
464 fn shell_record_mut(
465 &mut self,
466 id: &OperationId,
467 ) -> Result<&mut ShellRecord, OpsLifecycleError> {
468 self.records
469 .get_mut(id)
470 .ok_or_else(|| OpsLifecycleError::NotFound(id.clone()))
471 }
472
473 fn collect_wait_outcomes(
474 &self,
475 operation_ids: &[OperationId],
476 ) -> Result<Vec<(OperationId, OperationTerminalOutcome)>, OpsLifecycleError> {
477 operation_ids
478 .iter()
479 .map(|operation_id| {
480 let outcome = self
481 .authority
482 .operation(operation_id)
483 .and_then(|op| op.terminal_outcome().cloned())
484 .ok_or_else(|| {
485 OpsLifecycleError::Internal(format!(
486 "wait_all completed without terminal outcome for {operation_id}"
487 ))
488 })?;
489 Ok((operation_id.clone(), outcome))
490 })
491 .collect()
492 }
493}
494
495impl Default for ShellState {
496 fn default() -> Self {
497 Self::new(DEFAULT_MAX_COMPLETED, None)
498 }
499}
500
/// Construction parameters for [`RuntimeOpsLifecycleRegistry::with_config`].
#[derive(Debug, Clone)]
pub struct OpsLifecycleConfig {
    /// Forwarded to the lifecycle authority; also sizes the completion feed
    /// buffer (four times this value, minimum 1024).
    pub max_completed: usize,
    /// Forwarded to the lifecycle authority.
    // NOTE(review): presumably `None` means "no concurrency limit" — confirm
    // against `OpsLifecycleAuthority::new`.
    pub max_concurrent: Option<usize>,
}
513
514impl Default for OpsLifecycleConfig {
515 fn default() -> Self {
516 Self {
517 max_completed: DEFAULT_MAX_COMPLETED,
518 max_concurrent: None,
519 }
520 }
521}
522
/// Runtime implementation of [`OpsLifecycleRegistry`]: a lock around the
/// lifecycle authority and its shell-side resources.
#[derive(Debug)]
pub struct RuntimeOpsLifecycleRegistry {
    /// All registry state; every trait method takes this lock.
    state: RwLock<ShellState>,
}
532
533impl Default for RuntimeOpsLifecycleRegistry {
534 fn default() -> Self {
535 Self {
536 state: RwLock::new(ShellState::default()),
537 }
538 }
539}
540
541impl RuntimeOpsLifecycleRegistry {
542 pub fn new() -> Self {
543 Self::default()
544 }
545
546 pub fn with_config(config: OpsLifecycleConfig) -> Self {
547 Self {
548 state: RwLock::new(ShellState::new(config.max_completed, config.max_concurrent)),
549 }
550 }
551
552 pub fn set_persistence_channel(
558 &self,
559 tx: crate::tokio::sync::mpsc::Sender<PersistedOpsSnapshot>,
560 epoch_id: meerkat_core::RuntimeEpochId,
561 cursor_state: Arc<meerkat_core::EpochCursorState>,
562 ) {
563 if let Ok(mut state) = self.state.write() {
564 state.persist_tx = Some(tx);
565 state.persist_epoch_id = Some(epoch_id);
566 state.persist_cursor_state = Some(cursor_state);
567 }
568 }
569
570 pub fn set_detached_wake(&self, wake: Arc<crate::detached_wake::DetachedWakeState>) {
573 if let Ok(mut state) = self.state.write() {
574 state.detached_wake = Some(wake);
575 }
576 }
577
578 pub fn from_recovered(snapshot: PersistedOpsSnapshot) -> Self {
584 let authority = OpsLifecycleAuthority::from_recovered(snapshot.authority_state);
585
586 let max_retained = authority
588 .canonical_state()
589 .max_completed()
590 .max(256)
591 .saturating_mul(4)
592 .max(1024);
593 let feed_buffer = Arc::new(FeedBuffer::new(max_retained));
594 for entry in &snapshot.completion_entries {
595 feed_buffer.push(entry.clone());
596 }
597
598 let mut records = HashMap::new();
600 for (op_id, spec) in &snapshot.operation_specs {
601 if authority.operation(op_id).is_some() {
603 records.insert(
604 op_id.clone(),
605 ShellRecord {
606 spec: spec.clone(),
607 peer_handle: None,
608 watchers: Vec::new(),
609 created_at: Instant::now(),
610 started_at: None,
611 completed_at: None,
612 created_at_wall: SystemTime::now(),
613 },
614 );
615 }
616 }
617
618 let state = ShellState {
619 authority,
620 records,
621 pending_wait: None,
622 detached_wake: None,
623 feed_buffer,
624 persist_tx: None,
625 persist_epoch_id: None,
626 persist_cursor_state: None,
627 };
628
629 Self {
630 state: RwLock::new(state),
631 }
632 }
633
634 pub fn capture_persistence_snapshot(
640 &self,
641 epoch_id: meerkat_core::RuntimeEpochId,
642 cursor_state: &meerkat_core::EpochCursorState,
643 ) -> PersistedOpsSnapshot {
644 let state = self
645 .state
646 .read()
647 .unwrap_or_else(std::sync::PoisonError::into_inner);
648
649 let operation_specs: HashMap<OperationId, meerkat_core::ops_lifecycle::OperationSpec> =
650 state
651 .records
652 .iter()
653 .map(|(id, record)| (id.clone(), record.spec.clone()))
654 .collect();
655
656 let completion_entries = {
657 let inner = state
658 .feed_buffer
659 .inner
660 .read()
661 .unwrap_or_else(std::sync::PoisonError::into_inner);
662 inner.entries.iter().cloned().collect()
663 };
664
665 let cursors = cursor_state.snapshot();
666
667 PersistedOpsSnapshot {
668 epoch_id,
669 authority_state: state.authority.canonical_state().clone(),
670 operation_specs,
671 completion_entries,
672 cursors,
673 }
674 }
675
676 pub fn completion_feed_handle(&self) -> Arc<dyn CompletionFeed> {
678 let state = self
679 .state
680 .read()
681 .unwrap_or_else(std::sync::PoisonError::into_inner);
682 Arc::new(RuntimeCompletionFeed {
683 buffer: Arc::clone(&state.feed_buffer),
684 })
685 }
686
687 fn read_state(&self) -> Result<RwLockReadGuard<'_, ShellState>, OpsLifecycleError> {
688 self.state
689 .read()
690 .map_err(|_| OpsLifecycleError::Internal("ops lifecycle registry poisoned".into()))
691 }
692
693 fn write_state(&self) -> Result<RwLockWriteGuard<'_, ShellState>, OpsLifecycleError> {
694 self.state
695 .write()
696 .map_err(|_| OpsLifecycleError::Internal("ops lifecycle registry poisoned".into()))
697 }
698
699 fn cancel_wait_all_internal(
700 &self,
701 wait_request_id: &WaitRequestId,
702 ) -> Result<(), OpsLifecycleError> {
703 let mut state = self.write_state()?;
704 match state.authority.apply(OpsLifecycleInput::CancelWaitAll {
705 wait_request_id: wait_request_id.clone(),
706 }) {
707 Ok(_) => {
708 state.pending_wait = None;
709 Ok(())
710 }
711 Err(OpsLifecycleError::WaitNotActive(_)) => {
712 state.pending_wait = None;
713 Ok(())
714 }
715 Err(err) => Err(err),
716 }
717 }
718}
719
/// Internal state of a [`WaitAllFuture`].
enum WaitAllFutureState {
    /// The result was known when the future was created; handed out on the
    /// first poll.
    Ready(Option<Result<WaitAllResult, OpsLifecycleError>>),
    /// Waiting for the registry to send `WaitAllSatisfied`.
    Waiting(tokio::sync::oneshot::Receiver<WaitAllSatisfied>),
    /// A result has already been returned; further polls yield an error.
    Done,
}
725
/// Future returned by `wait_all`; cancels its wait request when dropped
/// while still waiting.
struct WaitAllFuture<'a> {
    /// Registry used to collect outcomes and to cancel on drop.
    registry: &'a RuntimeOpsLifecycleRegistry,
    /// Id of the wait request this future represents.
    wait_request_id: WaitRequestId,
    /// Operations being waited on, in the caller's order.
    operation_ids: Vec<OperationId>,
    /// Current state of the future.
    state: WaitAllFutureState,
}
732
impl Future for WaitAllFuture<'_> {
    type Output = Result<WaitAllResult, OpsLifecycleError>;

    /// Drives the wait: returns a precomputed result immediately, or polls
    /// the satisfaction channel and then reads the outcomes from the registry.
    ///
    /// Every path that returns `Poll::Ready` moves the state to `Done`, so a
    /// later poll yields an `Internal` error instead of panicking.
    fn poll(mut self: std::pin::Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        match &mut self.state {
            WaitAllFutureState::Ready(result) => {
                // An already-taken result is reported as an error.
                let ready = result.take().unwrap_or_else(|| {
                    Err(OpsLifecycleError::Internal(
                        "wait_all future polled after completion".into(),
                    ))
                });
                self.state = WaitAllFutureState::Done;
                Poll::Ready(ready)
            }
            WaitAllFutureState::Waiting(receiver) => match std::pin::Pin::new(receiver).poll(cx) {
                Poll::Pending => Poll::Pending,
                Poll::Ready(Ok(satisfied)) => {
                    // Outcomes are read from the registry at poll time, not
                    // carried in the satisfaction message.
                    let outcomes = match self.registry.read_state() {
                        Ok(state) => state.collect_wait_outcomes(&self.operation_ids),
                        Err(err) => Err(err),
                    };
                    self.state = WaitAllFutureState::Done;
                    Poll::Ready(outcomes.map(|outcomes| WaitAllResult {
                        outcomes,
                        satisfied,
                    }))
                }
                Poll::Ready(Err(_)) => {
                    // The sender was dropped without sending.
                    self.state = WaitAllFutureState::Done;
                    Poll::Ready(Err(OpsLifecycleError::Internal(
                        "wait_all completion channel dropped".into(),
                    )))
                }
            },
            WaitAllFutureState::Done => Poll::Ready(Err(OpsLifecycleError::Internal(
                "wait_all future polled after completion".into(),
            ))),
        }
    }
}
773
774impl Drop for WaitAllFuture<'_> {
775 fn drop(&mut self) {
776 if matches!(self.state, WaitAllFutureState::Waiting(_)) {
777 let _ = self
778 .registry
779 .cancel_wait_all_internal(&self.wait_request_id);
780 }
781 }
782}
783
784impl OpsLifecycleRegistry for RuntimeOpsLifecycleRegistry {
785 fn register_operation(&self, spec: OperationSpec) -> Result<(), OpsLifecycleError> {
786 let mut state = self.write_state()?;
787 let operation_id = spec.id.clone();
788 let kind = spec.kind;
789
790 let transition = state
792 .authority
793 .apply(OpsLifecycleInput::RegisterOperation {
794 operation_id: operation_id.clone(),
795 kind,
796 })?;
797
798 state.records.insert(operation_id, ShellRecord::new(spec));
800
801 state.execute_effects(&transition.effects);
803 Ok(())
804 }
805
806 fn provisioning_succeeded(&self, id: &OperationId) -> Result<(), OpsLifecycleError> {
807 let mut state = self.write_state()?;
808
809 let transition = state
810 .authority
811 .apply(OpsLifecycleInput::ProvisioningSucceeded {
812 operation_id: id.clone(),
813 })?;
814
815 if let Some(shell) = state.records.get_mut(id) {
817 shell.started_at = Some(Instant::now());
818 }
819
820 state.execute_effects(&transition.effects);
821 Ok(())
822 }
823
824 fn provisioning_failed(
825 &self,
826 id: &OperationId,
827 error: String,
828 ) -> Result<(), OpsLifecycleError> {
829 let mut state = self.write_state()?;
830
831 let transition = state
832 .authority
833 .apply(OpsLifecycleInput::ProvisioningFailed {
834 operation_id: id.clone(),
835 })?;
836
837 state
839 .authority
840 .patch_terminal_outcome(id, OperationTerminalOutcome::Failed { error });
841
842 state.execute_effects(&transition.effects);
843 Ok(())
844 }
845
846 fn peer_ready(
847 &self,
848 id: &OperationId,
849 peer: OperationPeerHandle,
850 ) -> Result<(), OpsLifecycleError> {
851 let mut state = self.write_state()?;
852
853 let transition = state.authority.apply(OpsLifecycleInput::PeerReady {
854 operation_id: id.clone(),
855 })?;
856
857 if let Some(shell) = state.records.get_mut(id) {
859 shell.peer_handle = Some(peer);
860 }
861
862 state.execute_effects(&transition.effects);
863 Ok(())
864 }
865
866 fn register_watcher(
867 &self,
868 id: &OperationId,
869 ) -> Result<OperationCompletionWatch, OpsLifecycleError> {
870 let mut state = self.write_state()?;
871
872 let canonical = state
874 .authority
875 .operation(id)
876 .ok_or_else(|| OpsLifecycleError::NotFound(id.clone()))?;
877
878 if let Some(outcome) = canonical.terminal_outcome() {
879 return Ok(OperationCompletionWatch::already_resolved(outcome.clone()));
880 }
881
882 let _transition = state.authority.apply(OpsLifecycleInput::RegisterWatcher {
884 operation_id: id.clone(),
885 })?;
886
887 let shell = state.shell_record_mut(id)?;
889 let (tx, watch) = OperationCompletionWatch::channel();
890 shell.watchers.push(tx);
891 Ok(watch)
892 }
893
894 fn report_progress(
895 &self,
896 id: &OperationId,
897 _update: OperationProgressUpdate,
898 ) -> Result<(), OpsLifecycleError> {
899 let mut state = self.write_state()?;
900
901 let transition = state.authority.apply(OpsLifecycleInput::ProgressReported {
902 operation_id: id.clone(),
903 })?;
904
905 state.execute_effects(&transition.effects);
906 Ok(())
907 }
908
909 fn complete_operation(
910 &self,
911 id: &OperationId,
912 result: OperationResult,
913 ) -> Result<(), OpsLifecycleError> {
914 let mut state = self.write_state()?;
915
916 let transition = state
917 .authority
918 .apply(OpsLifecycleInput::CompleteOperation {
919 operation_id: id.clone(),
920 })?;
921
922 state
924 .authority
925 .patch_terminal_outcome(id, OperationTerminalOutcome::Completed(result));
926
927 state.execute_effects(&transition.effects);
928 state.maybe_persist();
929 Ok(())
930 }
931
932 fn fail_operation(&self, id: &OperationId, error: String) -> Result<(), OpsLifecycleError> {
933 let mut state = self.write_state()?;
934
935 let transition = state.authority.apply(OpsLifecycleInput::FailOperation {
936 operation_id: id.clone(),
937 })?;
938
939 state
941 .authority
942 .patch_terminal_outcome(id, OperationTerminalOutcome::Failed { error });
943
944 state.execute_effects(&transition.effects);
945 state.maybe_persist();
946 Ok(())
947 }
948
949 fn abort_provisioning(
950 &self,
951 id: &OperationId,
952 reason: Option<String>,
953 ) -> Result<(), OpsLifecycleError> {
954 let mut state = self.write_state()?;
955
956 let transition = state
957 .authority
958 .apply(OpsLifecycleInput::AbortProvisioning {
959 operation_id: id.clone(),
960 })?;
961
962 state
963 .authority
964 .patch_terminal_outcome(id, OperationTerminalOutcome::Aborted { reason });
965
966 state.execute_effects(&transition.effects);
967 state.maybe_persist();
968 Ok(())
969 }
970
971 fn cancel_operation(
972 &self,
973 id: &OperationId,
974 reason: Option<String>,
975 ) -> Result<(), OpsLifecycleError> {
976 let mut state = self.write_state()?;
977
978 let transition = state.authority.apply(OpsLifecycleInput::CancelOperation {
979 operation_id: id.clone(),
980 })?;
981
982 state
984 .authority
985 .patch_terminal_outcome(id, OperationTerminalOutcome::Cancelled { reason });
986
987 state.execute_effects(&transition.effects);
988 state.maybe_persist();
989 Ok(())
990 }
991
992 fn request_retire(&self, id: &OperationId) -> Result<(), OpsLifecycleError> {
993 let mut state = self.write_state()?;
994
995 let transition = state.authority.apply(OpsLifecycleInput::RetireRequested {
996 operation_id: id.clone(),
997 })?;
998
999 state.execute_effects(&transition.effects);
1000 Ok(())
1001 }
1002
1003 fn mark_retired(&self, id: &OperationId) -> Result<(), OpsLifecycleError> {
1004 let mut state = self.write_state()?;
1005
1006 let transition = state.authority.apply(OpsLifecycleInput::RetireCompleted {
1007 operation_id: id.clone(),
1008 })?;
1009
1010 state
1012 .authority
1013 .patch_terminal_outcome(id, OperationTerminalOutcome::Retired);
1014
1015 state.execute_effects(&transition.effects);
1016 Ok(())
1017 }
1018
1019 fn snapshot(&self, id: &OperationId) -> Option<OperationLifecycleSnapshot> {
1020 self.read_state().ok().and_then(|state| state.snapshot(id))
1021 }
1022
1023 fn list_operations(&self) -> Vec<OperationLifecycleSnapshot> {
1024 let mut snapshots = self
1025 .read_state()
1026 .map(|state| {
1027 state
1028 .authority
1029 .operations()
1030 .filter_map(|(id, _)| state.snapshot(id))
1031 .collect::<Vec<_>>()
1032 })
1033 .unwrap_or_default();
1034 snapshots.sort_by(|left, right| left.display_name.cmp(&right.display_name));
1035 snapshots
1036 }
1037
1038 fn terminate_owner(&self, reason: String) -> Result<(), OpsLifecycleError> {
1039 let mut state = self.write_state()?;
1040
1041 let transition = state.authority.apply(OpsLifecycleInput::OwnerTerminated)?;
1042
1043 for effect in &transition.effects {
1047 if let OpsLifecycleEffect::NotifyOpWatcher { operation_id, .. } = effect {
1048 state.authority.patch_terminal_outcome(
1049 operation_id,
1050 OperationTerminalOutcome::Terminated {
1051 reason: reason.clone(),
1052 },
1053 );
1054 }
1055 }
1056
1057 state.execute_effects(&transition.effects);
1058 Ok(())
1059 }
1060
1061 fn collect_completed(
1062 &self,
1063 ) -> Result<Vec<(OperationId, OperationTerminalOutcome)>, OpsLifecycleError> {
1064 let mut state = self.write_state()?;
1065
1066 let collected = state.authority.drain_completed();
1067
1068 for (id, _) in &collected {
1070 state.records.remove(id);
1071 }
1072
1073 Ok(collected)
1074 }
1075
1076 fn completion_feed(&self) -> Option<Arc<dyn CompletionFeed>> {
1077 Some(self.completion_feed_handle())
1078 }
1079
    /// Starts a wait for all of `ids` to reach a terminal outcome.
    ///
    /// The wait is registered with the authority synchronously, before the
    /// future is returned. The future is immediately ready when the authority
    /// rejects the request, when the lock is poisoned, or when the wait is
    /// already satisfied. Otherwise it waits for the satisfaction message.
    /// `_run_id` is not used by this implementation.
    fn wait_all(
        &self,
        _run_id: &RunId,
        ids: &[OperationId],
    ) -> std::pin::Pin<
        Box<dyn std::future::Future<Output = Result<WaitAllResult, OpsLifecycleError>> + Send + '_>,
    > {
        let wait_request_id = WaitRequestId::new();
        let owned_ids = ids.to_vec();

        let state = match self.write_state() {
            Ok(mut state) => {
                let transition = match state.authority.apply(OpsLifecycleInput::BeginWaitAll {
                    wait_request_id: wait_request_id.clone(),
                    operation_ids: owned_ids.clone(),
                }) {
                    Ok(transition) => transition,
                    Err(err) => {
                        // Rejected by the authority: surface the error on first poll.
                        return Box::pin(WaitAllFuture {
                            registry: self,
                            wait_request_id,
                            operation_ids: owned_ids,
                            state: WaitAllFutureState::Ready(Some(Err(err))),
                        });
                    }
                };

                // Look for an immediate satisfaction among the effects before
                // executing them.
                let satisfied = transition.effects.iter().find_map(|effect| match effect {
                    OpsLifecycleEffect::WaitAllSatisfied {
                        wait_request_id,
                        operation_ids,
                    } => Some(WaitAllSatisfied {
                        wait_request_id: wait_request_id.clone(),
                        operation_ids: operation_ids.clone(),
                    }),
                    _ => None,
                });

                state.execute_effects(&transition.effects);

                if let Some(satisfied) = satisfied {
                    // Already satisfied: collect outcomes now, under the same lock.
                    WaitAllFutureState::Ready(Some(state.collect_wait_outcomes(&owned_ids).map(
                        |outcomes| WaitAllResult {
                            outcomes,
                            satisfied,
                        },
                    )))
                } else {
                    // Only one pending sender is supported; a leftover one is
                    // reported as an internal error.
                    if state.pending_wait.is_some() {
                        return Box::pin(WaitAllFuture {
                            registry: self,
                            wait_request_id,
                            operation_ids: owned_ids,
                            state: WaitAllFutureState::Ready(Some(Err(
                                OpsLifecycleError::Internal(
                                    "wait_all started while a pending wait sender already existed"
                                        .into(),
                                ),
                            ))),
                        });
                    }
                    let (sender, receiver) = tokio::sync::oneshot::channel();
                    state.pending_wait = Some(PendingWaitState {
                        wait_request_id: wait_request_id.clone(),
                        sender,
                    });
                    WaitAllFutureState::Waiting(receiver)
                }
            }
            // Poisoned lock: surface the error on first poll.
            Err(err) => WaitAllFutureState::Ready(Some(Err(err))),
        };

        Box::pin(WaitAllFuture {
            registry: self,
            wait_request_id,
            operation_ids: owned_ids,
            state,
        })
    }
1159}
1160
1161#[cfg(test)]
1162#[allow(clippy::unwrap_used, clippy::panic)]
1163mod tests {
1164 use super::*;
1165 use meerkat_core::comms::TrustedPeerSpec;
1166 use meerkat_core::lifecycle::RunId;
1167 use meerkat_core::ops_lifecycle::{OperationKind, OpsLifecycleRegistry};
1168 use meerkat_core::types::SessionId;
1169 use uuid::Uuid;
1170
1171 fn test_run_id() -> RunId {
1172 RunId(Uuid::from_u128(1))
1173 }
1174
1175 fn background_spec(name: &str) -> OperationSpec {
1176 OperationSpec {
1177 id: OperationId::new(),
1178 kind: OperationKind::BackgroundToolOp,
1179 owner_session_id: SessionId::new(),
1180 display_name: name.into(),
1181 source_label: "test".into(),
1182 child_session_id: None,
1183 expect_peer_channel: false,
1184 }
1185 }
1186
1187 #[tokio::test]
1188 async fn late_watchers_resolve_immediately() {
1189 let registry = RuntimeOpsLifecycleRegistry::new();
1190 let spec = background_spec("late");
1191 let op_id = spec.id.clone();
1192 registry.register_operation(spec).unwrap();
1193 registry.provisioning_succeeded(&op_id).unwrap();
1194 registry
1195 .complete_operation(
1196 &op_id,
1197 OperationResult {
1198 id: op_id.clone(),
1199 content: "done".into(),
1200 is_error: false,
1201 duration_ms: 1,
1202 tokens_used: 0,
1203 },
1204 )
1205 .unwrap();
1206
1207 let watch = registry.register_watcher(&op_id).unwrap();
1208 match watch.wait().await {
1209 OperationTerminalOutcome::Completed(result) => assert_eq!(result.content, "done"),
1210 other => panic!("expected completed outcome, got {other:?}"),
1211 }
1212 }
1213
1214 #[test]
1215 fn peer_ready_requires_peer_expectation() {
1216 let registry = RuntimeOpsLifecycleRegistry::new();
1217 let spec = background_spec("no-peer");
1218 let op_id = spec.id.clone();
1219 registry.register_operation(spec).unwrap();
1220 registry.provisioning_succeeded(&op_id).unwrap();
1221
1222 let result = registry.peer_ready(
1223 &op_id,
1224 OperationPeerHandle {
1225 peer_name: "peer".into(),
1226 trusted_peer: TrustedPeerSpec::new("peer", "peer-id", "inproc://peer").unwrap(),
1227 },
1228 );
1229 assert!(matches!(result, Err(OpsLifecycleError::PeerNotExpected(_))));
1230 }
1231
1232 #[tokio::test]
1233 async fn multi_listener_completion() {
1234 let registry = RuntimeOpsLifecycleRegistry::new();
1235 let spec = background_spec("multi");
1236 let op_id = spec.id.clone();
1237 registry.register_operation(spec).unwrap();
1238 registry.provisioning_succeeded(&op_id).unwrap();
1239
1240 let watch1 = registry.register_watcher(&op_id).unwrap();
1241 let watch2 = registry.register_watcher(&op_id).unwrap();
1242 let watch3 = registry.register_watcher(&op_id).unwrap();
1243
1244 registry
1245 .complete_operation(
1246 &op_id,
1247 OperationResult {
1248 id: op_id.clone(),
1249 content: "multi-done".into(),
1250 is_error: false,
1251 duration_ms: 1,
1252 tokens_used: 0,
1253 },
1254 )
1255 .unwrap();
1256
1257 for watch in [watch1, watch2, watch3] {
1258 match watch.wait().await {
1259 OperationTerminalOutcome::Completed(result) => {
1260 assert_eq!(result.content, "multi-done");
1261 }
1262 other => panic!("expected completed, got {other:?}"),
1263 }
1264 }
1265 }
1266
1267 #[tokio::test]
1268 async fn wait_all_returns_all_outcomes() {
1269 let registry = RuntimeOpsLifecycleRegistry::new();
1270
1271 let spec_a = background_spec("a");
1272 let id_a = spec_a.id.clone();
1273 registry.register_operation(spec_a).unwrap();
1274 registry.provisioning_succeeded(&id_a).unwrap();
1275
1276 let spec_b = background_spec("b");
1277 let id_b = spec_b.id.clone();
1278 registry.register_operation(spec_b).unwrap();
1279 registry.provisioning_succeeded(&id_b).unwrap();
1280
1281 registry
1282 .complete_operation(
1283 &id_a,
1284 OperationResult {
1285 id: id_a.clone(),
1286 content: "a-done".into(),
1287 is_error: false,
1288 duration_ms: 1,
1289 tokens_used: 0,
1290 },
1291 )
1292 .unwrap();
1293 registry.fail_operation(&id_b, "b-error".into()).unwrap();
1294
1295 let wait_result = registry
1296 .wait_all(&test_run_id(), &[id_a.clone(), id_b.clone()])
1297 .await
1298 .unwrap();
1299 assert_eq!(wait_result.outcomes.len(), 2);
1300 assert_eq!(wait_result.outcomes[0].0, id_a);
1301 assert!(matches!(
1302 wait_result.outcomes[0].1,
1303 OperationTerminalOutcome::Completed(_)
1304 ));
1305 assert_eq!(wait_result.outcomes[1].0, id_b);
1306 assert!(matches!(
1307 wait_result.outcomes[1].1,
1308 OperationTerminalOutcome::Failed { .. }
1309 ));
1310 assert_eq!(wait_result.satisfied.operation_ids.len(), 2);
1312 assert_ne!(wait_result.satisfied.wait_request_id.to_string(), "");
1313 }
1314
1315 #[tokio::test]
1318 async fn wait_all_trait_path_submits_through_authority() {
1319 let registry = RuntimeOpsLifecycleRegistry::new();
1320 let spec = background_spec("trait-wait");
1321 let op_id = spec.id.clone();
1322 registry.register_operation(spec).unwrap();
1323 registry.provisioning_succeeded(&op_id).unwrap();
1324 registry
1325 .complete_operation(
1326 &op_id,
1327 OperationResult {
1328 id: op_id.clone(),
1329 content: "done".into(),
1330 is_error: false,
1331 duration_ms: 1,
1332 tokens_used: 0,
1333 },
1334 )
1335 .unwrap();
1336
1337 let trait_ref: &dyn OpsLifecycleRegistry = ®istry;
1339 let wait_result = trait_ref
1340 .wait_all(&test_run_id(), std::slice::from_ref(&op_id))
1341 .await
1342 .unwrap();
1343 assert_eq!(wait_result.outcomes.len(), 1);
1344 assert!(matches!(
1345 wait_result.outcomes[0].1,
1346 OperationTerminalOutcome::Completed(_)
1347 ));
1348 assert_eq!(wait_result.satisfied.operation_ids, vec![op_id]);
1350 assert_ne!(wait_result.satisfied.wait_request_id.to_string(), "");
1351 }
1352
1353 #[tokio::test]
1354 async fn wait_all_resolves_from_authority_owned_wait_request() {
1355 let registry = RuntimeOpsLifecycleRegistry::new();
1356 let run_id = test_run_id();
1357
1358 let spec = background_spec("pending");
1359 let op_id = spec.id.clone();
1360 registry.register_operation(spec).unwrap();
1361 registry.provisioning_succeeded(&op_id).unwrap();
1362
1363 let wait_fut = registry.wait_all(&run_id, std::slice::from_ref(&op_id));
1364 tokio::pin!(wait_fut);
1365 assert!(
1366 tokio::time::timeout(std::time::Duration::from_millis(10), &mut wait_fut)
1367 .await
1368 .is_err()
1369 );
1370
1371 let active_wait_request_id = {
1372 let state = registry.read_state().unwrap();
1373 let wait_request_id = match state.authority.wait_request_id().cloned() {
1374 Some(wait_request_id) => wait_request_id,
1375 None => panic!("wait request should be active"),
1376 };
1377 assert_eq!(
1378 state.authority.wait_operation_ids(),
1379 std::slice::from_ref(&op_id)
1380 );
1381 wait_request_id
1382 };
1383
1384 registry
1385 .complete_operation(
1386 &op_id,
1387 OperationResult {
1388 id: op_id.clone(),
1389 content: "done".into(),
1390 is_error: false,
1391 duration_ms: 1,
1392 tokens_used: 0,
1393 },
1394 )
1395 .unwrap();
1396
1397 let wait_result = wait_fut.await.unwrap();
1398 assert_eq!(
1399 wait_result.satisfied.wait_request_id,
1400 active_wait_request_id
1401 );
1402 assert_eq!(wait_result.satisfied.operation_ids, vec![op_id.clone()]);
1403 assert!(matches!(
1404 wait_result.outcomes.as_slice(),
1405 [(returned_id, OperationTerminalOutcome::Completed(_))] if *returned_id == op_id
1406 ));
1407 assert!(
1408 registry
1409 .read_state()
1410 .unwrap()
1411 .authority
1412 .wait_request_id()
1413 .is_none()
1414 );
1415 }
1416
1417 #[tokio::test]
1418 async fn dropping_wait_all_future_cancels_active_wait_request() {
1419 let registry = RuntimeOpsLifecycleRegistry::new();
1420 let run_id = test_run_id();
1421
1422 let spec = background_spec("cancelled-wait");
1423 let op_id = spec.id.clone();
1424 registry.register_operation(spec).unwrap();
1425 registry.provisioning_succeeded(&op_id).unwrap();
1426
1427 let wait_fut = registry.wait_all(&run_id, std::slice::from_ref(&op_id));
1428 drop(wait_fut);
1429
1430 let state = registry.read_state().unwrap();
1431 assert!(state.authority.wait_request_id().is_none());
1432 assert!(state.authority.wait_operation_ids().is_empty());
1433 }
1434
/// With one completed and one merely registered operation,
/// `collect_completed` must return only the completed one, remove it from
/// snapshots, leave the other in place, and return nothing on a second call.
#[test]
fn collect_completed_drains_terminal_operations() {
    let registry = RuntimeOpsLifecycleRegistry::new();

    // Operation "a" is driven all the way to completion.
    let spec_a = background_spec("a");
    let id_a = spec_a.id.clone();
    registry.register_operation(spec_a).unwrap();
    registry.provisioning_succeeded(&id_a).unwrap();
    let completion_a = OperationResult {
        id: id_a.clone(),
        content: "done".into(),
        is_error: false,
        duration_ms: 1,
        tokens_used: 0,
    };
    registry.complete_operation(&id_a, completion_a).unwrap();

    // Operation "b" is only registered.
    let spec_b = background_spec("b");
    let id_b = spec_b.id.clone();
    registry.register_operation(spec_b).unwrap();

    let first_pass = registry.collect_completed().unwrap();
    assert_eq!(first_pass.len(), 1);
    assert_eq!(first_pass[0].0, id_a);

    let a_snapshot = registry.snapshot(&id_a);
    assert!(a_snapshot.is_none());
    let b_snapshot = registry.snapshot(&id_b);
    assert!(b_snapshot.is_some());

    let second_pass = registry.collect_completed().unwrap();
    assert!(second_pass.is_empty());
}
1470
/// With `max_completed` set to 3, completing five operations in sequence must
/// leave snapshots only for the last three.
#[test]
fn bounded_completed_retention_evicts_oldest() {
    let registry = RuntimeOpsLifecycleRegistry::with_config(OpsLifecycleConfig {
        max_completed: 3,
        max_concurrent: None,
    });

    // Register, provision, and complete five operations one after another,
    // keeping their ids in completion order.
    let ids: Vec<_> = (0..5)
        .map(|i| {
            let spec = background_spec(&format!("op-{i}"));
            let id = spec.id.clone();
            registry.register_operation(spec).unwrap();
            registry.provisioning_succeeded(&id).unwrap();
            let completion = OperationResult {
                id: id.clone(),
                content: format!("done-{i}"),
                is_error: false,
                duration_ms: 1,
                tokens_used: 0,
            };
            registry.complete_operation(&id, completion).unwrap();
            id
        })
        .collect();

    // The two oldest completions are gone; the three newest remain.
    let (evicted, retained) = ids.split_at(2);
    for id in evicted {
        assert!(registry.snapshot(id).is_none());
    }
    for id in retained {
        assert!(registry.snapshot(id).is_some());
    }
}
1505
/// With `max_concurrent` set to 2, a third registration must be rejected with
/// `MaxConcurrentExceeded { limit: 2, active: 2 }`, and a new registration
/// must succeed again once one of the first two operations has completed.
#[test]
fn max_concurrent_enforcement() {
    let config = OpsLifecycleConfig {
        max_completed: DEFAULT_MAX_COMPLETED,
        max_concurrent: Some(2),
    };
    let registry = RuntimeOpsLifecycleRegistry::with_config(config);

    let spec_a = background_spec("a");
    let id_a = spec_a.id.clone();
    registry.register_operation(spec_a).unwrap();

    registry.register_operation(background_spec("b")).unwrap();

    // Two operations are registered, so the third hits the limit.
    let rejected = registry.register_operation(background_spec("c"));
    assert!(matches!(
        rejected,
        Err(OpsLifecycleError::MaxConcurrentExceeded {
            limit: 2,
            active: 2,
        })
    ));

    // Complete "a" before attempting another registration.
    registry.provisioning_succeeded(&id_a).unwrap();
    let completion_a = OperationResult {
        id: id_a.clone(),
        content: "done".into(),
        is_error: false,
        duration_ms: 1,
        tokens_used: 0,
    };
    registry.complete_operation(&id_a, completion_a).unwrap();

    let admitted = registry.register_operation(background_spec("d"));
    assert!(admitted.is_ok());
}
1547
/// Takes a snapshot after registration, after provisioning success, and after
/// completion, and checks which timestamp fields are populated at each step
/// and that later timestamps are not earlier than the preceding ones.
#[test]
fn snapshot_includes_timestamps() {
    let registry = RuntimeOpsLifecycleRegistry::new();
    let spec = background_spec("timed");
    let op_id = spec.id.clone();
    registry.register_operation(spec).unwrap();

    // After registration only the creation timestamp is present.
    let after_register = registry.snapshot(&op_id).unwrap();
    assert!(after_register.created_at_ms > 0);
    assert!(after_register.started_at_ms.is_none());
    assert!(after_register.completed_at_ms.is_none());
    assert!(after_register.elapsed_ms.is_none());

    // After provisioning success the start timestamp appears.
    registry.provisioning_succeeded(&op_id).unwrap();
    let after_start = registry.snapshot(&op_id).unwrap();
    assert!(after_start.started_at_ms.is_some());
    assert!(after_start.started_at_ms.unwrap() >= after_start.created_at_ms);

    // After completion the completion timestamp and elapsed time appear.
    let completion = OperationResult {
        id: op_id.clone(),
        content: "done".into(),
        is_error: false,
        duration_ms: 1,
        tokens_used: 0,
    };
    registry.complete_operation(&op_id, completion).unwrap();
    let after_complete = registry.snapshot(&op_id).unwrap();
    assert!(after_complete.completed_at_ms.is_some());
    assert!(after_complete.elapsed_ms.is_some());
    assert!(after_complete.completed_at_ms.unwrap() >= after_complete.started_at_ms.unwrap());
}
1583
/// Checks that a snapshot has no peer handle until `peer_ready` is called, and
/// afterwards exposes the handle that was supplied.
#[test]
fn snapshot_includes_peer_handle() {
    let registry = RuntimeOpsLifecycleRegistry::new();

    let op_id = OperationId::new();
    let spec = OperationSpec {
        id: op_id.clone(),
        kind: OperationKind::MobMemberChild,
        owner_session_id: SessionId::new(),
        display_name: "peer-test".into(),
        source_label: "test".into(),
        child_session_id: Some(SessionId::new()),
        expect_peer_channel: true,
    };
    registry.register_operation(spec).unwrap();
    registry.provisioning_succeeded(&op_id).unwrap();

    // No handle has been reported yet.
    let before_ready = registry.snapshot(&op_id).unwrap();
    assert!(before_ready.peer_handle.is_none());

    let trusted_peer = TrustedPeerSpec::new("member-x", "peer-id", "inproc://x").unwrap();
    let handle = OperationPeerHandle {
        peer_name: "member-x".into(),
        trusted_peer,
    };
    registry.peer_ready(&op_id, handle).unwrap();

    let after_ready = registry.snapshot(&op_id).unwrap();
    let reported_handle = after_ready.peer_handle.as_ref().unwrap();
    assert_eq!(reported_handle.peer_name, "member-x");
}
1612}