1use std::collections::{HashMap, HashSet, VecDeque};
19use std::future::Future;
20use std::sync::atomic::{AtomicU64, Ordering};
21use std::sync::{Arc, RwLock, RwLockReadGuard, RwLockWriteGuard};
22use std::task::{Context, Poll};
23
24use meerkat_core::completion_feed::{
25 CompletionBatch, CompletionEntry, CompletionFeed, CompletionSeq,
26};
27
28#[cfg(target_arch = "wasm32")]
29use crate::tokio;
30use meerkat_core::lifecycle::{RunId, WaitRequestId};
31use meerkat_core::ops_lifecycle::{
32 DEFAULT_MAX_COMPLETED, OperationCompletionWatch, OperationId, OperationKind,
33 OperationLifecycleSnapshot, OperationPeerHandle, OperationProgressUpdate, OperationResult,
34 OperationSpec, OperationStatus, OperationTerminalOutcome, OpsLifecycleError,
35 OpsLifecycleRegistry, WaitAllResult, WaitAllSatisfied,
36};
37use meerkat_core::time_compat::{Instant, SystemTime, UNIX_EPOCH};
38
39use crate::meerkat_machine::dsl as mm_dsl;
40
/// Serializable per-operation state stored inside [`RegistryCanonicalState`].
///
/// `ShellState::capture_snapshot` builds one value per operation it can read
/// from the DSL authority; `RuntimeOpsLifecycleRegistry::from_recovered`
/// consumes them when rebuilding a registry.
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub struct OperationCanonicalState {
    /// Lifecycle status at capture time.
    status: OperationStatus,
    /// Kind of the operation.
    kind: OperationKind,
    /// Peer-ready flag at capture time (`false` when the authority had none).
    peer_ready: bool,
    /// Progress counter at capture time (`0` when the authority had none).
    progress_count: u32,
    /// Number of watcher channels attached to the shell record at capture time.
    watcher_count: u32,
    /// Terminal outcome, when one could be read back from the authority.
    terminal_outcome: Option<OperationTerminalOutcome>,
    /// Written by `capture_snapshot` as `terminal_outcome.is_some()`.
    terminal_buffered: bool,
}
60
/// Serializable registry-wide state captured by `ShellState::capture_snapshot`.
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub struct RegistryCanonicalState {
    /// Per-operation state keyed by operation id.
    operations: HashMap<OperationId, OperationCanonicalState>,
    /// Ids of completed operations, oldest first (copied from the shell's eviction queue).
    completed_order: VecDeque<OperationId>,
    /// Retention limit for completed operations.
    max_completed: usize,
    /// Optional cap on concurrently active operations.
    max_concurrent: Option<usize>,
    /// Active-operation count at capture time.
    /// `from_recovered` does not read this field; it resets the count to zero.
    active_count: usize,
    /// Id of the wait request that was active at capture time, if any.
    /// `from_recovered` does not read this field.
    wait_request_id: Option<WaitRequestId>,
    /// Operation ids of the wait set at capture time.
    /// `from_recovered` does not read this field.
    wait_operation_ids: Vec<OperationId>,
    /// Value of the shell's completion sequence counter at capture time.
    next_completion_seq: CompletionSeq,
}
73
impl RegistryCanonicalState {
    /// Returns the captured retention limit for completed operations.
    pub fn max_completed(&self) -> usize {
        self.max_completed
    }

    /// Returns the captured cap on concurrently active operations, if any.
    pub fn max_concurrent(&self) -> Option<usize> {
        self.max_concurrent
    }

    /// Returns how many operations are present in the captured state.
    pub fn operation_count(&self) -> usize {
        self.operations.len()
    }
}
90
/// Everything `ShellState::capture_snapshot` collects for persistence.
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub struct PersistedOpsSnapshot {
    /// Epoch the snapshot was captured under (supplied by the caller of the capture).
    pub epoch_id: meerkat_core::RuntimeEpochId,
    /// Registry-wide and per-operation state.
    pub authority_state: RegistryCanonicalState,
    /// Operation specs cloned from the shell records, keyed by operation id.
    pub operation_specs: HashMap<OperationId, meerkat_core::ops_lifecycle::OperationSpec>,
    /// Completion entries retained in the feed buffer at capture time, in buffer order.
    pub completion_entries: Vec<CompletionEntry>,
    /// Result of calling `snapshot()` on the epoch cursor state at capture time.
    pub cursors: meerkat_core::EpochCursorSnapshot,
}
114
/// A snapshot queued for persistence together with the channel used to report
/// the result back to the sender (see `ShellState::maybe_persist`).
#[derive(Debug)]
pub struct OpsLifecyclePersistenceRequest {
    /// Snapshot to persist.
    snapshot: PersistedOpsSnapshot,
    /// Reply channel; `maybe_persist` blocks on the receiving end.
    result_tx: std::sync::mpsc::SyncSender<Result<(), OpsLifecycleError>>,
}
120
impl OpsLifecyclePersistenceRequest {
    /// Borrows the snapshot that should be persisted.
    pub fn snapshot(&self) -> &PersistedOpsSnapshot {
        &self.snapshot
    }

    /// Consumes the request and reports `result` to the requester.
    ///
    /// A send failure (the requester's receiver is gone) is deliberately ignored.
    pub fn complete(self, result: Result<(), OpsLifecycleError>) {
        let _ = self.result_tx.send(result);
    }
}
130
/// Lock-protected part of [`FeedBuffer`].
#[derive(Debug)]
struct FeedBufferInner {
    /// Retained completion entries, oldest at the front.
    entries: VecDeque<CompletionEntry>,
    /// Sequence number of the most recently pushed entry (`0` before any push).
    watermark: CompletionSeq,
    /// Maximum number of entries kept; older ones are dropped on push.
    max_retained: usize,
}
145
/// Bounded buffer of completion entries shared between the registry (writer)
/// and `RuntimeCompletionFeed` handles (readers).
#[derive(Debug)]
struct FeedBuffer {
    /// Entries, watermark and retention limit, guarded by a lock.
    inner: RwLock<FeedBufferInner>,
    /// Copy of the watermark readable without taking `inner`'s lock;
    /// stored by `push` after the lock has been released.
    watermark_atomic: AtomicU64,
    /// Wakes tasks parked in `wait_for_advance` whenever an entry is pushed.
    notify: tokio::sync::Notify,
}
158
159impl FeedBuffer {
160 fn new(max_retained: usize) -> Self {
161 Self {
162 inner: RwLock::new(FeedBufferInner {
163 entries: VecDeque::new(),
164 watermark: 0,
165 max_retained,
166 }),
167 watermark_atomic: AtomicU64::new(0),
168 notify: tokio::sync::Notify::new(),
169 }
170 }
171
172 fn push(&self, entry: CompletionEntry) {
173 let mut inner = self
174 .inner
175 .write()
176 .unwrap_or_else(std::sync::PoisonError::into_inner);
177 let seq = entry.seq;
178 inner.entries.push_back(entry);
179 inner.watermark = seq;
180
181 while inner.entries.len() > inner.max_retained {
183 inner.entries.pop_front();
184 }
185
186 drop(inner);
187
188 self.watermark_atomic.store(seq, Ordering::Release);
189 self.notify.notify_waiters();
190 }
191}
192
/// Cloneable read handle over the registry's completion feed buffer.
///
/// Created by `RuntimeOpsLifecycleRegistry::completion_feed_handle`.
#[derive(Debug, Clone)]
pub struct RuntimeCompletionFeed {
    /// Buffer shared with the registry that pushes entries into it.
    buffer: Arc<FeedBuffer>,
}
201
impl CompletionFeed for RuntimeCompletionFeed {
    /// Returns the latest published sequence number without taking the buffer lock.
    fn watermark(&self) -> CompletionSeq {
        self.buffer.watermark_atomic.load(Ordering::Acquire)
    }

    /// Returns clones of all retained entries whose `seq` is greater than
    /// `after_seq`, together with the watermark read under the same lock.
    ///
    /// Entries already trimmed from the buffer are not returned. A poisoned
    /// lock is recovered rather than propagated.
    fn list_since(&self, after_seq: CompletionSeq) -> CompletionBatch {
        let inner = self
            .buffer
            .inner
            .read()
            .unwrap_or_else(std::sync::PoisonError::into_inner);
        let entries: Vec<CompletionEntry> = inner
            .entries
            .iter()
            .filter(|e| e.seq > after_seq)
            .cloned()
            .collect();
        let watermark = inner.watermark;
        CompletionBatch { entries, watermark }
    }

    /// Resolves with the current watermark once it exceeds `after_seq`.
    ///
    /// Returns immediately when the watermark is already past `after_seq`.
    fn wait_for_advance(
        &self,
        after_seq: CompletionSeq,
    ) -> std::pin::Pin<Box<dyn Future<Output = CompletionSeq> + Send + '_>> {
        Box::pin(async move {
            loop {
                // The `Notified` future is created BEFORE the watermark is read.
                // NOTE(review): presumably this ordering is what prevents a lost
                // wakeup when `push` runs between the check and the await; it
                // relies on tokio's `notify_waiters` semantics — confirm against
                // the tokio `Notify` documentation before reordering.
                let notified = self.buffer.notify.notified();
                let current = self.buffer.watermark_atomic.load(Ordering::Acquire);
                if current > after_seq {
                    return current;
                }
                notified.await;
            }
        })
    }
}
243
/// Shell-side bookkeeping for one operation: data that lives next to the DSL
/// authority's per-operation state (spec, handles, watchers, timestamps).
#[derive(Debug)]
struct ShellRecord {
    /// Spec the operation was registered with.
    spec: OperationSpec,
    /// Peer handle, if one has been attached (starts as `None`).
    peer_handle: Option<OperationPeerHandle>,
    /// One-shot senders notified with the terminal outcome by `notify_watchers`.
    watchers: Vec<tokio::sync::oneshot::Sender<OperationTerminalOutcome>>,
    /// Monotonic timestamp taken when the record was created.
    created_at: Instant,
    /// Monotonic timestamp set when provisioning succeeded, if it did.
    started_at: Option<Instant>,
    /// Monotonic timestamp set by `mark_completed`.
    completed_at: Option<Instant>,
    /// Wall-clock timestamp taken when the record was created; used as the
    /// anchor for converting monotonic instants to epoch milliseconds.
    created_at_wall: SystemTime,
}
263
/// The parked side of an in-flight wait-all request.
#[derive(Debug)]
struct PendingWaitState {
    /// Id of the wait request this sender belongs to.
    wait_request_id: WaitRequestId,
    /// Fired by `maybe_satisfy_wait` once the wait is satisfied.
    sender: tokio::sync::oneshot::Sender<WaitAllSatisfied>,
}
269
/// Result of `ShellState::begin_wait_all_authority`.
enum WaitAllAuthorityPlan {
    /// Every requested operation was already terminal; no barrier was armed.
    AlreadySatisfied(WaitAllSatisfied),
    /// The `RequestWaitAll` input was applied to the DSL authority; the caller
    /// is expected to park until the wait is satisfied.
    ActivateBarrier,
}
274
275impl ShellRecord {
276 fn new(spec: OperationSpec) -> Self {
277 Self {
278 spec,
279 peer_handle: None,
280 watchers: Vec::new(),
281 created_at: Instant::now(),
282 started_at: None,
283 completed_at: None,
284 created_at_wall: SystemTime::now(),
285 }
286 }
287
288 fn epoch_millis(wall_anchor: &SystemTime) -> u64 {
289 wall_anchor
290 .duration_since(UNIX_EPOCH)
291 .map(|d| d.as_millis() as u64)
292 .unwrap_or(0)
293 }
294
295 fn epoch_millis_for_instant(&self, instant: Instant) -> u64 {
296 let offset = instant.saturating_duration_since(self.created_at);
299 let wall = self.created_at_wall + offset;
300 Self::epoch_millis(&wall)
301 }
302
303 fn notify_watchers(&mut self, outcome: &OperationTerminalOutcome) {
305 for watcher in std::mem::take(&mut self.watchers) {
306 let _ = watcher.send(outcome.clone());
307 }
308 }
309
310 fn mark_completed(&mut self) {
312 self.completed_at = Some(Instant::now());
313 }
314}
315
/// All mutable registry state kept behind the registry's lock: the DSL
/// authority plus the shell-side data that accompanies it.
#[derive(Debug)]
struct ShellState {
    /// DSL machine holding per-operation status, kind, counters, terminal
    /// outcomes and the wait-all state read by the accessors below.
    dsl: DslAuthority,
    /// Shell records keyed by operation id.
    records: HashMap<OperationId, ShellRecord>,
    /// Sender half of the currently parked wait-all request, if any.
    pending_wait: Option<PendingWaitState>,
    /// Ids of finalized operations, oldest first; drives eviction.
    completed_order: VecDeque<OperationId>,
    /// Maximum number of finalized operations retained before eviction.
    max_completed: usize,
    /// Optional cap on concurrently active operations, checked on registration.
    max_concurrent: Option<usize>,
    /// Domain-typed id of the active wait request; taken by
    /// `maybe_satisfy_wait` and cleared by `cancel_wait_all_internal`.
    wait_request_id: Option<WaitRequestId>,
    /// Last completion sequence number handed out by `next_seq`.
    next_completion_seq: CompletionSeq,
    /// Completion feed buffer shared with `RuntimeCompletionFeed` handles.
    feed_buffer: Arc<FeedBuffer>,
    /// Channel to the persistence worker; persistence is skipped unless this
    /// and the two fields below are all set.
    persist_tx: Option<crate::tokio::sync::mpsc::UnboundedSender<OpsLifecyclePersistenceRequest>>,
    /// Epoch id stamped on persisted snapshots.
    persist_epoch_id: Option<meerkat_core::RuntimeEpochId>,
    /// Cursor state snapshotted into persisted snapshots.
    persist_cursor_state: Option<Arc<meerkat_core::EpochCursorState>>,
}
351
/// Newtype around the DSL machine authority; it has a hand-written `Debug`
/// impl that prints only the machine state.
struct DslAuthority(mm_dsl::MeerkatMachineAuthority);
358
359impl std::fmt::Debug for DslAuthority {
360 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
361 f.debug_struct("DslAuthority")
362 .field("state", &self.0.state)
363 .finish()
364 }
365}
366
367fn new_ops_dsl_authority() -> DslAuthority {
372 let state = mm_dsl::MeerkatMachineState {
373 lifecycle_phase: mm_dsl::MeerkatPhase::Idle,
374 ..mm_dsl::MeerkatMachineState::default()
375 };
376 DslAuthority(mm_dsl::MeerkatMachineAuthority::from_state(state))
377}
378
379impl ShellState {
380 fn new(max_completed: usize, max_concurrent: Option<usize>) -> Self {
381 Self {
382 dsl: new_ops_dsl_authority(),
383 records: HashMap::new(),
384 pending_wait: None,
385 completed_order: VecDeque::new(),
386 max_completed,
387 max_concurrent,
388 wait_request_id: None,
389 next_completion_seq: 0,
390 feed_buffer: Arc::new(FeedBuffer::new(max_completed.saturating_mul(4).max(1024))),
395 persist_tx: None,
396 persist_epoch_id: None,
397 persist_cursor_state: None,
398 }
399 }
400
401 fn dsl_apply(
408 &mut self,
409 input: mm_dsl::MeerkatMachineInput,
410 context: &str,
411 ) -> Result<(), OpsLifecycleError> {
412 self.dsl_apply_raw(input).map_err(|err| {
413 OpsLifecycleError::Internal(format!("DSL rejected ops transition ({context}): {err:?}"))
414 })
415 }
416
417 fn dsl_apply_raw(
425 &mut self,
426 input: mm_dsl::MeerkatMachineInput,
427 ) -> Result<(), mm_dsl::MeerkatMachineTransitionError> {
428 mm_dsl::MeerkatMachineMutator::apply(&mut self.dsl.0, input).map(|_transition| ())
429 }
430
431 fn split_outcome(
445 outcome: &OperationTerminalOutcome,
446 ) -> (mm_dsl::OperationTerminalOutcomeKind, String) {
447 match outcome {
448 OperationTerminalOutcome::Completed(result) => (
449 mm_dsl::OperationTerminalOutcomeKind::Completed,
450 serde_json::to_string(result).unwrap_or_default(),
451 ),
452 OperationTerminalOutcome::Failed { error } => (
453 mm_dsl::OperationTerminalOutcomeKind::Failed,
454 serde_json::to_string(error).unwrap_or_default(),
455 ),
456 OperationTerminalOutcome::Aborted { reason } => (
457 mm_dsl::OperationTerminalOutcomeKind::Aborted,
458 serde_json::to_string(reason).unwrap_or_default(),
459 ),
460 OperationTerminalOutcome::Cancelled { reason } => (
461 mm_dsl::OperationTerminalOutcomeKind::Cancelled,
462 serde_json::to_string(reason).unwrap_or_default(),
463 ),
464 OperationTerminalOutcome::Retired => {
465 (mm_dsl::OperationTerminalOutcomeKind::Retired, String::new())
466 }
467 OperationTerminalOutcome::Terminated { reason } => (
468 mm_dsl::OperationTerminalOutcomeKind::Terminated,
469 serde_json::to_string(reason).unwrap_or_default(),
470 ),
471 }
472 }
473
474 fn status(&self, id: &OperationId) -> Option<OperationStatus> {
476 let id_key = mm_dsl::OperationId::from_domain(id).0;
477 self.dsl
478 .0
479 .state
480 .op_statuses
481 .get(&id_key)
482 .copied()
483 .map(OperationStatus::from)
484 }
485
486 fn kind(&self, id: &OperationId) -> Option<OperationKind> {
488 let id_key = mm_dsl::OperationId::from_domain(id).0;
489 self.dsl
490 .0
491 .state
492 .op_kinds
493 .get(&id_key)
494 .copied()
495 .map(OperationKind::from)
496 }
497
498 fn peer_ready(&self, id: &OperationId) -> Option<bool> {
500 let id_key = mm_dsl::OperationId::from_domain(id).0;
501 self.dsl.0.state.op_peer_ready.get(&id_key).copied()
502 }
503
504 fn progress_count(&self, id: &OperationId) -> Option<u32> {
506 let id_key = mm_dsl::OperationId::from_domain(id).0;
507 self.dsl
508 .0
509 .state
510 .op_progress_counts
511 .get(&id_key)
512 .map(|v| (*v).min(u32::MAX as u64) as u32)
513 }
514
515 fn terminal_outcome(&self, id: &OperationId) -> Option<OperationTerminalOutcome> {
519 let id_key = mm_dsl::OperationId::from_domain(id).0;
520 let kind = self
521 .dsl
522 .0
523 .state
524 .op_terminal_outcomes
525 .get(&id_key)
526 .copied()?;
527 let payload = self
528 .dsl
529 .0
530 .state
531 .op_terminal_payload
532 .get(&id_key)
533 .map(String::as_str)
534 .unwrap_or("");
535 match kind {
536 mm_dsl::OperationTerminalOutcomeKind::Completed => {
537 let result = serde_json::from_str::<OperationResult>(payload).ok()?;
538 Some(OperationTerminalOutcome::Completed(result))
539 }
540 mm_dsl::OperationTerminalOutcomeKind::Failed => {
541 let error = serde_json::from_str::<String>(payload).unwrap_or_default();
542 Some(OperationTerminalOutcome::Failed { error })
543 }
544 mm_dsl::OperationTerminalOutcomeKind::Aborted => {
545 let reason = serde_json::from_str::<Option<String>>(payload)
546 .ok()
547 .flatten();
548 Some(OperationTerminalOutcome::Aborted { reason })
549 }
550 mm_dsl::OperationTerminalOutcomeKind::Cancelled => {
551 let reason = serde_json::from_str::<Option<String>>(payload)
552 .ok()
553 .flatten();
554 Some(OperationTerminalOutcome::Cancelled { reason })
555 }
556 mm_dsl::OperationTerminalOutcomeKind::Retired => {
557 Some(OperationTerminalOutcome::Retired)
558 }
559 mm_dsl::OperationTerminalOutcomeKind::Terminated => {
560 let reason = serde_json::from_str::<String>(payload).unwrap_or_default();
561 Some(OperationTerminalOutcome::Terminated { reason })
562 }
563 }
564 }
565
    /// Returns `true` when the DSL authority holds a status entry for `id`.
    fn contains(&self, id: &OperationId) -> bool {
        let id_key = mm_dsl::OperationId::from_domain(id).0;
        self.dsl.0.state.op_statuses.contains_key(&id_key)
    }
571
    /// Returns the DSL authority's active-operation counter as a `usize`.
    fn active_count(&self) -> usize {
        self.dsl.0.state.active_op_count as usize
    }
576
    /// Returns how many operations have a status entry in the DSL authority.
    fn operation_count(&self) -> usize {
        self.dsl.0.state.op_statuses.len()
    }
581
582 fn operation_ids(&self) -> Vec<OperationId> {
584 self.dsl
585 .0
586 .state
587 .op_statuses
588 .keys()
589 .filter_map(|k| serde_json::from_str::<OperationId>(k).ok())
590 .collect()
591 }
592
593 fn next_seq(&mut self) -> CompletionSeq {
595 self.next_completion_seq = self.next_completion_seq.saturating_add(1);
596 self.next_completion_seq
597 }
598
599 fn snapshot(&self, id: &OperationId) -> Option<OperationLifecycleSnapshot> {
601 let shell = self.records.get(id)?;
602 let kind = self.kind(id)?;
603 let status = self.status(id)?;
604 let peer_ready = self.peer_ready(id).unwrap_or(false);
605 let progress_count = self.progress_count(id).unwrap_or(0);
606 let terminal_outcome = self.terminal_outcome(id);
607
608 let created_at_ms = ShellRecord::epoch_millis(&shell.created_at_wall);
609 let started_at_ms = shell.started_at.map(|i| shell.epoch_millis_for_instant(i));
610 let completed_at_ms = shell
611 .completed_at
612 .map(|i| shell.epoch_millis_for_instant(i));
613 let elapsed_ms = shell.completed_at.map(|completed| {
614 completed
615 .saturating_duration_since(shell.created_at)
616 .as_millis() as u64
617 });
618
619 Some(OperationLifecycleSnapshot {
620 id: shell.spec.id.clone(),
621 kind,
622 display_name: shell.spec.display_name.clone(),
623 status,
624 peer_ready,
625 progress_count,
626 watcher_count: shell.watchers.len() as u32,
627 terminal_outcome,
628 child_session_id: shell.spec.child_session_id.clone(),
629 peer_handle: shell.peer_handle.clone(),
630 created_at_ms,
631 started_at_ms,
632 completed_at_ms,
633 elapsed_ms,
634 })
635 }
636
    /// Runs the shell-side bookkeeping for an operation that has a terminal
    /// outcome recorded in the DSL authority.
    ///
    /// Does nothing when no terminal outcome can be read for `id`. Otherwise,
    /// in order: notifies the record's watchers and stamps its completion
    /// time, pushes a completion entry to the feed, appends the id to the
    /// completed queue, evicts the oldest completed operations beyond
    /// `max_completed`, and finally checks whether a pending wait is satisfied.
    fn finalize_terminal(&mut self, id: &OperationId) {
        let outcome = match self.terminal_outcome(id) {
            Some(o) => o,
            None => return,
        };
        let kind = self.kind(id);

        if let Some(shell) = self.records.get_mut(id) {
            // Watchers are drained here, so each receives the outcome at most once.
            shell.notify_watchers(&outcome);
            shell.mark_completed();
        }

        let seq = self.next_seq();
        // A missing shell record yields the default display name and no timestamp.
        let display_name = self
            .records
            .get(id)
            .map(|r| r.spec.display_name.clone())
            .unwrap_or_default();
        let completed_at_ms = self
            .records
            .get(id)
            .and_then(|r| r.completed_at.map(|i| r.epoch_millis_for_instant(i)));
        // Falls back to `BackgroundToolOp` when the authority has no kind for `id`.
        let kind_for_entry = kind.unwrap_or(OperationKind::BackgroundToolOp);
        self.feed_buffer.push(CompletionEntry {
            seq,
            operation_id: id.clone(),
            kind: kind_for_entry,
            display_name,
            terminal_outcome: outcome,
            completed_at_ms,
        });

        self.completed_order.push_back(id.clone());
        // Evict oldest-first. Eviction removes the operation's entries from the
        // DSL state maps directly (not via a machine input) and drops its
        // shell record.
        while self.completed_order.len() > self.max_completed {
            if let Some(evicted) = self.completed_order.pop_front() {
                let evicted_key = mm_dsl::OperationId::from_domain(&evicted).0;
                self.dsl.0.state.op_statuses.remove(&evicted_key);
                self.dsl.0.state.op_kinds.remove(&evicted_key);
                self.dsl.0.state.op_peer_ready.remove(&evicted_key);
                self.dsl.0.state.op_progress_counts.remove(&evicted_key);
                self.dsl.0.state.op_terminal_outcomes.remove(&evicted_key);
                self.dsl.0.state.op_terminal_payload.remove(&evicted_key);
                self.dsl.0.state.op_completion_seq.remove(&evicted_key);
                self.records.remove(&evicted);
            }
        }

        self.maybe_satisfy_wait();
    }
693
694 fn wait_operation_ids(&self) -> Vec<OperationId> {
696 self.dsl
697 .0
698 .state
699 .wait_operation_ids
700 .iter()
701 .filter_map(|k| serde_json::from_str::<OperationId>(k).ok())
702 .collect()
703 }
704
    /// Returns the DSL authority's `wait_active` flag.
    fn wait_active(&self) -> bool {
        self.dsl.0.state.wait_active
    }
709
710 fn begin_wait_all_authority(
711 &mut self,
712 wait_request_id: &WaitRequestId,
713 operation_ids: &[OperationId],
714 ) -> Result<WaitAllAuthorityPlan, OpsLifecycleError> {
715 let mut seen = HashSet::new();
716 for operation_id in operation_ids {
717 if !seen.insert(operation_id.clone()) {
718 return Err(OpsLifecycleError::DuplicateWaitOperation(
719 operation_id.clone(),
720 ));
721 }
722 }
723
724 if self.wait_active() {
725 return Err(OpsLifecycleError::WaitAlreadyActive);
726 }
727
728 for operation_id in operation_ids {
729 if !self.contains(operation_id) {
730 return Err(OpsLifecycleError::NotFound(operation_id.clone()));
731 }
732 }
733
734 let all_terminal = operation_ids.iter().all(|operation_id| {
735 self.status(operation_id)
736 .is_some_and(OperationStatus::is_terminal)
737 });
738 if all_terminal {
739 return Ok(WaitAllAuthorityPlan::AlreadySatisfied(WaitAllSatisfied {
740 wait_request_id: wait_request_id.clone(),
741 operation_ids: operation_ids.to_vec(),
742 }));
743 }
744
745 let dsl_ids: std::collections::BTreeSet<String> = operation_ids
746 .iter()
747 .map(|id| mm_dsl::OperationId::from_domain(id).0)
748 .collect();
749 let dsl_id_tokens: std::collections::BTreeSet<mm_dsl::OperationId> = operation_ids
750 .iter()
751 .map(mm_dsl::OperationId::from_domain)
752 .collect();
753 self.dsl_apply(
754 mm_dsl::MeerkatMachineInput::RequestWaitAll {
755 wait_request_id: mm_dsl::WaitRequestId::from_domain(wait_request_id),
756 operation_ids: dsl_ids,
757 operation_id_tokens: dsl_id_tokens,
758 },
759 "RequestWaitAll",
760 )?;
761 Ok(WaitAllAuthorityPlan::ActivateBarrier)
762 }
763
764 fn owner_termination_targets(&self) -> Vec<(OperationId, OperationStatus)> {
765 self.operation_ids()
766 .into_iter()
767 .filter_map(|id| self.status(&id).map(|status| (id, status)))
768 .filter(|(_, status)| !status.is_terminal())
769 .collect()
770 }
771
    /// Tries to complete the active wait-all request, if there is one.
    ///
    /// Applies `SatisfyWaitAll` to the DSL authority; when the authority
    /// rejects it (or has no wait request id) nothing else happens. On
    /// acceptance the shell's `wait_request_id` is cleared and, if the parked
    /// sender belongs to that request, it is fired with the wait's ids.
    fn maybe_satisfy_wait(&mut self) {
        // Captured before the transition is applied.
        // NOTE(review): presumably because an accepted `SatisfyWaitAll` clears
        // the authority's wait set — confirm against the DSL definition.
        let ids = self.wait_operation_ids();
        let Some(dsl_wait_request_id) = self.dsl.0.state.wait_request_id.clone() else {
            return;
        };
        let dsl_operation_id_tokens = self.dsl.0.state.wait_operation_id_tokens.clone();
        if self
            .dsl_apply_raw(mm_dsl::MeerkatMachineInput::SatisfyWaitAll {
                wait_request_id: dsl_wait_request_id,
                operation_id_tokens: dsl_operation_id_tokens,
            })
            .is_err()
        {
            // Rejection is treated as "not satisfied yet", not as an error.
            return;
        }
        let wait_id = match self.wait_request_id.take() {
            Some(id) => id,
            None => return,
        };
        if let Some(pending) = self.pending_wait.take() {
            if pending.wait_request_id == wait_id {
                // A dropped receiver is ignored.
                let _ = pending.sender.send(WaitAllSatisfied {
                    wait_request_id: wait_id,
                    operation_ids: ids,
                });
            } else {
                // The parked sender belongs to a different request: put it back.
                self.pending_wait = Some(pending);
            }
        }
    }
826
    /// Captures a snapshot and hands it to the persistence worker, blocking
    /// the calling thread until the worker reports the result.
    ///
    /// Returns `Ok(())` without doing anything unless the persistence
    /// channel, epoch id and cursor state are all configured.
    ///
    /// # Errors
    /// * `Internal` when the request cannot be queued (channel closed).
    /// * `Internal` when the worker drops the request without replying.
    /// * Whatever error the worker passes to
    ///   `OpsLifecyclePersistenceRequest::complete`.
    fn maybe_persist(&self) -> Result<(), OpsLifecycleError> {
        let (tx, epoch_id, cursor_state) = match (
            &self.persist_tx,
            &self.persist_epoch_id,
            &self.persist_cursor_state,
        ) {
            (Some(tx), Some(epoch_id), Some(cs)) => (tx, epoch_id, cs),
            _ => return Ok(()),
        };

        let snapshot = self.capture_snapshot(epoch_id.clone(), cursor_state);
        // Capacity 1 lets the worker reply without waiting for this side.
        let (result_tx, result_rx) = std::sync::mpsc::sync_channel(1);
        let request = OpsLifecyclePersistenceRequest {
            snapshot,
            result_tx,
        };

        tx.send(request).map_err(|_| {
            OpsLifecycleError::Internal(
                "ops lifecycle persistence channel closed before terminal snapshot could be queued"
                    .into(),
            )
        })?;
        // Synchronous wait: `recv` blocks until the worker completes or drops
        // the request. The trailing `?` unwraps the channel error and leaves
        // the worker's own result as the return value.
        result_rx.recv().map_err(|_| {
            OpsLifecycleError::Internal(
                "ops lifecycle persistence worker dropped terminal snapshot before confirming durability"
                    .into(),
            )
        })?
    }
862
    /// Builds a [`PersistedOpsSnapshot`] from the current state.
    ///
    /// Collects the specs of all shell records, the entries currently
    /// retained in the feed buffer, the per-operation state of every
    /// operation the DSL authority lists, and the registry-wide counters.
    /// Operations without a readable status or kind are left out of
    /// `authority_state.operations`.
    fn capture_snapshot(
        &self,
        epoch_id: meerkat_core::RuntimeEpochId,
        cursor_state: &meerkat_core::EpochCursorState,
    ) -> PersistedOpsSnapshot {
        let operation_specs: HashMap<OperationId, OperationSpec> = self
            .records
            .iter()
            .map(|(id, record)| (id.clone(), record.spec.clone()))
            .collect();

        // Scope the read guard to the copy of the retained entries.
        let completion_entries = {
            let inner = self
                .feed_buffer
                .inner
                .read()
                .unwrap_or_else(std::sync::PoisonError::into_inner);
            inner.entries.iter().cloned().collect()
        };

        let mut operations: HashMap<OperationId, OperationCanonicalState> = HashMap::new();
        for op_id in self.operation_ids() {
            let Some(status) = self.status(&op_id) else {
                continue;
            };
            let Some(kind) = self.kind(&op_id) else {
                continue;
            };
            let peer_ready = self.peer_ready(&op_id).unwrap_or(false);
            let progress_count = self.progress_count(&op_id).unwrap_or(0);
            let terminal_outcome = self.terminal_outcome(&op_id);
            let terminal_buffered = terminal_outcome.is_some();
            // Operations without a shell record report zero watchers.
            let watcher_count = self
                .records
                .get(&op_id)
                .map(|r| r.watchers.len() as u32)
                .unwrap_or(0);
            operations.insert(
                op_id,
                OperationCanonicalState {
                    status,
                    kind,
                    peer_ready,
                    progress_count,
                    watcher_count,
                    terminal_outcome,
                    terminal_buffered,
                },
            );
        }

        let authority_state = RegistryCanonicalState {
            operations,
            completed_order: self.completed_order.clone(),
            max_completed: self.max_completed,
            max_concurrent: self.max_concurrent,
            active_count: self.active_count(),
            wait_request_id: self.wait_request_id.clone(),
            wait_operation_ids: self.wait_operation_ids(),
            next_completion_seq: self.next_completion_seq,
        };

        PersistedOpsSnapshot {
            epoch_id,
            authority_state,
            operation_specs,
            completion_entries,
            cursors: cursor_state.snapshot(),
        }
    }
934
935 fn shell_record_mut(
936 &mut self,
937 id: &OperationId,
938 ) -> Result<&mut ShellRecord, OpsLifecycleError> {
939 self.records
940 .get_mut(id)
941 .ok_or_else(|| OpsLifecycleError::NotFound(id.clone()))
942 }
943
944 fn collect_wait_outcomes(
945 &self,
946 operation_ids: &[OperationId],
947 ) -> Result<Vec<(OperationId, OperationTerminalOutcome)>, OpsLifecycleError> {
948 operation_ids
949 .iter()
950 .map(|operation_id| {
951 let outcome = self.terminal_outcome(operation_id).ok_or_else(|| {
952 OpsLifecycleError::Internal(format!(
953 "wait_all completed without terminal outcome for {operation_id}"
954 ))
955 })?;
956 Ok((operation_id.clone(), outcome))
957 })
958 .collect()
959 }
960}
961
impl Default for ShellState {
    /// Shell state retaining `DEFAULT_MAX_COMPLETED` completed operations and
    /// with no concurrency cap.
    fn default() -> Self {
        Self::new(DEFAULT_MAX_COMPLETED, None)
    }
}
967
/// Construction-time settings for `RuntimeOpsLifecycleRegistry::with_config`.
#[derive(Debug, Clone)]
pub struct OpsLifecycleConfig {
    /// Maximum number of completed operations retained before the oldest are evicted.
    pub max_completed: usize,
    /// Optional cap on concurrently active operations; `None` means no cap is checked.
    pub max_concurrent: Option<usize>,
}
980
impl Default for OpsLifecycleConfig {
    /// `DEFAULT_MAX_COMPLETED` retained completions and no concurrency cap.
    fn default() -> Self {
        Self {
            max_completed: DEFAULT_MAX_COMPLETED,
            max_concurrent: None,
        }
    }
}
989
/// Runtime implementation of `OpsLifecycleRegistry`; all state lives in one
/// `ShellState` behind a read/write lock.
#[derive(Debug)]
pub struct RuntimeOpsLifecycleRegistry {
    /// The registry's entire mutable state.
    state: RwLock<ShellState>,
}
1000
/// Point-in-time view of the registry produced by
/// `RuntimeOpsLifecycleRegistry::diagnostic_snapshot`.
#[derive(Debug, Clone)]
pub(crate) struct RuntimeOpsDiagnosticSnapshot {
    /// Number of operations with a status entry in the DSL authority.
    pub operation_count: usize,
    /// The DSL authority's active-operation counter.
    pub active_count: usize,
    /// Shell-side id of the active wait request, if any.
    pub wait_request_id: Option<WaitRequestId>,
    /// Whether a wait-all sender is currently parked.
    pub pending_wait_present: bool,
    /// Request id of the parked wait-all sender, if any.
    pub pending_wait_request_id: Option<WaitRequestId>,
    /// Operation ids in the DSL authority's wait set.
    pub wait_operation_ids: Vec<OperationId>,
    /// Per-operation snapshots, sorted by display name.
    pub operations: Vec<OperationLifecycleSnapshot>,
}
1011
1012impl Default for RuntimeOpsLifecycleRegistry {
1013 fn default() -> Self {
1014 Self {
1015 state: RwLock::new(ShellState::default()),
1016 }
1017 }
1018}
1019
1020impl RuntimeOpsLifecycleRegistry {
    /// Creates a registry with the default configuration (same as `Default`).
    pub fn new() -> Self {
        Self::default()
    }
1024
1025 pub fn with_config(config: OpsLifecycleConfig) -> Self {
1026 Self {
1027 state: RwLock::new(ShellState::new(config.max_completed, config.max_concurrent)),
1028 }
1029 }
1030
1031 pub fn set_persistence_channel(
1037 &self,
1038 tx: crate::tokio::sync::mpsc::UnboundedSender<OpsLifecyclePersistenceRequest>,
1039 epoch_id: meerkat_core::RuntimeEpochId,
1040 cursor_state: Arc<meerkat_core::EpochCursorState>,
1041 ) {
1042 if let Ok(mut state) = self.state.write() {
1043 state.persist_tx = Some(tx);
1044 state.persist_epoch_id = Some(epoch_id);
1045 state.persist_cursor_state = Some(cursor_state);
1046 }
1047 }
1048
    /// Rebuilds a registry from a persisted snapshot.
    ///
    /// Only operations whose persisted status is terminal are restored;
    /// non-terminal operations are dropped and the active count is reset to
    /// zero. Restored shell records get fresh creation timestamps, no peer
    /// handle, no watchers and no start/completion timestamps. The snapshot's
    /// wait request, wait set, epoch id and cursors are not read here, and no
    /// persistence channel is configured on the result.
    pub fn from_recovered(snapshot: PersistedOpsSnapshot) -> Self {
        let mut shell = ShellState::new(
            snapshot.authority_state.max_completed,
            snapshot.authority_state.max_concurrent,
        );

        // Ids of the operations actually restored; used to filter the
        // completed queue and the specs below.
        let mut retained_ids: HashSet<OperationId> = HashSet::new();
        for (op_id, op_state) in snapshot.authority_state.operations {
            if !op_state.status.is_terminal() {
                continue;
            }
            // State is written straight into the DSL state maps rather than
            // replayed through machine inputs.
            let id_key = mm_dsl::OperationId::from_domain(&op_id).0;
            shell.dsl.0.state.op_statuses.insert(
                id_key.clone(),
                mm_dsl::OperationStatus::from(op_state.status),
            );
            shell
                .dsl
                .0
                .state
                .op_kinds
                .insert(id_key.clone(), mm_dsl::OperationKind::from(op_state.kind));
            shell
                .dsl
                .0
                .state
                .op_peer_ready
                .insert(id_key.clone(), op_state.peer_ready);
            shell
                .dsl
                .0
                .state
                .op_progress_counts
                .insert(id_key.clone(), op_state.progress_count as u64);
            if let Some(outcome) = op_state.terminal_outcome.as_ref() {
                let (kind, payload) = ShellState::split_outcome(outcome);
                shell
                    .dsl
                    .0
                    .state
                    .op_terminal_outcomes
                    .insert(id_key.clone(), kind);
                shell
                    .dsl
                    .0
                    .state
                    .op_terminal_payload
                    .insert(id_key.clone(), payload);
            }
            retained_ids.insert(op_id);
        }
        // Nothing restored is active, since only terminal operations are kept.
        shell.dsl.0.state.active_op_count = 0;

        // Keep the persisted eviction order, minus operations that were dropped.
        shell.completed_order = snapshot
            .authority_state
            .completed_order
            .into_iter()
            .filter(|id| retained_ids.contains(id))
            .collect();

        shell.next_completion_seq = snapshot.authority_state.next_completion_seq;

        // Re-pushing the entries also restores the feed watermark to the seq
        // of the last persisted entry.
        for entry in &snapshot.completion_entries {
            shell.feed_buffer.push(entry.clone());
        }

        for (op_id, spec) in snapshot.operation_specs {
            if retained_ids.contains(&op_id) {
                shell.records.insert(
                    op_id,
                    ShellRecord {
                        spec,
                        peer_handle: None,
                        watchers: Vec::new(),
                        created_at: Instant::now(),
                        started_at: None,
                        completed_at: None,
                        created_at_wall: SystemTime::now(),
                    },
                );
            }
        }

        Self {
            state: RwLock::new(shell),
        }
    }
1149
    /// Captures a persistence snapshot of the current state under the read
    /// lock, using the given epoch id and cursor state.
    ///
    /// A poisoned lock is recovered rather than propagated.
    pub fn capture_persistence_snapshot(
        &self,
        epoch_id: meerkat_core::RuntimeEpochId,
        cursor_state: &meerkat_core::EpochCursorState,
    ) -> PersistedOpsSnapshot {
        let state = self
            .state
            .read()
            .unwrap_or_else(std::sync::PoisonError::into_inner);
        state.capture_snapshot(epoch_id, cursor_state)
    }
1166
    /// Returns a completion feed handle that shares this registry's feed buffer.
    ///
    /// A poisoned lock is recovered rather than propagated.
    pub fn completion_feed_handle(&self) -> Arc<dyn CompletionFeed> {
        let state = self
            .state
            .read()
            .unwrap_or_else(std::sync::PoisonError::into_inner);
        Arc::new(RuntimeCompletionFeed {
            buffer: Arc::clone(&state.feed_buffer),
        })
    }
1177
1178 pub(crate) fn diagnostic_snapshot(&self) -> RuntimeOpsDiagnosticSnapshot {
1180 let state = self
1181 .state
1182 .read()
1183 .unwrap_or_else(std::sync::PoisonError::into_inner);
1184 let mut operations = state
1185 .operation_ids()
1186 .into_iter()
1187 .filter_map(|id| state.snapshot(&id))
1188 .collect::<Vec<_>>();
1189 operations.sort_by(|left, right| left.display_name.cmp(&right.display_name));
1190 RuntimeOpsDiagnosticSnapshot {
1191 operation_count: state.operation_count(),
1192 active_count: state.active_count(),
1193 wait_request_id: state.wait_request_id.clone(),
1194 pending_wait_present: state.pending_wait.is_some(),
1195 pending_wait_request_id: state
1196 .pending_wait
1197 .as_ref()
1198 .map(|pending_wait| pending_wait.wait_request_id.clone()),
1199 wait_operation_ids: state.wait_operation_ids(),
1200 operations,
1201 }
1202 }
1203
1204 fn read_state(&self) -> Result<RwLockReadGuard<'_, ShellState>, OpsLifecycleError> {
1205 self.state
1206 .read()
1207 .map_err(|_| OpsLifecycleError::Internal("ops lifecycle registry poisoned".into()))
1208 }
1209
1210 fn write_state(&self) -> Result<RwLockWriteGuard<'_, ShellState>, OpsLifecycleError> {
1211 self.state
1212 .write()
1213 .map_err(|_| OpsLifecycleError::Internal("ops lifecycle registry poisoned".into()))
1214 }
1215
1216 fn cancel_wait_all_internal(
1217 &self,
1218 wait_request_id: &WaitRequestId,
1219 ) -> Result<(), OpsLifecycleError> {
1220 let mut state = self.write_state()?;
1221 match state.wait_request_id.as_ref() {
1222 Some(active) if active == wait_request_id => {
1223 state.wait_request_id = None;
1224 state.pending_wait = None;
1225 let _ = state.dsl_apply(
1232 mm_dsl::MeerkatMachineInput::CancelWaitAll,
1233 "CancelWaitAll(cancel)",
1234 );
1235 Ok(())
1236 }
1237 _ => {
1238 state.pending_wait = None;
1239 Ok(())
1240 }
1241 }
1242 }
1243}
1244
/// Internal state of [`WaitAllFuture`].
enum WaitAllFutureState {
    /// The result is already known; it is handed out on the first poll.
    Ready(Option<Result<WaitAllResult, OpsLifecycleError>>),
    /// Parked on the one-shot channel fired when the wait is satisfied.
    Waiting(tokio::sync::oneshot::Receiver<WaitAllSatisfied>),
    /// The result has been handed out; further polls yield an error.
    Done,
}
1250
/// Future for a wait-all request; dropping it while still waiting cancels
/// the request in the registry (see its `Drop` impl).
struct WaitAllFuture<'a> {
    /// Registry the wait was issued against.
    registry: &'a RuntimeOpsLifecycleRegistry,
    /// Id of the wait request; used for cancellation on drop.
    wait_request_id: WaitRequestId,
    /// Operations being waited on; their outcomes are collected on completion.
    operation_ids: Vec<OperationId>,
    /// Current state of the future.
    state: WaitAllFutureState,
}
1257
impl Future for WaitAllFuture<'_> {
    type Output = Result<WaitAllResult, OpsLifecycleError>;

    /// Drives the wait to completion.
    ///
    /// * `Ready`: yields the stored result once.
    /// * `Waiting`: polls the one-shot receiver; on success reads the
    ///   terminal outcomes of all waited operations from the registry, on a
    ///   dropped sender yields an `Internal` error.
    /// * `Done`: yields an `Internal` "polled after completion" error.
    ///
    /// Every completed path moves the future to `Done` before returning, so
    /// the `Drop` impl does not cancel an already finished wait.
    fn poll(mut self: std::pin::Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        match &mut self.state {
            WaitAllFutureState::Ready(result) => {
                // `take` leaves `None` behind; an empty slot is reported as misuse.
                let ready = result.take().unwrap_or_else(|| {
                    Err(OpsLifecycleError::Internal(
                        "wait_all future polled after completion".into(),
                    ))
                });
                self.state = WaitAllFutureState::Done;
                Poll::Ready(ready)
            }
            WaitAllFutureState::Waiting(receiver) => match std::pin::Pin::new(receiver).poll(cx) {
                Poll::Pending => Poll::Pending,
                Poll::Ready(Ok(satisfied)) => {
                    // Outcomes are read at poll time, not when the wait was satisfied.
                    let outcomes = match self.registry.read_state() {
                        Ok(state) => state.collect_wait_outcomes(&self.operation_ids),
                        Err(err) => Err(err),
                    };
                    self.state = WaitAllFutureState::Done;
                    Poll::Ready(outcomes.map(|outcomes| WaitAllResult {
                        outcomes,
                        satisfied,
                    }))
                }
                Poll::Ready(Err(_)) => {
                    // The sender was dropped without firing.
                    self.state = WaitAllFutureState::Done;
                    Poll::Ready(Err(OpsLifecycleError::Internal(
                        "wait_all completion channel dropped".into(),
                    )))
                }
            },
            WaitAllFutureState::Done => Poll::Ready(Err(OpsLifecycleError::Internal(
                "wait_all future polled after completion".into(),
            ))),
        }
    }
}
1298
1299impl Drop for WaitAllFuture<'_> {
1300 fn drop(&mut self) {
1301 if matches!(self.state, WaitAllFutureState::Waiting(_)) {
1302 let _ = self
1303 .registry
1304 .cancel_wait_all_internal(&self.wait_request_id);
1305 }
1306 }
1307}
1308
1309fn classify_op_rejection(
1334 err: mm_dsl::MeerkatMachineTransitionError,
1335 id: &OperationId,
1336 status: OperationStatus,
1337 action: &'static str,
1338) -> OpsLifecycleError {
1339 match err {
1340 mm_dsl::MeerkatMachineTransitionError::GuardRejected { .. } => {
1341 OpsLifecycleError::InvalidTransition {
1342 id: id.clone(),
1343 status,
1344 action,
1345 }
1346 }
1347 other => OpsLifecycleError::Internal(format!(
1348 "DSL rejected ops transition ({action}): {other:?}"
1349 )),
1350 }
1351}
1352
1353fn classify_peer_ready_rejection(
1365 state: &ShellState,
1366 err: mm_dsl::MeerkatMachineTransitionError,
1367 id: &OperationId,
1368 status: OperationStatus,
1369) -> OpsLifecycleError {
1370 match err {
1371 mm_dsl::MeerkatMachineTransitionError::GuardRejected { .. } => {
1372 let kind = state.kind(id);
1373 if kind != Some(OperationKind::MobMemberChild) {
1374 return OpsLifecycleError::PeerNotExpected(id.clone());
1375 }
1376 if state.peer_ready(id).unwrap_or(false) {
1377 return OpsLifecycleError::AlreadyPeerReady(id.clone());
1378 }
1379 OpsLifecycleError::InvalidTransition {
1380 id: id.clone(),
1381 status,
1382 action: "peer_ready",
1383 }
1384 }
1385 other => OpsLifecycleError::Internal(format!(
1386 "DSL rejected ops transition (peer_ready): {other:?}"
1387 )),
1388 }
1389}
1390
1391impl OpsLifecycleRegistry for RuntimeOpsLifecycleRegistry {
1392 fn register_operation(&self, spec: OperationSpec) -> Result<(), OpsLifecycleError> {
1393 let mut state = self.write_state()?;
1394 let operation_id = spec.id.clone();
1395 let kind = spec.kind;
1396
1397 if state.contains(&operation_id) {
1399 return Err(OpsLifecycleError::AlreadyRegistered(operation_id));
1400 }
1401 if let Some(limit) = state.max_concurrent
1403 && state.active_count() >= limit
1404 {
1405 return Err(OpsLifecycleError::MaxConcurrentExceeded {
1406 limit,
1407 active: state.active_count(),
1408 });
1409 }
1410
1411 state.dsl_apply(
1413 mm_dsl::MeerkatMachineInput::RegisterOp {
1414 operation_id: mm_dsl::OperationId::from_domain(&operation_id).0,
1415 kind: mm_dsl::OperationKind::from_domain(&kind),
1416 },
1417 "RegisterOp",
1418 )?;
1419
1420 state.records.insert(operation_id, ShellRecord::new(spec));
1422 Ok(())
1423 }
1424
1425 fn provisioning_succeeded(&self, id: &OperationId) -> Result<(), OpsLifecycleError> {
1426 let mut state = self.write_state()?;
1427
1428 let status = state
1429 .status(id)
1430 .ok_or_else(|| OpsLifecycleError::NotFound(id.clone()))?;
1431
1432 if let Err(err) = state.dsl_apply_raw(mm_dsl::MeerkatMachineInput::StartOp {
1433 operation_id: mm_dsl::OperationId::from_domain(id).0,
1434 }) {
1435 return Err(classify_op_rejection(
1436 err,
1437 id,
1438 status,
1439 "provisioning_succeeded",
1440 ));
1441 }
1442
1443 if let Some(shell) = state.records.get_mut(id) {
1445 shell.started_at = Some(Instant::now());
1446 }
1447 Ok(())
1448 }
1449
1450 fn provisioning_failed(
1451 &self,
1452 id: &OperationId,
1453 error: String,
1454 ) -> Result<(), OpsLifecycleError> {
1455 let mut state = self.write_state()?;
1456
1457 let status = state
1458 .status(id)
1459 .ok_or_else(|| OpsLifecycleError::NotFound(id.clone()))?;
1460
1461 let terminal_outcome = OperationTerminalOutcome::Failed { error };
1462 let (outcome_kind, outcome_payload) = ShellState::split_outcome(&terminal_outcome);
1463
1464 if let Err(err) = state.dsl_apply_raw(mm_dsl::MeerkatMachineInput::FailOp {
1465 operation_id: mm_dsl::OperationId::from_domain(id).0,
1466 outcome: outcome_kind,
1467 payload: outcome_payload,
1468 }) {
1469 return Err(classify_op_rejection(err, id, status, "fail_operation"));
1470 }
1471
1472 state.finalize_terminal(id);
1473 state.maybe_persist()?;
1474 Ok(())
1475 }
1476
1477 fn peer_ready(
1478 &self,
1479 id: &OperationId,
1480 peer: OperationPeerHandle,
1481 ) -> Result<(), OpsLifecycleError> {
1482 let mut state = self.write_state()?;
1483
1484 let status = state
1485 .status(id)
1486 .ok_or_else(|| OpsLifecycleError::NotFound(id.clone()))?;
1487
1488 if let Err(err) = state.dsl_apply_raw(mm_dsl::MeerkatMachineInput::PeerReadyOp {
1493 operation_id: mm_dsl::OperationId::from_domain(id).0,
1494 }) {
1495 return Err(classify_peer_ready_rejection(&state, err, id, status));
1496 }
1497
1498 if let Some(shell) = state.records.get_mut(id) {
1500 shell.peer_handle = Some(peer);
1501 }
1502 Ok(())
1503 }
1504
1505 fn register_watcher(
1506 &self,
1507 id: &OperationId,
1508 ) -> Result<OperationCompletionWatch, OpsLifecycleError> {
1509 let mut state = self.write_state()?;
1510
1511 if !state.contains(id) {
1512 return Err(OpsLifecycleError::NotFound(id.clone()));
1513 }
1514
1515 if let Some(outcome) = state.terminal_outcome(id) {
1517 return Ok(OperationCompletionWatch::already_resolved(outcome));
1518 }
1519
1520 let shell = state.shell_record_mut(id)?;
1522 let (tx, watch) = OperationCompletionWatch::channel();
1523 shell.watchers.push(tx);
1524 Ok(watch)
1525 }
1526
1527 fn report_progress(
1528 &self,
1529 id: &OperationId,
1530 _update: OperationProgressUpdate,
1531 ) -> Result<(), OpsLifecycleError> {
1532 let mut state = self.write_state()?;
1533
1534 let status = state
1535 .status(id)
1536 .ok_or_else(|| OpsLifecycleError::NotFound(id.clone()))?;
1537
1538 if let Err(err) = state.dsl_apply_raw(mm_dsl::MeerkatMachineInput::ProgressReportedOp {
1539 operation_id: mm_dsl::OperationId::from_domain(id).0,
1540 }) {
1541 return Err(classify_op_rejection(err, id, status, "report_progress"));
1542 }
1543 Ok(())
1544 }
1545
1546 fn complete_operation(
1547 &self,
1548 id: &OperationId,
1549 result: OperationResult,
1550 ) -> Result<(), OpsLifecycleError> {
1551 let mut state = self.write_state()?;
1552
1553 let status = state
1554 .status(id)
1555 .ok_or_else(|| OpsLifecycleError::NotFound(id.clone()))?;
1556
1557 let terminal_outcome = OperationTerminalOutcome::Completed(result);
1558 let (outcome_kind, outcome_payload) = ShellState::split_outcome(&terminal_outcome);
1559
1560 if let Err(err) = state.dsl_apply_raw(mm_dsl::MeerkatMachineInput::CompleteOp {
1561 operation_id: mm_dsl::OperationId::from_domain(id).0,
1562 outcome: outcome_kind,
1563 payload: outcome_payload,
1564 }) {
1565 return Err(classify_op_rejection(err, id, status, "complete_operation"));
1566 }
1567
1568 state.finalize_terminal(id);
1569 state.maybe_persist()?;
1570 Ok(())
1571 }
1572
1573 fn fail_operation(&self, id: &OperationId, error: String) -> Result<(), OpsLifecycleError> {
1574 let mut state = self.write_state()?;
1575
1576 let status = state
1577 .status(id)
1578 .ok_or_else(|| OpsLifecycleError::NotFound(id.clone()))?;
1579
1580 let terminal_outcome = OperationTerminalOutcome::Failed { error };
1581 let (outcome_kind, outcome_payload) = ShellState::split_outcome(&terminal_outcome);
1582
1583 if let Err(err) = state.dsl_apply_raw(mm_dsl::MeerkatMachineInput::FailOp {
1584 operation_id: mm_dsl::OperationId::from_domain(id).0,
1585 outcome: outcome_kind,
1586 payload: outcome_payload,
1587 }) {
1588 return Err(classify_op_rejection(err, id, status, "fail_operation"));
1589 }
1590
1591 state.finalize_terminal(id);
1592 state.maybe_persist()?;
1593 Ok(())
1594 }
1595
1596 fn abort_provisioning(
1597 &self,
1598 id: &OperationId,
1599 reason: Option<String>,
1600 ) -> Result<(), OpsLifecycleError> {
1601 let mut state = self.write_state()?;
1602
1603 let status = state
1604 .status(id)
1605 .ok_or_else(|| OpsLifecycleError::NotFound(id.clone()))?;
1606
1607 let terminal_outcome = OperationTerminalOutcome::Aborted { reason };
1608 let (outcome_kind, outcome_payload) = ShellState::split_outcome(&terminal_outcome);
1609
1610 if let Err(err) = state.dsl_apply_raw(mm_dsl::MeerkatMachineInput::AbortOp {
1611 operation_id: mm_dsl::OperationId::from_domain(id).0,
1612 outcome: outcome_kind,
1613 payload: outcome_payload,
1614 }) {
1615 return Err(classify_op_rejection(err, id, status, "abort_provisioning"));
1616 }
1617
1618 state.finalize_terminal(id);
1619 state.maybe_persist()?;
1620 Ok(())
1621 }
1622
1623 fn cancel_operation(
1624 &self,
1625 id: &OperationId,
1626 reason: Option<String>,
1627 ) -> Result<(), OpsLifecycleError> {
1628 let mut state = self.write_state()?;
1629
1630 let status = state
1631 .status(id)
1632 .ok_or_else(|| OpsLifecycleError::NotFound(id.clone()))?;
1633
1634 let terminal_outcome = OperationTerminalOutcome::Cancelled { reason };
1635 let (outcome_kind, outcome_payload) = ShellState::split_outcome(&terminal_outcome);
1636
1637 if let Err(err) = state.dsl_apply_raw(mm_dsl::MeerkatMachineInput::CancelOp {
1638 operation_id: mm_dsl::OperationId::from_domain(id).0,
1639 outcome: outcome_kind,
1640 payload: outcome_payload,
1641 }) {
1642 return Err(classify_op_rejection(err, id, status, "cancel_operation"));
1643 }
1644
1645 state.finalize_terminal(id);
1646 state.maybe_persist()?;
1647 Ok(())
1648 }
1649
1650 fn request_retire(&self, id: &OperationId) -> Result<(), OpsLifecycleError> {
1651 let mut state = self.write_state()?;
1652
1653 let status = state
1654 .status(id)
1655 .ok_or_else(|| OpsLifecycleError::NotFound(id.clone()))?;
1656
1657 if let Err(err) = state.dsl_apply_raw(mm_dsl::MeerkatMachineInput::RetireRequestedOp {
1658 operation_id: mm_dsl::OperationId::from_domain(id).0,
1659 }) {
1660 return Err(classify_op_rejection(err, id, status, "request_retire"));
1661 }
1662 Ok(())
1663 }
1664
1665 fn mark_retired(&self, id: &OperationId) -> Result<(), OpsLifecycleError> {
1666 let mut state = self.write_state()?;
1667
1668 let status = state
1669 .status(id)
1670 .ok_or_else(|| OpsLifecycleError::NotFound(id.clone()))?;
1671
1672 let terminal_outcome = OperationTerminalOutcome::Retired;
1673 let (outcome_kind, outcome_payload) = ShellState::split_outcome(&terminal_outcome);
1674
1675 if let Err(err) = state.dsl_apply_raw(mm_dsl::MeerkatMachineInput::RetireCompletedOp {
1676 operation_id: mm_dsl::OperationId::from_domain(id).0,
1677 outcome: outcome_kind,
1678 payload: outcome_payload,
1679 }) {
1680 return Err(classify_op_rejection(err, id, status, "mark_retired"));
1681 }
1682
1683 state.finalize_terminal(id);
1684 state.maybe_persist()?;
1685 Ok(())
1686 }
1687
1688 fn snapshot(&self, id: &OperationId) -> Option<OperationLifecycleSnapshot> {
1689 self.read_state().ok().and_then(|state| state.snapshot(id))
1690 }
1691
1692 fn list_operations(&self) -> Vec<OperationLifecycleSnapshot> {
1693 let mut snapshots = self
1694 .read_state()
1695 .map(|state| {
1696 state
1697 .operation_ids()
1698 .into_iter()
1699 .filter_map(|id| state.snapshot(&id))
1700 .collect::<Vec<_>>()
1701 })
1702 .unwrap_or_default();
1703 snapshots.sort_by(|left, right| left.display_name.cmp(&right.display_name));
1704 snapshots
1705 }
1706
1707 fn terminate_owner(&self, reason: String) -> Result<(), OpsLifecycleError> {
1708 let mut state = self.write_state()?;
1709
1710 let to_terminate = state.owner_termination_targets();
1711
1712 for (op_id, status) in &to_terminate {
1713 let terminal_outcome = OperationTerminalOutcome::Terminated {
1714 reason: reason.clone(),
1715 };
1716 let (outcome_kind, outcome_payload) = ShellState::split_outcome(&terminal_outcome);
1717
1718 if let Err(err) = state.dsl_apply_raw(mm_dsl::MeerkatMachineInput::TerminateOp {
1719 operation_id: mm_dsl::OperationId::from_domain(op_id).0,
1720 outcome: outcome_kind,
1721 payload: outcome_payload,
1722 }) {
1723 return Err(classify_op_rejection(
1724 err,
1725 op_id,
1726 *status,
1727 "terminate_owner",
1728 ));
1729 }
1730
1731 state.finalize_terminal(op_id);
1732 }
1733
1734 if !to_terminate.is_empty() {
1735 state.maybe_persist()?;
1736 }
1737 Ok(())
1738 }
1739
1740 fn collect_completed(
1741 &self,
1742 ) -> Result<Vec<(OperationId, OperationTerminalOutcome)>, OpsLifecycleError> {
1743 let mut state = self.write_state()?;
1744
1745 let ids: Vec<OperationId> = state.completed_order.drain(..).collect();
1746 let mut collected = Vec::with_capacity(ids.len());
1747 for id in ids {
1748 let outcome = state.terminal_outcome(&id);
1749 let id_key = mm_dsl::OperationId::from_domain(&id).0;
1751 state.dsl.0.state.op_statuses.remove(&id_key);
1752 state.dsl.0.state.op_kinds.remove(&id_key);
1753 state.dsl.0.state.op_peer_ready.remove(&id_key);
1754 state.dsl.0.state.op_progress_counts.remove(&id_key);
1755 state.dsl.0.state.op_terminal_outcomes.remove(&id_key);
1756 state.dsl.0.state.op_terminal_payload.remove(&id_key);
1757 state.dsl.0.state.op_completion_seq.remove(&id_key);
1758 state.records.remove(&id);
1759 if let Some(outcome) = outcome {
1760 collected.push((id, outcome));
1761 }
1762 }
1763 Ok(collected)
1764 }
1765
1766 fn completion_feed(&self) -> Option<Arc<dyn CompletionFeed>> {
1767 Some(self.completion_feed_handle())
1768 }
1769
1770 fn wait_all(
1771 &self,
1772 _run_id: &RunId,
1773 ids: &[OperationId],
1774 ) -> std::pin::Pin<
1775 Box<dyn std::future::Future<Output = Result<WaitAllResult, OpsLifecycleError>> + Send + '_>,
1776 > {
1777 let wait_request_id = WaitRequestId::new();
1778 let owned_ids = ids.to_vec();
1779
1780 let state = match self.write_state() {
1781 Ok(mut state) => {
1782 match state.begin_wait_all_authority(&wait_request_id, &owned_ids) {
1783 Ok(WaitAllAuthorityPlan::AlreadySatisfied(satisfied)) => {
1784 let outcomes =
1785 state
1786 .collect_wait_outcomes(&owned_ids)
1787 .map(|outcomes| WaitAllResult {
1788 outcomes,
1789 satisfied,
1790 });
1791 WaitAllFutureState::Ready(Some(outcomes))
1792 }
1793 Ok(WaitAllAuthorityPlan::ActivateBarrier) => {
1794 state.wait_request_id = Some(wait_request_id.clone());
1795
1796 if state.pending_wait.is_some() {
1797 state.wait_request_id = None;
1803 let _ = state.dsl_apply(
1804 mm_dsl::MeerkatMachineInput::CancelWaitAll,
1805 "CancelWaitAll(rollback)",
1806 );
1807 return Box::pin(WaitAllFuture {
1808 registry: self,
1809 wait_request_id,
1810 operation_ids: owned_ids,
1811 state: WaitAllFutureState::Ready(Some(Err(
1812 OpsLifecycleError::Internal(
1813 "wait_all started while a pending wait sender already existed"
1814 .into(),
1815 ),
1816 ))),
1817 });
1818 }
1819 let (sender, receiver) = tokio::sync::oneshot::channel();
1820 state.pending_wait = Some(PendingWaitState {
1821 wait_request_id: wait_request_id.clone(),
1822 sender,
1823 });
1824 WaitAllFutureState::Waiting(receiver)
1825 }
1826 Err(err) => WaitAllFutureState::Ready(Some(Err(err))),
1827 }
1828 }
1829 Err(err) => WaitAllFutureState::Ready(Some(Err(err))),
1830 };
1831
1832 Box::pin(WaitAllFuture {
1833 registry: self,
1834 wait_request_id,
1835 operation_ids: owned_ids,
1836 state,
1837 })
1838 }
1839}
1840
1841#[cfg(test)]
1842#[allow(clippy::unwrap_used, clippy::panic)]
1843mod tests {
1844 use super::*;
1845 use meerkat_core::comms::{PeerId, TrustedPeerDescriptor};
1846 use meerkat_core::lifecycle::RunId;
1847 use meerkat_core::ops_lifecycle::{OperationKind, OpsLifecycleRegistry};
1848 use meerkat_core::types::SessionId;
1849 use std::sync::atomic::Ordering;
1850 use uuid::Uuid;
1851
1852 fn test_run_id() -> RunId {
1853 RunId(Uuid::from_u128(1))
1854 }
1855
1856 fn background_spec(name: &str) -> OperationSpec {
1857 OperationSpec {
1858 id: OperationId::new(),
1859 kind: OperationKind::BackgroundToolOp,
1860 owner_session_id: SessionId::new(),
1861 display_name: name.into(),
1862 source_label: "test".into(),
1863 child_session_id: None,
1864 expect_peer_channel: false,
1865 }
1866 }
1867
1868 #[tokio::test]
1869 async fn late_watchers_resolve_immediately() {
1870 let registry = RuntimeOpsLifecycleRegistry::new();
1871 let spec = background_spec("late");
1872 let op_id = spec.id.clone();
1873 registry.register_operation(spec).unwrap();
1874 registry.provisioning_succeeded(&op_id).unwrap();
1875 registry
1876 .complete_operation(
1877 &op_id,
1878 OperationResult {
1879 id: op_id.clone(),
1880 content: "done".into(),
1881 is_error: false,
1882 duration_ms: 1,
1883 tokens_used: 0,
1884 },
1885 )
1886 .unwrap();
1887
1888 let watch = registry.register_watcher(&op_id).unwrap();
1889 match watch.wait().await {
1890 OperationTerminalOutcome::Completed(result) => assert_eq!(result.content, "done"),
1891 other => panic!("expected completed outcome, got {other:?}"),
1892 }
1893 }
1894
/// `peer_ready` on a background tool operation is rejected with `PeerNotExpected`.
#[test]
fn peer_ready_requires_peer_expectation() {
    let registry = RuntimeOpsLifecycleRegistry::new();
    let spec = background_spec("no-peer");
    let op_id = spec.id.clone();
    registry.register_operation(spec).unwrap();
    registry.provisioning_succeeded(&op_id).unwrap();

    let handle = OperationPeerHandle {
        peer_name: meerkat_core::comms::PeerName::new("peer").unwrap(),
        trusted_peer: TrustedPeerDescriptor::test_only_unsigned_typed(
            "peer",
            PeerId::new(),
            "inproc://peer",
        )
        .unwrap(),
    };

    let rejected = registry.peer_ready(&op_id, handle);
    assert!(matches!(
        rejected,
        Err(OpsLifecycleError::PeerNotExpected(_))
    ));
}
1917
1918 #[tokio::test]
1919 async fn multi_listener_completion() {
1920 let registry = RuntimeOpsLifecycleRegistry::new();
1921 let spec = background_spec("multi");
1922 let op_id = spec.id.clone();
1923 registry.register_operation(spec).unwrap();
1924 registry.provisioning_succeeded(&op_id).unwrap();
1925
1926 let watch1 = registry.register_watcher(&op_id).unwrap();
1927 let watch2 = registry.register_watcher(&op_id).unwrap();
1928 let watch3 = registry.register_watcher(&op_id).unwrap();
1929
1930 registry
1931 .complete_operation(
1932 &op_id,
1933 OperationResult {
1934 id: op_id.clone(),
1935 content: "multi-done".into(),
1936 is_error: false,
1937 duration_ms: 1,
1938 tokens_used: 0,
1939 },
1940 )
1941 .unwrap();
1942
1943 for watch in [watch1, watch2, watch3] {
1944 match watch.wait().await {
1945 OperationTerminalOutcome::Completed(result) => {
1946 assert_eq!(result.content, "multi-done");
1947 }
1948 other => panic!("expected completed, got {other:?}"),
1949 }
1950 }
1951 }
1952
1953 #[tokio::test]
1954 async fn wait_all_returns_all_outcomes() {
1955 let registry = RuntimeOpsLifecycleRegistry::new();
1956
1957 let spec_a = background_spec("a");
1958 let id_a = spec_a.id.clone();
1959 registry.register_operation(spec_a).unwrap();
1960 registry.provisioning_succeeded(&id_a).unwrap();
1961
1962 let spec_b = background_spec("b");
1963 let id_b = spec_b.id.clone();
1964 registry.register_operation(spec_b).unwrap();
1965 registry.provisioning_succeeded(&id_b).unwrap();
1966
1967 registry
1968 .complete_operation(
1969 &id_a,
1970 OperationResult {
1971 id: id_a.clone(),
1972 content: "a-done".into(),
1973 is_error: false,
1974 duration_ms: 1,
1975 tokens_used: 0,
1976 },
1977 )
1978 .unwrap();
1979 registry.fail_operation(&id_b, "b-error".into()).unwrap();
1980
1981 let wait_result = registry
1982 .wait_all(&test_run_id(), &[id_a.clone(), id_b.clone()])
1983 .await
1984 .unwrap();
1985 assert_eq!(wait_result.outcomes.len(), 2);
1986 assert_eq!(wait_result.outcomes[0].0, id_a);
1987 assert!(matches!(
1988 wait_result.outcomes[0].1,
1989 OperationTerminalOutcome::Completed(_)
1990 ));
1991 assert_eq!(wait_result.outcomes[1].0, id_b);
1992 assert!(matches!(
1993 wait_result.outcomes[1].1,
1994 OperationTerminalOutcome::Failed { .. }
1995 ));
1996 assert_eq!(wait_result.satisfied.operation_ids.len(), 2);
1998 assert_ne!(wait_result.satisfied.wait_request_id.to_string(), "");
1999 }
2000
2001 #[tokio::test]
2004 async fn wait_all_trait_path_submits_through_authority() {
2005 let registry = RuntimeOpsLifecycleRegistry::new();
2006 let spec = background_spec("trait-wait");
2007 let op_id = spec.id.clone();
2008 registry.register_operation(spec).unwrap();
2009 registry.provisioning_succeeded(&op_id).unwrap();
2010 registry
2011 .complete_operation(
2012 &op_id,
2013 OperationResult {
2014 id: op_id.clone(),
2015 content: "done".into(),
2016 is_error: false,
2017 duration_ms: 1,
2018 tokens_used: 0,
2019 },
2020 )
2021 .unwrap();
2022
2023 let trait_ref: &dyn OpsLifecycleRegistry = ®istry;
2025 let wait_result = trait_ref
2026 .wait_all(&test_run_id(), std::slice::from_ref(&op_id))
2027 .await
2028 .unwrap();
2029 assert_eq!(wait_result.outcomes.len(), 1);
2030 assert!(matches!(
2031 wait_result.outcomes[0].1,
2032 OperationTerminalOutcome::Completed(_)
2033 ));
2034 assert_eq!(wait_result.satisfied.operation_ids, vec![op_id]);
2036 assert_ne!(wait_result.satisfied.wait_request_id.to_string(), "");
2037 }
2038
2039 #[tokio::test]
2040 async fn wait_all_resolves_from_authority_owned_wait_request() {
2041 let registry = RuntimeOpsLifecycleRegistry::new();
2042 let run_id = test_run_id();
2043
2044 let spec = background_spec("pending");
2045 let op_id = spec.id.clone();
2046 registry.register_operation(spec).unwrap();
2047 registry.provisioning_succeeded(&op_id).unwrap();
2048
2049 let wait_fut = registry.wait_all(&run_id, std::slice::from_ref(&op_id));
2050 tokio::pin!(wait_fut);
2051 assert!(
2052 tokio::time::timeout(std::time::Duration::from_millis(10), &mut wait_fut)
2053 .await
2054 .is_err()
2055 );
2056
2057 let active_wait_request_id = {
2058 let state = registry.read_state().unwrap();
2059 let wait_request_id = match state.wait_request_id.clone() {
2060 Some(wait_request_id) => wait_request_id,
2061 None => panic!("wait request should be active"),
2062 };
2063 assert_eq!(
2064 state.wait_operation_ids().as_slice(),
2065 std::slice::from_ref(&op_id)
2066 );
2067 wait_request_id
2068 };
2069
2070 registry
2071 .complete_operation(
2072 &op_id,
2073 OperationResult {
2074 id: op_id.clone(),
2075 content: "done".into(),
2076 is_error: false,
2077 duration_ms: 1,
2078 tokens_used: 0,
2079 },
2080 )
2081 .unwrap();
2082
2083 let wait_result = wait_fut.await.unwrap();
2084 assert_eq!(
2085 wait_result.satisfied.wait_request_id,
2086 active_wait_request_id
2087 );
2088 assert_eq!(wait_result.satisfied.operation_ids, vec![op_id.clone()]);
2089 assert!(matches!(
2090 wait_result.outcomes.as_slice(),
2091 [(returned_id, OperationTerminalOutcome::Completed(_))] if *returned_id == op_id
2092 ));
2093 assert!(registry.read_state().unwrap().wait_request_id.is_none());
2094 }
2095
2096 #[tokio::test]
2097 async fn dropping_wait_all_future_cancels_active_wait_request() {
2098 let registry = RuntimeOpsLifecycleRegistry::new();
2099 let run_id = test_run_id();
2100
2101 let spec = background_spec("cancelled-wait");
2102 let op_id = spec.id.clone();
2103 registry.register_operation(spec).unwrap();
2104 registry.provisioning_succeeded(&op_id).unwrap();
2105
2106 let wait_fut = registry.wait_all(&run_id, std::slice::from_ref(&op_id));
2107 drop(wait_fut);
2108
2109 let state = registry.read_state().unwrap();
2110 assert!(state.wait_request_id.is_none());
2111 assert!(state.wait_operation_ids().is_empty());
2112 assert!(!state.wait_active());
2113 }
2114
/// `terminate_owner` terminates the running operation and leaves the completed
/// one as `Completed`.
#[test]
fn terminate_owner_only_targets_non_terminal_operations() {
    let registry = RuntimeOpsLifecycleRegistry::new();

    let running_spec = background_spec("running");
    let running_id = running_spec.id.clone();
    registry.register_operation(running_spec).unwrap();
    registry.provisioning_succeeded(&running_id).unwrap();

    let completed_spec = background_spec("completed");
    let completed_id = completed_spec.id.clone();
    registry.register_operation(completed_spec).unwrap();
    registry.provisioning_succeeded(&completed_id).unwrap();
    let finished = OperationResult {
        id: completed_id.clone(),
        content: "done".into(),
        is_error: false,
        duration_ms: 1,
        tokens_used: 0,
    };
    registry.complete_operation(&completed_id, finished).unwrap();

    registry.terminate_owner("shutdown".into()).unwrap();

    let running_status = registry.snapshot(&running_id).unwrap().status;
    assert!(matches!(running_status, OperationStatus::Terminated));

    let completed_status = registry.snapshot(&completed_id).unwrap().status;
    assert!(matches!(completed_status, OperationStatus::Completed));
}
2152
/// `collect_completed` returns and removes the completed operation, keeps the
/// one that is merely registered, and yields nothing on a second call.
#[test]
fn collect_completed_drains_terminal_operations() {
    let registry = RuntimeOpsLifecycleRegistry::new();

    let spec_a = background_spec("a");
    let id_a = spec_a.id.clone();
    registry.register_operation(spec_a).unwrap();
    registry.provisioning_succeeded(&id_a).unwrap();
    let finished = OperationResult {
        id: id_a.clone(),
        content: "done".into(),
        is_error: false,
        duration_ms: 1,
        tokens_used: 0,
    };
    registry.complete_operation(&id_a, finished).unwrap();

    // The second operation is registered but never started or completed.
    let spec_b = background_spec("b");
    let id_b = spec_b.id.clone();
    registry.register_operation(spec_b).unwrap();

    let first_pass = registry.collect_completed().unwrap();
    assert_eq!(first_pass.len(), 1);
    assert_eq!(first_pass[0].0, id_a);

    assert!(registry.snapshot(&id_a).is_none());
    assert!(registry.snapshot(&id_b).is_some());

    let second_pass = registry.collect_completed().unwrap();
    assert!(second_pass.is_empty());
}
2188
/// With `max_completed: 3`, completing five operations leaves snapshots only for
/// the three completed last.
#[test]
fn bounded_completed_retention_evicts_oldest() {
    let registry = RuntimeOpsLifecycleRegistry::with_config(OpsLifecycleConfig {
        max_completed: 3,
        max_concurrent: None,
    });

    let ids: Vec<OperationId> = (0..5)
        .map(|i| {
            let spec = background_spec(&format!("op-{i}"));
            let id = spec.id.clone();
            registry.register_operation(spec).unwrap();
            registry.provisioning_succeeded(&id).unwrap();
            let finished = OperationResult {
                id: id.clone(),
                content: format!("done-{i}"),
                is_error: false,
                duration_ms: 1,
                tokens_used: 0,
            };
            registry.complete_operation(&id, finished).unwrap();
            id
        })
        .collect();

    // The two oldest completions have no snapshot any more.
    for evicted in &ids[..2] {
        assert!(registry.snapshot(evicted).is_none());
    }
    for retained in &ids[2..] {
        assert!(registry.snapshot(retained).is_some());
    }
}
2223
/// With `max_concurrent: Some(2)`, a third registration is rejected until one of
/// the first two operations has completed.
#[test]
fn max_concurrent_enforcement() {
    let registry = RuntimeOpsLifecycleRegistry::with_config(OpsLifecycleConfig {
        max_completed: DEFAULT_MAX_COMPLETED,
        max_concurrent: Some(2),
    });

    let spec_a = background_spec("a");
    let id_a = spec_a.id.clone();
    registry.register_operation(spec_a).unwrap();
    registry.register_operation(background_spec("b")).unwrap();

    // Two operations are active, so the third registration must be refused.
    let overflow = registry.register_operation(background_spec("c"));
    assert!(matches!(
        overflow,
        Err(OpsLifecycleError::MaxConcurrentExceeded {
            limit: 2,
            active: 2,
        })
    ));

    registry.provisioning_succeeded(&id_a).unwrap();
    let finished = OperationResult {
        id: id_a.clone(),
        content: "done".into(),
        is_error: false,
        duration_ms: 1,
        tokens_used: 0,
    };
    registry.complete_operation(&id_a, finished).unwrap();

    // Completing "a" makes room for another registration.
    let after_completion = registry.register_operation(background_spec("d"));
    assert!(after_completion.is_ok());
}
2265
/// Snapshot timestamps appear as the operation moves from registered to started
/// to completed.
#[test]
fn snapshot_includes_timestamps() {
    let registry = RuntimeOpsLifecycleRegistry::new();
    let spec = background_spec("timed");
    let op_id = spec.id.clone();
    registry.register_operation(spec).unwrap();

    // Freshly registered: only the creation time is set.
    let registered = registry.snapshot(&op_id).unwrap();
    assert!(registered.created_at_ms > 0);
    assert!(registered.started_at_ms.is_none());
    assert!(registered.completed_at_ms.is_none());
    assert!(registered.elapsed_ms.is_none());

    registry.provisioning_succeeded(&op_id).unwrap();
    let started = registry.snapshot(&op_id).unwrap();
    assert!(started.started_at_ms.is_some());
    assert!(started.started_at_ms.unwrap() >= started.created_at_ms);

    let finished = OperationResult {
        id: op_id.clone(),
        content: "done".into(),
        is_error: false,
        duration_ms: 1,
        tokens_used: 0,
    };
    registry.complete_operation(&op_id, finished).unwrap();

    let completed = registry.snapshot(&op_id).unwrap();
    assert!(completed.completed_at_ms.is_some());
    assert!(completed.elapsed_ms.is_some());
    assert!(completed.completed_at_ms.unwrap() >= completed.started_at_ms.unwrap());
}
2301
/// For a `MobMemberChild` operation the snapshot has no peer handle until
/// `peer_ready` succeeds, after which it exposes the handle's peer name.
#[test]
fn snapshot_includes_peer_handle() {
    let registry = RuntimeOpsLifecycleRegistry::new();
    let op_id = OperationId::new();
    let spec = OperationSpec {
        id: op_id.clone(),
        kind: OperationKind::MobMemberChild,
        owner_session_id: SessionId::new(),
        display_name: "peer-test".into(),
        source_label: "test".into(),
        child_session_id: Some(SessionId::new()),
        expect_peer_channel: true,
    };
    registry.register_operation(spec).unwrap();
    registry.provisioning_succeeded(&op_id).unwrap();

    let before = registry.snapshot(&op_id).unwrap();
    assert!(before.peer_handle.is_none());

    let trusted_peer =
        TrustedPeerDescriptor::test_only_unsigned_typed("member-x", PeerId::new(), "inproc://x")
            .unwrap();
    let handle = OperationPeerHandle {
        peer_name: meerkat_core::comms::PeerName::new("member-x").unwrap(),
        trusted_peer,
    };
    registry.peer_ready(&op_id, handle).unwrap();

    let after = registry.snapshot(&op_id).unwrap();
    let recorded_handle = after.peer_handle.as_ref().unwrap();
    assert_eq!(recorded_handle.peer_name.as_str(), "member-x");
}
2338}