1use std::collections::{BTreeMap, BTreeSet, HashMap, HashSet, VecDeque};
18use std::future::Future;
19use std::sync::atomic::{AtomicU64, Ordering};
20use std::sync::{Arc, RwLock, RwLockReadGuard, RwLockWriteGuard};
21use std::task::{Context, Poll};
22
23use meerkat_core::completion_feed::{
24 CompletionBatch, CompletionEntry, CompletionFeed, CompletionSeq,
25};
26
27#[cfg(target_arch = "wasm32")]
28use crate::tokio;
29use meerkat_core::lifecycle::{RunId, WaitRequestId};
30use meerkat_core::ops_lifecycle::{
31 CompletionCursorConsumer, DEFAULT_MAX_COMPLETED, OperationCompletionWakeClass,
32 OperationCompletionWatch, OperationId, OperationKind, OperationLifecycleAction,
33 OperationLifecycleSnapshot, OperationPeerHandle, OperationProgressUpdate,
34 OperationPublicResultClass, OperationResult, OperationSource, OperationSpec, OperationStatus,
35 OperationTerminalOutcome, OpsLifecycleError, OpsLifecycleRegistry, WaitAllResult,
36 WaitAllSatisfied,
37};
38use meerkat_core::time_compat::{Instant, SystemTime, UNIX_EPOCH};
39use meerkat_core::types::SessionId;
40
41use crate::meerkat_machine::dsl as mm_dsl;
42
43fn deserialize_required_operation_source<'de, D>(
54 deserializer: D,
55) -> Result<Option<OperationSource>, D::Error>
56where
57 D: serde::Deserializer<'de>,
58{
59 <Option<OperationSource> as serde::Deserialize>::deserialize(deserializer)
60}
61
/// Serializable per-operation entry of [`RegistryCanonicalState`].
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub struct OperationCanonicalState {
    /// Lifecycle status of the operation.
    status: OperationStatus,
    /// Kind of the operation.
    kind: OperationKind,
    /// Source of the operation, if any; deserialized through
    /// `deserialize_required_operation_source`.
    #[serde(deserialize_with = "deserialize_required_operation_source")]
    operation_source: Option<OperationSource>,
    /// Peer-ready flag for the operation.
    peer_ready: bool,
    /// Progress counter for the operation.
    progress_count: u32,
    /// Watcher counter for the operation.
    watcher_count: u32,
    /// Terminal outcome, when one is recorded.
    terminal_outcome: Option<OperationTerminalOutcome>,
    /// Completion sequence, when one is recorded; omitted from the serialized
    /// form when `None` and defaulted to `None` when absent on input.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    completion_sequence: Option<CompletionSeq>,
    /// NOTE(review): meaning not visible in this chunk; presumably whether the
    /// terminal result is still buffered. Confirm against the producer.
    terminal_buffered: bool,
}
77
/// Canonical completion-feed facts for one operation: the sequence number,
/// the operation kind, and the terminal outcome.
#[derive(Debug, Clone, PartialEq, serde::Serialize, serde::Deserialize)]
pub struct CompletionFeedCanonicalState {
    /// Completion sequence number of the feed entry.
    seq: CompletionSeq,
    /// Kind of the completed operation.
    kind: OperationKind,
    /// Terminal outcome carried by the feed entry.
    terminal_outcome: OperationTerminalOutcome,
}
85
/// Serializable canonical state of the whole ops lifecycle registry.
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub struct RegistryCanonicalState {
    /// Per-operation canonical state, keyed by operation id.
    operations: HashMap<OperationId, OperationCanonicalState>,
    /// Completion-feed facts, keyed by operation id.
    completion_feed_entries: HashMap<OperationId, CompletionFeedCanonicalState>,
    /// Operation ids in completion order.
    completed_order: VecDeque<OperationId>,
    /// Retention limit for completed operations.
    max_completed: usize,
    /// Optional concurrency limit.
    max_concurrent: Option<usize>,
    /// Number of active operations.
    active_count: usize,
    /// Id of the current wait request, if any.
    wait_request_id: Option<WaitRequestId>,
    /// Operation ids covered by the current wait.
    wait_operation_ids: Vec<OperationId>,
    /// NOTE(review): presumably the next completion sequence to hand out; confirm.
    next_completion_seq: CompletionSeq,
}
99
/// Read-only accessors over the canonical registry state.
impl RegistryCanonicalState {
    /// Returns the stored `max_completed` value.
    pub fn max_completed(&self) -> usize {
        self.max_completed
    }

    /// Returns the stored `max_concurrent` value.
    pub fn max_concurrent(&self) -> Option<usize> {
        self.max_concurrent
    }

    /// Returns the number of entries in `operations`.
    pub fn operation_count(&self) -> usize {
        self.operations.len()
    }

    /// Returns the number of entries in `completion_feed_entries`.
    pub fn completion_feed_count(&self) -> usize {
        self.completion_feed_entries.len()
    }
}
121
/// Serializable snapshot of the ops lifecycle registry for persistence.
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub struct PersistedOpsSnapshot {
    /// Runtime epoch this snapshot belongs to.
    pub epoch_id: meerkat_core::RuntimeEpochId,
    /// Canonical registry state at snapshot time.
    pub authority_state: RegistryCanonicalState,
    /// Operation specs keyed by operation id.
    pub operation_specs: HashMap<OperationId, meerkat_core::ops_lifecycle::OperationSpec>,
    /// Completion feed entries included in the snapshot.
    pub completion_entries: Vec<CompletionEntry>,
    /// Completion cursor positions at snapshot time.
    pub cursors: meerkat_core::EpochCursorSnapshot,
}
146
/// A snapshot to persist, paired with the channel on which the persistence
/// result is reported back (see `complete`).
#[derive(Debug)]
pub struct OpsLifecyclePersistenceRequest {
    /// Snapshot to be persisted.
    snapshot: PersistedOpsSnapshot,
    /// Sender used by `complete` to report the persistence result.
    result_tx: std::sync::mpsc::SyncSender<Result<(), OpsLifecycleError>>,
}
152
153impl OpsLifecyclePersistenceRequest {
154 pub fn snapshot(&self) -> &PersistedOpsSnapshot {
155 &self.snapshot
156 }
157
158 pub fn complete(self, result: Result<(), OpsLifecycleError>) {
159 let _ = self.result_tx.send(result);
160 }
161}
162
/// Lock-protected contents of [`FeedBuffer`].
#[derive(Debug)]
struct FeedBufferInner {
    /// Retained completion entries, oldest at the front.
    entries: VecDeque<CompletionEntry>,
    /// Sequence number of the most recently pushed entry (0 before any push).
    watermark: CompletionSeq,
    /// Maximum number of entries kept; older entries are dropped on push.
    max_retained: usize,
}
177
/// Bounded buffer of completion entries shared between the registry (writer)
/// and [`RuntimeCompletionFeed`] handles (readers).
#[derive(Debug)]
struct FeedBuffer {
    /// Entries and watermark, guarded by a read/write lock.
    inner: RwLock<FeedBufferInner>,
    /// Copy of the watermark that can be read without taking the lock.
    watermark_atomic: AtomicU64,
    /// Wakes tasks waiting in `wait_for_advance` after each push.
    notify: tokio::sync::Notify,
}
190
191impl FeedBuffer {
192 fn new(max_retained: usize) -> Self {
193 Self {
194 inner: RwLock::new(FeedBufferInner {
195 entries: VecDeque::new(),
196 watermark: 0,
197 max_retained,
198 }),
199 watermark_atomic: AtomicU64::new(0),
200 notify: tokio::sync::Notify::new(),
201 }
202 }
203
204 fn push(&self, entry: CompletionEntry) {
205 let mut inner = self
206 .inner
207 .write()
208 .unwrap_or_else(std::sync::PoisonError::into_inner);
209 let seq = entry.seq;
210 inner.entries.push_back(entry);
211 inner.watermark = seq;
212
213 while inner.entries.len() > inner.max_retained {
215 inner.entries.pop_front();
216 }
217
218 drop(inner);
219
220 self.watermark_atomic.store(seq, Ordering::Release);
221 self.notify.notify_waiters();
222 }
223}
224
/// Cloneable handle over a shared [`FeedBuffer`]; implements [`CompletionFeed`].
#[derive(Debug, Clone)]
pub struct RuntimeCompletionFeed {
    /// Shared buffer backing this feed handle.
    buffer: Arc<FeedBuffer>,
}
233
234impl CompletionFeed for RuntimeCompletionFeed {
235 fn watermark(&self) -> CompletionSeq {
236 self.buffer.watermark_atomic.load(Ordering::Acquire)
237 }
238
239 fn list_since(&self, after_seq: CompletionSeq) -> CompletionBatch {
240 let inner = self
241 .buffer
242 .inner
243 .read()
244 .unwrap_or_else(std::sync::PoisonError::into_inner);
245 let entries: Vec<CompletionEntry> = inner
246 .entries
247 .iter()
248 .filter(|e| e.seq > after_seq)
249 .cloned()
250 .collect();
251 let watermark = inner.watermark;
252 CompletionBatch { entries, watermark }
253 }
254
255 fn wait_for_advance(
256 &self,
257 after_seq: CompletionSeq,
258 ) -> std::pin::Pin<Box<dyn Future<Output = CompletionSeq> + Send + '_>> {
259 Box::pin(async move {
260 loop {
261 let notified = self.buffer.notify.notified();
266 let current = self.buffer.watermark_atomic.load(Ordering::Acquire);
267 if current > after_seq {
268 return current;
269 }
270 notified.await;
271 }
272 })
273 }
274}
275
/// One-shot notifier that delivers an operation's terminal outcome to a
/// single watcher.
#[derive(Debug)]
struct OperationCompletionNotifier {
    /// Sending half of the watcher's one-shot channel.
    tx: tokio::sync::oneshot::Sender<OperationTerminalOutcome>,
}
284
285impl OperationCompletionNotifier {
286 fn new(tx: tokio::sync::oneshot::Sender<OperationTerminalOutcome>) -> Self {
287 Self { tx }
288 }
289
290 fn notify_after_generated_terminal(self, outcome: &OperationTerminalOutcome) {
291 let _ = self.tx.send(outcome.clone());
292 }
293}
294
295fn operation_completion_watch_from_receiver(
296 rx: tokio::sync::oneshot::Receiver<OperationTerminalOutcome>,
297) -> OperationCompletionWatch {
298 Box::pin(async move {
299 rx.await
300 .map_err(|_| meerkat_core::ops_lifecycle::OperationCompletionWatchError::ChannelClosed)
301 })
302}
303
/// Builds a completion watch that resolves immediately with `Ok(outcome)`.
fn resolved_operation_completion_watch(
    outcome: OperationTerminalOutcome,
) -> OperationCompletionWatch {
    Box::pin(async move { Ok(outcome) })
}
309
/// Shell-side bookkeeping kept for each operation next to the generated
/// authority state: its spec, peer handle, watchers and timestamps.
#[derive(Debug)]
struct ShellRecord {
    /// Spec the operation was created with.
    spec: OperationSpec,
    /// Peer handle, if one is set (initially `None`).
    peer_handle: Option<OperationPeerHandle>,
    /// Watchers notified once when the operation reaches a terminal outcome.
    watchers: Vec<OperationCompletionNotifier>,
    /// Monotonic creation time; the base for `epoch_millis_for_instant`.
    created_at: Instant,
    /// Monotonic start time, if recorded.
    started_at: Option<Instant>,
    /// Monotonic completion time, set by `mark_completed`.
    completed_at: Option<Instant>,
    /// Wall-clock creation time, used to turn monotonic instants into epoch millis.
    created_at_wall: SystemTime,
}
328
/// A pending wait-all request and the channel on which its satisfaction is delivered.
#[derive(Debug)]
struct PendingWaitState {
    /// Id of the pending wait request.
    wait_request_id: WaitRequestId,
    /// One-shot sender for the `WaitAllSatisfied` payload.
    sender: tokio::sync::oneshot::Sender<WaitAllSatisfied>,
}
334
/// Outcome of planning a wait-all request.
///
/// NOTE(review): the consumer is outside this chunk; variant descriptions are
/// read from the names and payloads only.
enum WaitAllAuthorityPlan {
    /// The wait is already satisfied; carries the satisfaction payload.
    AlreadySatisfied(WaitAllSatisfied),
    /// A wait barrier has to be activated.
    ActivateBarrier,
}
339
/// Whether a recovered operation record is kept or dropped.
///
/// NOTE(review): the recovery code that uses this is outside this chunk; confirm.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum RecoveredOperationRecordDisposition {
    /// Keep the recovered record.
    Retain,
    /// Drop the recovered record.
    Discard,
}
345
346impl ShellRecord {
347 fn new(spec: OperationSpec) -> Self {
348 Self {
349 spec,
350 peer_handle: None,
351 watchers: Vec::new(),
352 created_at: Instant::now(),
353 started_at: None,
354 completed_at: None,
355 created_at_wall: SystemTime::now(),
356 }
357 }
358
359 fn epoch_millis(wall_anchor: &SystemTime) -> u64 {
360 wall_anchor
361 .duration_since(UNIX_EPOCH)
362 .map(|d| d.as_millis() as u64)
363 .unwrap_or(0)
364 }
365
366 fn epoch_millis_for_instant(&self, instant: Instant) -> u64 {
367 let offset = instant.saturating_duration_since(self.created_at);
370 let wall = self.created_at_wall + offset;
371 Self::epoch_millis(&wall)
372 }
373
374 fn notify_watchers(&mut self, outcome: &OperationTerminalOutcome) {
376 for watcher in std::mem::take(&mut self.watchers) {
377 watcher.notify_after_generated_terminal(outcome);
378 }
379 }
380
381 fn mark_completed(&mut self) {
383 self.completed_at = Some(Instant::now());
384 }
385}
386
/// Mutable shell state of the ops lifecycle registry: the generated DSL
/// authority plus the shell-side records, queues and persistence handles
/// around it.
#[derive(Debug)]
struct ShellState {
    /// Generated DSL authority that the lifecycle lookups read from.
    dsl: DslAuthority,
    /// Shell-side records keyed by operation id.
    records: HashMap<OperationId, ShellRecord>,
    /// Pending wait-all request, if any.
    pending_wait: Option<PendingWaitState>,
    /// Completed operation ids in completion order; the front is evicted first.
    completed_order: VecDeque<OperationId>,
    /// Maximum number of completed operations kept before eviction.
    max_completed: usize,
    /// Optional concurrency limit.
    max_concurrent: Option<usize>,
    /// Id of the current wait request, if any.
    wait_request_id: Option<WaitRequestId>,
    /// Buffer that completion feed entries are pushed into.
    feed_buffer: Arc<FeedBuffer>,
    /// Channel for persistence requests, when configured (initially `None`).
    persist_tx: Option<crate::tokio::sync::mpsc::UnboundedSender<OpsLifecyclePersistenceRequest>>,
    /// Epoch id used for persistence, when configured (initially `None`).
    persist_epoch_id: Option<meerkat_core::RuntimeEpochId>,
    /// Cursor state used for persistence, when configured (initially `None`).
    persist_cursor_state: Option<Arc<meerkat_core::EpochCursorState>>,
}
420
/// Newtype around the boxed generated machine authority; it has a hand-written
/// `Debug` impl that prints only the authority's state.
struct DslAuthority(Box<mm_dsl::MeerkatMachineAuthority>);
427
428impl std::fmt::Debug for DslAuthority {
429 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
430 f.debug_struct("DslAuthority")
431 .field("state", self.0.state())
432 .finish()
433 }
434}
435
436fn new_ops_dsl_authority() -> DslAuthority {
440 DslAuthority(Box::new(
441 crate::meerkat_machine::dsl_authority::new_initialized_authority(
442 "ops lifecycle DSL Initialize must be accepted",
443 ),
444 ))
445}
446
447impl ShellState {
    /// Creates an empty shell state with a fresh DSL authority and feed buffer.
    ///
    /// The feed buffer retains `max(max_completed * 4, 1024)` entries (the
    /// multiplication saturates). Persistence fields start unset.
    fn new(max_completed: usize, max_concurrent: Option<usize>) -> Self {
        // NOTE(review): these construction-step messages are emitted at info
        // level; confirm that is intended rather than leftover diagnostics.
        tracing::info!("RuntimeOpsLifecycleRegistry::ShellState creating dsl");
        let dsl = new_ops_dsl_authority();
        tracing::info!("RuntimeOpsLifecycleRegistry::ShellState created dsl");
        let feed_capacity = max_completed.saturating_mul(4).max(1024);
        tracing::info!(
            feed_capacity,
            "RuntimeOpsLifecycleRegistry::ShellState creating feed buffer"
        );
        let feed_buffer = Arc::new(FeedBuffer::new(feed_capacity));
        tracing::info!("RuntimeOpsLifecycleRegistry::ShellState created feed buffer");
        Self {
            dsl,
            records: HashMap::new(),
            pending_wait: None,
            completed_order: VecDeque::new(),
            max_completed,
            max_concurrent,
            wait_request_id: None,
            feed_buffer,
            persist_tx: None,
            persist_epoch_id: None,
            persist_cursor_state: None,
        }
    }
477
478 fn dsl_apply(
485 &mut self,
486 input: mm_dsl::MeerkatMachineInput,
487 context: &str,
488 ) -> Result<(), OpsLifecycleError> {
489 self.dsl_apply_raw(input).map_err(|err| {
490 OpsLifecycleError::Internal(format!("DSL rejected ops transition ({context}): {err:?}"))
491 })
492 }
493
494 fn dsl_apply_raw(
501 &mut self,
502 input: mm_dsl::MeerkatMachineInput,
503 ) -> Result<(), mm_dsl::MeerkatMachineTransitionError> {
504 mm_dsl::MeerkatMachineMutator::apply(&mut *self.dsl.0, input).map(|_transition| ())
505 }
506
507 fn dsl_apply_with_effects(
508 &mut self,
509 input: mm_dsl::MeerkatMachineInput,
510 context: &str,
511 ) -> Result<Vec<mm_dsl::MeerkatMachineEffect>, OpsLifecycleError> {
512 let transition =
513 mm_dsl::MeerkatMachineMutator::apply(&mut *self.dsl.0, input).map_err(|err| {
514 OpsLifecycleError::Internal(format!(
515 "DSL rejected ops transition ({context}): {err:?}"
516 ))
517 })?;
518 Ok(transition.into_effects())
519 }
520
521 fn checked_terminal_payload(
526 kind: mm_dsl::OperationTerminalOutcomeKind,
527 payload: &OperationTerminalOutcome,
528 authority: &str,
529 operation_id: &str,
530 ) -> Result<OperationTerminalOutcome, OpsLifecycleError> {
531 if mm_dsl::OperationTerminalOutcomeKind::from(payload) != kind {
532 return Err(OpsLifecycleError::Internal(format!(
533 "{authority} payload variant for {operation_id} does not match terminal outcome discriminant"
534 )));
535 }
536 Ok(payload.clone())
537 }
538
539 fn status(&self, id: &OperationId) -> Option<OperationStatus> {
541 let id_key = mm_dsl::OperationId::from_domain(id).0;
542 self.dsl
543 .0
544 .state()
545 .op_statuses
546 .get(&id_key)
547 .copied()
548 .map(OperationStatus::from)
549 }
550
551 fn require_status(&self, id: &OperationId) -> Result<OperationStatus, OpsLifecycleError> {
552 self.status(id).ok_or_else(|| {
553 OpsLifecycleError::Internal(format!(
554 "generated op lifecycle authority missing status for {id}"
555 ))
556 })
557 }
558
559 fn kind(&self, id: &OperationId) -> Option<OperationKind> {
561 let id_key = mm_dsl::OperationId::from_domain(id).0;
562 self.dsl
563 .0
564 .state()
565 .op_kinds
566 .get(&id_key)
567 .copied()
568 .map(OperationKind::from)
569 }
570
571 fn require_kind(&self, id: &OperationId) -> Result<OperationKind, OpsLifecycleError> {
572 self.kind(id).ok_or_else(|| {
573 OpsLifecycleError::Internal(format!(
574 "generated op lifecycle authority missing kind for {id}"
575 ))
576 })
577 }
578
579 fn operation_source(
580 &self,
581 id: &OperationId,
582 ) -> Result<Option<OperationSource>, OpsLifecycleError> {
583 let id_key = mm_dsl::OperationId::from_domain(id).0;
584 self.dsl
585 .0
586 .state()
587 .op_sources
588 .get(&id_key)
589 .map(|source| {
590 source.to_domain().map_err(|error| {
591 OpsLifecycleError::Internal(format!(
592 "generated operation source authority has invalid source for {id}: {error}"
593 ))
594 })
595 })
596 .transpose()
597 }
598
599 fn child_session_id_from_operation_source(
600 operation_source: Option<&OperationSource>,
601 ) -> Option<SessionId> {
602 match operation_source {
603 Some(OperationSource::SessionChild { session_id }) => Some(session_id.clone()),
604 Some(OperationSource::BackendPeer { .. }) | None => None,
605 }
606 }
607
608 fn align_spec_child_session_id_to_source(
609 spec: &mut OperationSpec,
610 operation_source: Option<&OperationSource>,
611 ) {
612 spec.child_session_id = Self::child_session_id_from_operation_source(operation_source);
613 }
614
615 fn peer_ready(&self, id: &OperationId) -> Option<bool> {
617 let id_key = mm_dsl::OperationId::from_domain(id).0;
618 self.dsl.0.state().op_peer_ready.get(&id_key).copied()
619 }
620
621 fn require_peer_ready(&self, id: &OperationId) -> Result<bool, OpsLifecycleError> {
622 self.peer_ready(id).ok_or_else(|| {
623 OpsLifecycleError::Internal(format!(
624 "generated op peer wiring authority missing peer-ready fact for {id}"
625 ))
626 })
627 }
628
629 fn progress_count(&self, id: &OperationId) -> Option<u32> {
631 let id_key = mm_dsl::OperationId::from_domain(id).0;
632 self.dsl
633 .0
634 .state()
635 .op_progress_counts
636 .get(&id_key)
637 .map(|v| (*v).min(u32::MAX as u64) as u32)
638 }
639
640 fn require_progress_count(&self, id: &OperationId) -> Result<u32, OpsLifecycleError> {
641 self.progress_count(id).ok_or_else(|| {
642 OpsLifecycleError::Internal(format!(
643 "generated op progress authority missing progress count for {id}"
644 ))
645 })
646 }
647
648 fn terminal_outcome(
652 &self,
653 id: &OperationId,
654 ) -> Result<Option<OperationTerminalOutcome>, OpsLifecycleError> {
655 let id_key = mm_dsl::OperationId::from_domain(id).0;
656 let state = self.dsl.0.state();
657 let status = self.status(id);
658 let terminal = match status {
659 Some(status) => Self::operation_status_is_terminal(id, status)?,
660 None => false,
661 };
662 let kind = state.op_terminal_outcomes.get(&id_key).copied();
663 let Some(kind) = kind else {
664 if terminal {
665 return Err(OpsLifecycleError::Internal(format!(
666 "generated op terminal authority missing terminal outcome for {id}"
667 )));
668 }
669 return Ok(None);
670 };
671 if !terminal {
672 return Err(OpsLifecycleError::Internal(format!(
673 "generated op terminal authority has terminal outcome for non-terminal {id}"
674 )));
675 }
676 let payload = state.op_terminal_payload.get(&id_key).ok_or_else(|| {
677 OpsLifecycleError::Internal(format!(
678 "generated op terminal authority missing terminal payload for {id}"
679 ))
680 })?;
681 Self::checked_terminal_payload(kind, payload, "generated op terminal authority", &id_key)
682 .map(Some)
683 }
684
    /// Returns `true` when the DSL state has a status entry for `id`.
    fn contains(&self, id: &OperationId) -> bool {
        let id_key = mm_dsl::OperationId::from_domain(id).0;
        self.dsl.0.state().op_statuses.contains_key(&id_key)
    }
690
    /// Returns the DSL state's `active_op_count`, cast to `usize`.
    fn active_count(&self) -> usize {
        self.dsl.0.state().active_op_count as usize
    }
695
    /// Returns the number of operations that have a status entry in the DSL state.
    fn operation_count(&self) -> usize {
        self.dsl.0.state().op_statuses.len()
    }
700
    /// Returns the sorted, de-duplicated union of every operation id that
    /// appears as a key in any per-operation DSL map or in the shell records.
    ///
    /// # Errors
    /// `OpsLifecycleError::Internal` when a DSL key does not parse as an
    /// `OperationId`. The maps are scanned in the order written below, so the
    /// first offending map decides which error is reported.
    fn operation_ids(&self) -> Result<Vec<OperationId>, OpsLifecycleError> {
        let mut ids = BTreeSet::new();
        let state = self.dsl.0.state();
        Self::collect_operation_id_keys(&mut ids, "op_statuses", state.op_statuses.keys())?;
        Self::collect_operation_id_keys(&mut ids, "op_kinds", state.op_kinds.keys())?;
        Self::collect_operation_id_keys(&mut ids, "op_sources", state.op_sources.keys())?;
        Self::collect_operation_id_keys(&mut ids, "op_peer_ready", state.op_peer_ready.keys())?;
        Self::collect_operation_id_keys(
            &mut ids,
            "op_progress_counts",
            state.op_progress_counts.keys(),
        )?;
        Self::collect_operation_id_keys(
            &mut ids,
            "op_terminal_outcomes",
            state.op_terminal_outcomes.keys(),
        )?;
        Self::collect_operation_id_keys(
            &mut ids,
            "op_terminal_payload",
            state.op_terminal_payload.keys(),
        )?;
        Self::collect_operation_id_keys(
            &mut ids,
            "op_completion_seq",
            state.op_completion_seq.keys(),
        )?;
        // Include operations known only to the shell records.
        ids.extend(self.records.keys().cloned());
        Ok(ids.into_iter().collect())
    }
732
733 fn collect_operation_id_keys<'a, I>(
734 ids: &mut BTreeSet<OperationId>,
735 field: &str,
736 keys: I,
737 ) -> Result<(), OpsLifecycleError>
738 where
739 I: IntoIterator<Item = &'a String>,
740 {
741 for key in keys {
742 let id = serde_json::from_str::<OperationId>(key).map_err(|error| {
743 OpsLifecycleError::Internal(format!(
744 "generated operation identity authority used invalid operation id key in {field}: {key}: {error}"
745 ))
746 })?;
747 ids.insert(id);
748 }
749 Ok(())
750 }
751
    /// Returns `true` when any per-operation DSL map has an entry for `id`
    /// (status, kind, source, peer-ready, progress count, terminal outcome,
    /// terminal payload, or completion sequence).
    fn has_generated_operation_record_fact(&self, id: &OperationId) -> bool {
        let id_key = mm_dsl::OperationId::from_domain(id).0;
        let state = self.dsl.0.state();
        state.op_statuses.contains_key(&id_key)
            || state.op_kinds.contains_key(&id_key)
            || state.op_sources.contains_key(&id_key)
            || state.op_peer_ready.contains_key(&id_key)
            || state.op_progress_counts.contains_key(&id_key)
            || state.op_terminal_outcomes.contains_key(&id_key)
            || state.op_terminal_payload.contains_key(&id_key)
            || state.op_completion_seq.contains_key(&id_key)
    }
764
765 fn completion_sequence(&self, id: &OperationId) -> Option<CompletionSeq> {
767 let id_key = mm_dsl::OperationId::from_domain(id).0;
768 self.dsl.0.state().op_completion_seq.get(&id_key).copied()
769 }
770
771 fn completion_feed_authority_entries(
772 &self,
773 ) -> Result<HashMap<OperationId, CompletionFeedCanonicalState>, OpsLifecycleError> {
774 let state = self.dsl.0.state();
775 let sequence_keys: BTreeSet<String> =
776 state.completion_feed_sequences.keys().cloned().collect();
777 let companion_domains: [(&str, BTreeSet<String>); 3] = [
778 (
779 "completion_feed_kinds",
780 state.completion_feed_kinds.keys().cloned().collect(),
781 ),
782 (
783 "completion_feed_terminal_outcomes",
784 state
785 .completion_feed_terminal_outcomes
786 .keys()
787 .cloned()
788 .collect(),
789 ),
790 (
791 "completion_feed_terminal_payload",
792 state
793 .completion_feed_terminal_payload
794 .keys()
795 .cloned()
796 .collect(),
797 ),
798 ];
799 for (field, keys) in companion_domains {
800 if keys != sequence_keys {
801 return Err(OpsLifecycleError::Internal(format!(
802 "generated completion feed authority has mismatched {field} domain"
803 )));
804 }
805 }
806
807 let mut entries = HashMap::new();
808 for (id_key, seq) in &state.completion_feed_sequences {
809 if !state.completion_sequence_claims.contains(seq) {
810 return Err(OpsLifecycleError::Internal(format!(
811 "generated completion feed authority sequence {seq} for {id_key} is not claimed"
812 )));
813 }
814 let operation_id = serde_json::from_str::<OperationId>(id_key).map_err(|error| {
815 OpsLifecycleError::Internal(format!(
816 "generated completion feed authority used invalid operation id key {id_key}: {error}"
817 ))
818 })?;
819 let kind = state
820 .completion_feed_kinds
821 .get(id_key)
822 .copied()
823 .map(OperationKind::from)
824 .ok_or_else(|| {
825 OpsLifecycleError::Internal(format!(
826 "generated completion feed authority missing kind for {id_key}"
827 ))
828 })?;
829 let outcome_kind = state
830 .completion_feed_terminal_outcomes
831 .get(id_key)
832 .copied()
833 .ok_or_else(|| {
834 OpsLifecycleError::Internal(format!(
835 "generated completion feed authority missing terminal outcome for {id_key}"
836 ))
837 })?;
838 let payload = state
839 .completion_feed_terminal_payload
840 .get(id_key)
841 .ok_or_else(|| {
842 OpsLifecycleError::Internal(format!(
843 "generated completion feed authority missing terminal payload for {id_key}"
844 ))
845 })?;
846 let terminal_outcome = Self::checked_terminal_payload(
847 outcome_kind,
848 payload,
849 "generated completion feed authority",
850 id_key,
851 )?;
852 entries.insert(
853 operation_id,
854 CompletionFeedCanonicalState {
855 seq: *seq,
856 kind,
857 terminal_outcome,
858 },
859 );
860 }
861 Ok(entries)
862 }
863
    /// Returns the DSL state's completion cursor for the given `consumer`.
    fn completion_cursor(&self, consumer: CompletionCursorConsumer) -> CompletionSeq {
        let state = self.dsl.0.state();
        match consumer {
            CompletionCursorConsumer::AgentApplied => state.completion_agent_applied_cursor,
            CompletionCursorConsumer::RuntimeObserved => state.completion_runtime_observed_cursor,
            CompletionCursorConsumer::RuntimeInjected => state.completion_runtime_injected_cursor,
        }
    }
872
873 fn completion_cursor_snapshot(&self) -> meerkat_core::EpochCursorSnapshot {
874 meerkat_core::EpochCursorSnapshot {
875 agent_applied_cursor: self.completion_cursor(CompletionCursorConsumer::AgentApplied),
876 runtime_observed_seq: self.completion_cursor(CompletionCursorConsumer::RuntimeObserved),
877 runtime_last_injected_seq: self
878 .completion_cursor(CompletionCursorConsumer::RuntimeInjected),
879 }
880 }
881
882 fn snapshot(
884 &self,
885 id: &OperationId,
886 ) -> Result<Option<OperationLifecycleSnapshot>, OpsLifecycleError> {
887 let Some(shell) = self.records.get(id) else {
888 if self.has_generated_operation_record_fact(id) {
889 return Err(OpsLifecycleError::Internal(format!(
890 "generated op lifecycle authority has operation facts without shell projection record for {id}"
891 )));
892 }
893 return Ok(None);
894 };
895 let kind = self.require_kind(id)?;
896 let status = self.require_status(id)?;
897 let terminal = Self::operation_status_is_terminal(id, status)?;
898 let public_result_class = Self::operation_public_result_class(id, status)?;
899 let peer_ready = self.require_peer_ready(id)?;
900 let progress_count = self.require_progress_count(id)?;
901 let operation_source = self.operation_source(id)?;
902 let terminal_outcome = self.terminal_outcome(id)?;
903
904 let created_at_ms = ShellRecord::epoch_millis(&shell.created_at_wall);
905 let started_at_ms = shell.started_at.map(|i| shell.epoch_millis_for_instant(i));
906 let completed_at_ms = shell
907 .completed_at
908 .map(|i| shell.epoch_millis_for_instant(i));
909 let elapsed_ms = shell.completed_at.map(|completed| {
910 completed
911 .saturating_duration_since(shell.created_at)
912 .as_millis() as u64
913 });
914
915 Ok(Some(OperationLifecycleSnapshot {
916 id: shell.spec.id.clone(),
917 kind,
918 display_name: shell.spec.display_name.clone(),
919 child_session_id: Self::child_session_id_from_operation_source(
920 operation_source.as_ref(),
921 ),
922 operation_source,
923 status,
924 terminal,
925 public_result_class,
926 peer_ready,
927 progress_count,
928 watcher_count: shell.watchers.len() as u32,
929 terminal_outcome,
930 peer_handle: shell.peer_handle.clone(),
931 created_at_ms,
932 started_at_ms,
933 completed_at_ms,
934 elapsed_ms,
935 }))
936 }
937
    /// Performs the shell-side follow-up after the DSL has moved `id` to a
    /// terminal state.
    ///
    /// Steps, in order:
    /// 1. Read the terminal outcome and kind from the DSL (both must exist).
    /// 2. Notify the record's watchers and stamp its completion time.
    /// 3. If the kind's durability class is `Discard`, collect the operation
    ///    in the DSL, drop its record and queue entry, and return.
    /// 4. If the kind's completion-feed class is `Emit`, cross-check the feed
    ///    authority against the terminal authority and push a feed entry.
    /// 5. Queue the id in `completed_order`, evicting the oldest completed
    ///    operations beyond `max_completed`.
    /// 6. Call `maybe_satisfy_wait`.
    ///
    /// # Errors
    /// `OpsLifecycleError::Internal` when a required DSL fact is missing, the
    /// feed authority disagrees with the terminal authority, or the DSL
    /// rejects a collect/evict input; also any error from `maybe_satisfy_wait`.
    fn finalize_terminal(&mut self, id: &OperationId) -> Result<(), OpsLifecycleError> {
        let outcome = self.terminal_outcome(id)?.ok_or_else(|| {
            OpsLifecycleError::Internal(format!(
                "generated op terminal transition did not mint terminal outcome for {id}"
            ))
        })?;
        let kind = self.require_kind(id)?;

        if let Some(shell) = self.records.get_mut(id) {
            // Watchers are notified before the completion time is recorded.
            shell.notify_watchers(&outcome);
            shell.mark_completed();
        }

        if Self::operation_durability_class(id, kind)? == mm_dsl::OperationDurabilityClass::Discard
        {
            // This early return skips the feed push, the completed queue and
            // the `maybe_satisfy_wait` call below.
            self.dsl_apply(
                mm_dsl::MeerkatMachineInput::CollectCompletedOp {
                    operation_id: mm_dsl::OperationId::from_domain(id).0,
                },
                "CollectCompletedOp",
            )?;
            self.records.remove(id);
            self.completed_order.retain(|queued| queued != id);
            return Ok(());
        }

        if Self::operation_completion_feed_class(id, kind)?
            == mm_dsl::OperationCompletionFeedClass::Emit
        {
            // NOTE(review): this rebuilds the entry map for every feed
            // operation just to extract one id; consider a single-id lookup.
            let feed_authority = self
                .completion_feed_authority_entries()?
                .remove(id)
                .ok_or_else(|| {
                    OpsLifecycleError::Internal(format!(
                        "generated op terminal transition did not mint completion feed authority for {id}"
                    ))
                })?;
            if feed_authority.kind != kind || feed_authority.terminal_outcome != outcome {
                return Err(OpsLifecycleError::Internal(format!(
                    "generated completion feed authority drifted from terminal op authority for {id}"
                )));
            }
            let seq = self.completion_sequence(id).ok_or_else(|| {
                OpsLifecycleError::Internal(format!(
                    "generated op terminal transition did not mint completion sequence for {id}"
                ))
            })?;
            if feed_authority.seq != seq {
                return Err(OpsLifecycleError::Internal(format!(
                    "generated completion feed authority sequence drifted for {id}"
                )));
            }
            // Display name and completion time come from the shell record;
            // both fall back to defaults when the record is absent.
            let display_name = self
                .records
                .get(id)
                .map(|r| r.spec.display_name.clone())
                .unwrap_or_default();
            let completed_at_ms = self
                .records
                .get(id)
                .and_then(|r| r.completed_at.map(|i| r.epoch_millis_for_instant(i)));
            self.feed_buffer.push(CompletionEntry {
                seq: feed_authority.seq,
                operation_id: id.clone(),
                kind: feed_authority.kind,
                display_name,
                terminal_outcome: feed_authority.terminal_outcome,
                completed_at_ms,
            });
        }

        self.completed_order.push_back(id.clone());
        // Evict the oldest completed operations until the queue fits.
        while self.completed_order.len() > self.max_completed {
            if let Some(evicted) = self.completed_order.pop_front() {
                self.dsl_apply(
                    mm_dsl::MeerkatMachineInput::EvictCompletedOp {
                        operation_id: mm_dsl::OperationId::from_domain(&evicted).0,
                    },
                    "EvictCompletedOp",
                )?;
                self.records.remove(&evicted);
            }
        }

        self.maybe_satisfy_wait()?;
        Ok(())
    }
1035
1036 fn wait_operation_ids(&self) -> Result<Vec<OperationId>, OpsLifecycleError> {
1038 self.dsl
1039 .0
1040 .state()
1041 .wait_operation_ids
1042 .iter()
1043 .map(|key| {
1044 serde_json::from_str::<OperationId>(key).map_err(|error| {
1045 OpsLifecycleError::Internal(format!(
1046 "generated wait operation identity authority used invalid operation id key {key}: {error}"
1047 ))
1048 })
1049 })
1050 .collect()
1051 }
1052
    /// Test-only accessor for the DSL state's `wait_active` flag.
    #[cfg(test)]
    fn wait_active(&self) -> bool {
        self.dsl.0.state().wait_active
    }
1058
1059 fn wait_all_satisfied_from_effects(
1060 effects: &[mm_dsl::MeerkatMachineEffect],
1061 ) -> Result<Option<WaitAllSatisfied>, OpsLifecycleError> {
1062 let mut satisfied = None;
1063 for effect in effects {
1064 let mm_dsl::MeerkatMachineEffect::WaitAllSatisfied {
1065 wait_request_id,
1066 run_id,
1067 operation_ids,
1068 } = effect
1069 else {
1070 continue;
1071 };
1072 if satisfied.is_some() {
1073 return Err(OpsLifecycleError::Internal(
1074 "generated wait_all authority emitted multiple satisfaction effects".into(),
1075 ));
1076 }
1077 let wait_uuid = uuid::Uuid::parse_str(&wait_request_id.0).map_err(|err| {
1078 OpsLifecycleError::Internal(format!(
1079 "generated wait_all authority emitted invalid wait request id '{}': {err}",
1080 wait_request_id.0
1081 ))
1082 })?;
1083 let mut ids = Vec::with_capacity(operation_ids.len());
1084 for operation_id in operation_ids {
1085 ids.push(
1086 serde_json::from_str::<OperationId>(&operation_id.0).map_err(|err| {
1087 OpsLifecycleError::Internal(format!(
1088 "generated wait_all authority emitted invalid operation id '{}': {err}",
1089 operation_id.0
1090 ))
1091 })?,
1092 );
1093 }
1094 satisfied = Some(WaitAllSatisfied {
1095 wait_request_id: WaitRequestId::from_uuid(wait_uuid),
1096 run_id: RunId::from_uuid(uuid::Uuid::parse_str(&run_id.0).map_err(|err| {
1097 OpsLifecycleError::Internal(format!(
1098 "generated wait_all authority emitted invalid run id '{}': {err}",
1099 run_id.0
1100 ))
1101 })?),
1102 operation_ids: ids,
1103 });
1104 }
1105 Ok(satisfied)
1106 }
1107
1108 fn parse_wait_all_operation_id(
1109 raw: &str,
1110 context: &str,
1111 ) -> Result<OperationId, OpsLifecycleError> {
1112 serde_json::from_str::<OperationId>(raw).map_err(|err| {
1113 OpsLifecycleError::Internal(format!(
1114 "generated wait_all authority emitted invalid {context} operation id '{raw}': {err}"
1115 ))
1116 })
1117 }
1118
1119 fn duplicate_wait_operation_id(operation_ids: &[OperationId]) -> Option<OperationId> {
1120 let mut seen = HashSet::new();
1121 operation_ids
1122 .iter()
1123 .find(|operation_id| !seen.insert((*operation_id).clone()))
1124 .cloned()
1125 }
1126
    /// Interprets the single `WaitAllAdmissionResolved` effect in `effects`.
    ///
    /// Returns `Ok(None)` when the authority accepted the wait, and
    /// `Ok(Some(error))` with the caller-facing rejection error
    /// (`DuplicateWaitOperation`, `WaitAlreadyActive`, or `NotFound`) when it
    /// rejected it.
    ///
    /// # Errors
    /// `OpsLifecycleError::Internal` when the effect stream is malformed:
    /// - no admission effect, or more than one,
    /// - an unparsable wait request id, or one that differs from `wait_request_id`,
    /// - an accept that carries a rejection payload,
    /// - a reject without a reason, or whose operation id is missing,
    ///   unexpected, or unparsable for that reason.
    fn wait_all_admission_error_from_effects(
        wait_request_id: &WaitRequestId,
        effects: &[mm_dsl::MeerkatMachineEffect],
    ) -> Result<Option<OpsLifecycleError>, OpsLifecycleError> {
        // Outer Option: whether an admission effect was seen.
        // Inner Option: the rejection error, `None` on accept.
        let mut admission = None;
        for effect in effects {
            let mm_dsl::MeerkatMachineEffect::WaitAllAdmissionResolved {
                wait_request_id: resolved_wait_request_id,
                result,
                reject_reason,
                rejected_operation_id,
            } = effect
            else {
                continue;
            };
            if admission.is_some() {
                return Err(OpsLifecycleError::Internal(
                    "generated wait_all authority emitted multiple admission results".into(),
                ));
            }
            let resolved_uuid =
                uuid::Uuid::parse_str(&resolved_wait_request_id.0).map_err(|err| {
                    OpsLifecycleError::Internal(format!(
                        "generated wait_all authority emitted invalid wait request id '{}': {err}",
                        resolved_wait_request_id.0
                    ))
                })?;
            let resolved_wait_request_id = WaitRequestId::from_uuid(resolved_uuid);
            // The effect must answer the request the shell actually issued.
            if &resolved_wait_request_id != wait_request_id {
                return Err(OpsLifecycleError::Internal(format!(
                    "generated wait_all authority resolved wait request {resolved_wait_request_id} while shell requested {wait_request_id}"
                )));
            }
            admission = Some(match result {
                mm_dsl::WaitAllAdmissionResultKind::Accept => {
                    if reject_reason.is_some() || rejected_operation_id.is_some() {
                        return Err(OpsLifecycleError::Internal(
                            "generated wait_all authority accepted with rejection payload".into(),
                        ));
                    }
                    None
                }
                mm_dsl::WaitAllAdmissionResultKind::Reject => {
                    let reason = reject_reason.ok_or_else(|| {
                        OpsLifecycleError::Internal(
                            "generated wait_all authority rejected without reason".into(),
                        )
                    })?;
                    let error = match reason {
                        // Duplicate and not-found rejections must name the
                        // offending operation; an active-wait rejection must not.
                        mm_dsl::WaitAllRejectReasonKind::DuplicateOperation => {
                            let raw = rejected_operation_id.as_deref().ok_or_else(|| {
                                OpsLifecycleError::Internal(
                                    "generated wait_all authority rejected duplicate without operation id"
                                        .into(),
                                )
                            })?;
                            OpsLifecycleError::DuplicateWaitOperation(
                                Self::parse_wait_all_operation_id(raw, "duplicate")?,
                            )
                        }
                        mm_dsl::WaitAllRejectReasonKind::WaitAlreadyActive => {
                            if rejected_operation_id.is_some() {
                                return Err(OpsLifecycleError::Internal(
                                    "generated wait_all authority rejected active wait with operation id"
                                        .into(),
                                ));
                            }
                            OpsLifecycleError::WaitAlreadyActive
                        }
                        mm_dsl::WaitAllRejectReasonKind::OperationNotFound => {
                            let raw = rejected_operation_id.as_deref().ok_or_else(|| {
                                OpsLifecycleError::Internal(
                                    "generated wait_all authority rejected missing operation without operation id"
                                        .into(),
                                )
                            })?;
                            OpsLifecycleError::NotFound(Self::parse_wait_all_operation_id(
                                raw, "missing",
                            )?)
                        }
                    };
                    Some(error)
                }
            });
        }
        admission.ok_or_else(|| {
            OpsLifecycleError::Internal(
                "generated wait_all authority emitted no admission result".into(),
            )
        })
    }
1218
1219 fn resolve_wait_all_admission(
1220 &mut self,
1221 wait_request_id: &WaitRequestId,
1222 operation_ids: &[OperationId],
1223 dsl_ids: &BTreeSet<String>,
1224 dsl_id_tokens: &BTreeSet<mm_dsl::OperationId>,
1225 operation_token_by_id: &BTreeMap<String, mm_dsl::OperationId>,
1226 operation_id_by_token: &BTreeMap<mm_dsl::OperationId, String>,
1227 ) -> Result<(), OpsLifecycleError> {
1228 let duplicate = Self::duplicate_wait_operation_id(operation_ids)
1229 .map(|operation_id| mm_dsl::OperationId::from_domain(&operation_id).0);
1230 let not_found = operation_ids
1231 .iter()
1232 .find(|operation_id| !self.contains(operation_id))
1233 .map(|operation_id| mm_dsl::OperationId::from_domain(operation_id).0);
1234 let dsl_id_sequence: Vec<String> = operation_ids
1235 .iter()
1236 .map(|id| mm_dsl::OperationId::from_domain(id).0)
1237 .collect();
1238 let effects = self.dsl_apply_with_effects(
1239 mm_dsl::MeerkatMachineInput::ResolveWaitAllAdmission {
1240 wait_request_id: mm_dsl::WaitRequestId::from_domain(wait_request_id),
1241 operation_id_sequence: dsl_id_sequence,
1242 operation_ids: dsl_ids.clone(),
1243 operation_id_tokens: dsl_id_tokens.clone(),
1244 operation_token_by_id: operation_token_by_id.clone(),
1245 operation_id_by_token: operation_id_by_token.clone(),
1246 duplicate_operation_id: duplicate,
1247 not_found_operation_id: not_found,
1248 },
1249 "ResolveWaitAllAdmission",
1250 )?;
1251 if let Some(error) = Self::wait_all_admission_error_from_effects(wait_request_id, &effects)?
1252 {
1253 return Err(error);
1254 }
1255 Ok(())
1256 }
1257
1258 fn try_satisfy_wait_all_authority(
1259 &mut self,
1260 ) -> Result<Option<WaitAllSatisfied>, OpsLifecycleError> {
1261 let Some(dsl_wait_request_id) = self.dsl.0.state().wait_request_id.clone() else {
1262 return Ok(None);
1263 };
1264 let Some(dsl_run_id) = self.dsl.0.state().wait_run_id.clone() else {
1265 return Err(OpsLifecycleError::Internal(
1266 "generated wait_all authority has active wait without run id".into(),
1267 ));
1268 };
1269 let dsl_operation_id_tokens = self.dsl.0.state().wait_operation_id_tokens.clone();
1270 let transition = match mm_dsl::MeerkatMachineMutator::apply(
1271 &mut *self.dsl.0,
1272 mm_dsl::MeerkatMachineInput::SatisfyWaitAll {
1273 wait_request_id: dsl_wait_request_id,
1274 run_id: dsl_run_id,
1275 operation_id_tokens: dsl_operation_id_tokens,
1276 },
1277 ) {
1278 Ok(transition) => transition,
1279 Err(mm_dsl::MeerkatMachineTransitionError::GuardRejected { .. }) => return Ok(None),
1280 Err(err) => {
1281 return Err(OpsLifecycleError::Internal(format!(
1282 "DSL rejected ops transition (SatisfyWaitAll): {err:?}"
1283 )));
1284 }
1285 };
1286 Self::wait_all_satisfied_from_effects(transition.effects())?
1287 .ok_or_else(|| {
1288 OpsLifecycleError::Internal(
1289 "generated wait_all authority accepted satisfaction without effect".into(),
1290 )
1291 })
1292 .map(Some)
1293 }
1294
1295 fn begin_wait_all_authority(
1296 &mut self,
1297 run_id: &RunId,
1298 wait_request_id: &WaitRequestId,
1299 operation_ids: &[OperationId],
1300 ) -> Result<WaitAllAuthorityPlan, OpsLifecycleError> {
1301 let mut dsl_ids = BTreeSet::new();
1302 let mut dsl_id_tokens = BTreeSet::new();
1303 let mut operation_token_by_id = BTreeMap::new();
1304 let mut operation_id_by_token = BTreeMap::new();
1305 for id in operation_ids {
1306 let token = mm_dsl::OperationId::from_domain(id);
1307 let raw_id = token.0.clone();
1308 dsl_ids.insert(raw_id.clone());
1309 dsl_id_tokens.insert(token.clone());
1310 operation_token_by_id.insert(raw_id.clone(), token.clone());
1311 operation_id_by_token.insert(token, raw_id);
1312 }
1313 self.resolve_wait_all_admission(
1314 wait_request_id,
1315 operation_ids,
1316 &dsl_ids,
1317 &dsl_id_tokens,
1318 &operation_token_by_id,
1319 &operation_id_by_token,
1320 )?;
1321 self.dsl_apply(
1322 mm_dsl::MeerkatMachineInput::RequestWaitAll {
1323 run_id: mm_dsl::RunId::from_domain(run_id),
1324 wait_request_id: mm_dsl::WaitRequestId::from_domain(wait_request_id),
1325 operation_id_sequence: operation_ids
1326 .iter()
1327 .map(|id| mm_dsl::OperationId::from_domain(id).0)
1328 .collect(),
1329 operation_ids: dsl_ids,
1330 operation_id_tokens: dsl_id_tokens,
1331 operation_token_by_id,
1332 operation_id_by_token,
1333 },
1334 "RequestWaitAll",
1335 )?;
1336 if let Some(satisfied) = self.try_satisfy_wait_all_authority()? {
1337 return Ok(WaitAllAuthorityPlan::AlreadySatisfied(satisfied));
1338 }
1339 Ok(WaitAllAuthorityPlan::ActivateBarrier)
1340 }
1341
1342 fn owner_termination_targets(
1343 &self,
1344 ) -> Result<Vec<(OperationId, OperationStatus)>, OpsLifecycleError> {
1345 let mut targets = Vec::new();
1346 for id in self.operation_ids()? {
1347 let status = self.require_status(&id)?;
1348 if !Self::operation_status_is_terminal(&id, status)? {
1349 targets.push((id, status));
1350 }
1351 }
1352 Ok(targets)
1353 }
1354
1355 fn operation_status_is_terminal(
1356 operation_id: &OperationId,
1357 status: OperationStatus,
1358 ) -> Result<bool, OpsLifecycleError> {
1359 let operation_id_key = mm_dsl::OperationId::from_domain(operation_id).0;
1360 let effects = Self::apply_stateless_classifier(
1361 mm_dsl::MeerkatMachineInput::ClassifyOperationTerminality {
1362 operation_id: operation_id_key.clone(),
1363 status: mm_dsl::OperationStatus::from(status),
1364 },
1365 "ClassifyOperationTerminality",
1366 )?;
1367 let mut terminal = None;
1368 for effect in effects {
1369 match effect {
1370 mm_dsl::MeerkatMachineEffect::OperationTerminal { operation_id }
1371 if operation_id == operation_id_key =>
1372 {
1373 terminal = Some(true);
1374 }
1375 mm_dsl::MeerkatMachineEffect::OperationNonTerminal { operation_id }
1376 if operation_id == operation_id_key =>
1377 {
1378 terminal = Some(false);
1379 }
1380 other => {
1381 return Err(OpsLifecycleError::Internal(format!(
1382 "unexpected generated operation terminality effect: {other:?}"
1383 )));
1384 }
1385 }
1386 }
1387 terminal.ok_or_else(|| {
1388 OpsLifecycleError::Internal(format!(
1389 "generated operation terminality authority emitted no effect for {operation_id}"
1390 ))
1391 })
1392 }
1393
1394 fn operation_public_result_class(
1395 operation_id: &OperationId,
1396 status: OperationStatus,
1397 ) -> Result<OperationPublicResultClass, OpsLifecycleError> {
1398 let operation_id_key = mm_dsl::OperationId::from_domain(operation_id).0;
1399 let effects = Self::apply_stateless_classifier(
1400 mm_dsl::MeerkatMachineInput::ClassifyOperationPublicResult {
1401 operation_id: operation_id_key.clone(),
1402 status: mm_dsl::OperationStatus::from(status),
1403 },
1404 "ClassifyOperationPublicResult",
1405 )?;
1406 let mut result = None;
1407 for effect in effects {
1408 match effect {
1409 mm_dsl::MeerkatMachineEffect::OperationPublicResultClassified {
1410 operation_id,
1411 result: classified,
1412 } if operation_id == operation_id_key => {
1413 result = Some(OperationPublicResultClass::from(classified));
1414 }
1415 other => {
1416 return Err(OpsLifecycleError::Internal(format!(
1417 "unexpected generated operation public-result effect: {other:?}"
1418 )));
1419 }
1420 }
1421 }
1422 result.ok_or_else(|| {
1423 OpsLifecycleError::Internal(format!(
1424 "generated operation public-result authority emitted no effect for {operation_id}"
1425 ))
1426 })
1427 }
1428
1429 fn operation_transition_rejection_is_idempotent(
1430 operation_id: &OperationId,
1431 action: OperationLifecycleAction,
1432 status: OperationStatus,
1433 ) -> Result<bool, OpsLifecycleError> {
1434 let operation_id_key = mm_dsl::OperationId::from_domain(operation_id).0;
1435 let action = mm_dsl::OpLifecycleActionKind::from(action);
1436 let status = mm_dsl::OperationStatus::from(status);
1437 let effects = Self::apply_stateless_classifier(
1438 mm_dsl::MeerkatMachineInput::ClassifyOperationTransitionIdempotence {
1439 operation_id: operation_id_key.clone(),
1440 action,
1441 status,
1442 },
1443 "ClassifyOperationTransitionIdempotence",
1444 )?;
1445 let mut idempotent = None;
1446 for effect in effects {
1447 match effect {
1448 mm_dsl::MeerkatMachineEffect::OperationTransitionIdempotentSuccess {
1449 operation_id,
1450 action: effect_action,
1451 status: effect_status,
1452 } if operation_id == operation_id_key
1453 && effect_action == action
1454 && effect_status == status =>
1455 {
1456 idempotent = Some(true);
1457 }
1458 mm_dsl::MeerkatMachineEffect::OperationTransitionNotIdempotent {
1459 operation_id,
1460 action: effect_action,
1461 status: effect_status,
1462 } if operation_id == operation_id_key
1463 && effect_action == action
1464 && effect_status == status =>
1465 {
1466 idempotent = Some(false);
1467 }
1468 other => {
1469 return Err(OpsLifecycleError::Internal(format!(
1470 "unexpected generated operation transition-idempotence effect: {other:?}"
1471 )));
1472 }
1473 }
1474 }
1475 idempotent.ok_or_else(|| {
1476 OpsLifecycleError::Internal(format!(
1477 "generated operation transition-idempotence authority emitted no effect for {operation_id}"
1478 ))
1479 })
1480 }
1481
1482 fn operation_completion_feed_class(
1483 operation_id: &OperationId,
1484 kind: OperationKind,
1485 ) -> Result<mm_dsl::OperationCompletionFeedClass, OpsLifecycleError> {
1486 let operation_id_key = mm_dsl::OperationId::from_domain(operation_id).0;
1487 let kind = mm_dsl::OperationKind::from(kind);
1488 let effects = Self::apply_stateless_classifier(
1489 mm_dsl::MeerkatMachineInput::ClassifyOperationCompletionFeed {
1490 operation_id: operation_id_key.clone(),
1491 kind,
1492 },
1493 "ClassifyOperationCompletionFeed",
1494 )?;
1495 let mut class = None;
1496 for effect in effects {
1497 match effect {
1498 mm_dsl::MeerkatMachineEffect::OperationCompletionFeedClassified {
1499 operation_id,
1500 result,
1501 } if operation_id == operation_id_key => {
1502 class = Some(result);
1503 }
1504 other => {
1505 return Err(OpsLifecycleError::Internal(format!(
1506 "unexpected generated operation completion-feed effect: {other:?}"
1507 )));
1508 }
1509 }
1510 }
1511 class.ok_or_else(|| {
1512 OpsLifecycleError::Internal(format!(
1513 "generated operation completion-feed authority emitted no effect for {operation_id}"
1514 ))
1515 })
1516 }
1517
1518 fn operation_completion_wake_class(
1519 operation_id: &OperationId,
1520 kind: OperationKind,
1521 ) -> Result<OperationCompletionWakeClass, OpsLifecycleError> {
1522 let operation_id_key = mm_dsl::OperationId::from_domain(operation_id).0;
1523 let kind = mm_dsl::OperationKind::from(kind);
1524 let effects = Self::apply_stateless_classifier(
1525 mm_dsl::MeerkatMachineInput::ClassifyOperationCompletionWake {
1526 operation_id: operation_id_key.clone(),
1527 kind,
1528 },
1529 "ClassifyOperationCompletionWake",
1530 )?;
1531 let mut class = None;
1532 for effect in effects {
1533 match effect {
1534 mm_dsl::MeerkatMachineEffect::OperationCompletionWakeClassified {
1535 operation_id,
1536 result,
1537 } if operation_id == operation_id_key => {
1538 class = Some(OperationCompletionWakeClass::from(result));
1539 }
1540 other => {
1541 return Err(OpsLifecycleError::Internal(format!(
1542 "unexpected generated operation completion-wake effect: {other:?}"
1543 )));
1544 }
1545 }
1546 }
1547 class.ok_or_else(|| {
1548 OpsLifecycleError::Internal(format!(
1549 "generated operation completion-wake authority emitted no effect for {operation_id}"
1550 ))
1551 })
1552 }
1553
1554 fn operation_durability_class(
1555 operation_id: &OperationId,
1556 kind: OperationKind,
1557 ) -> Result<mm_dsl::OperationDurabilityClass, OpsLifecycleError> {
1558 let operation_id_key = mm_dsl::OperationId::from_domain(operation_id).0;
1559 let kind = mm_dsl::OperationKind::from(kind);
1560 let effects = Self::apply_stateless_classifier(
1561 mm_dsl::MeerkatMachineInput::ClassifyOperationDurability {
1562 operation_id: operation_id_key.clone(),
1563 kind,
1564 },
1565 "ClassifyOperationDurability",
1566 )?;
1567 let mut class = None;
1568 for effect in effects {
1569 match effect {
1570 mm_dsl::MeerkatMachineEffect::OperationDurabilityClassified {
1571 operation_id,
1572 result,
1573 } if operation_id == operation_id_key => {
1574 class = Some(result);
1575 }
1576 other => {
1577 return Err(OpsLifecycleError::Internal(format!(
1578 "unexpected generated operation durability effect: {other:?}"
1579 )));
1580 }
1581 }
1582 }
1583 class.ok_or_else(|| {
1584 OpsLifecycleError::Internal(format!(
1585 "generated operation durability authority emitted no effect for {operation_id}"
1586 ))
1587 })
1588 }
1589
1590 fn recovered_operation_record_disposition(
1591 operation_id: &OperationId,
1592 status: OperationStatus,
1593 kind: OperationKind,
1594 terminal_outcome_present: bool,
1595 terminal_payload_present: bool,
1596 completion_sequence_present: bool,
1597 ) -> Result<RecoveredOperationRecordDisposition, OpsLifecycleError> {
1598 let operation_id_key = mm_dsl::OperationId::from_domain(operation_id).0;
1599 let effects = Self::apply_stateless_classifier(
1600 mm_dsl::MeerkatMachineInput::ClassifyRecoveredOperationRecord {
1601 operation_id: operation_id_key.clone(),
1602 status: mm_dsl::OperationStatus::from(status),
1603 kind: mm_dsl::OperationKind::from(kind),
1604 terminal_outcome_present,
1605 terminal_payload_present,
1606 completion_sequence_present,
1607 },
1608 "ClassifyRecoveredOperationRecord",
1609 )?;
1610 let mut disposition = None;
1611 for effect in effects {
1612 match effect {
1613 mm_dsl::MeerkatMachineEffect::RetainTerminalRecord { operation_id }
1614 if operation_id == operation_id_key =>
1615 {
1616 disposition = Some(RecoveredOperationRecordDisposition::Retain);
1617 }
1618 mm_dsl::MeerkatMachineEffect::DiscardRecoveredOperationRecord { operation_id }
1619 if operation_id == operation_id_key =>
1620 {
1621 disposition = Some(RecoveredOperationRecordDisposition::Discard);
1622 }
1623 other => {
1624 return Err(OpsLifecycleError::Internal(format!(
1625 "unexpected generated recovered-operation classification effect: {other:?}"
1626 )));
1627 }
1628 }
1629 }
1630 disposition.ok_or_else(|| {
1631 OpsLifecycleError::Internal(format!(
1632 "generated recovered-operation classifier emitted no effect for {operation_id}"
1633 ))
1634 })
1635 }
1636
1637 fn apply_stateless_classifier(
1638 input: mm_dsl::MeerkatMachineInput,
1639 label: &'static str,
1640 ) -> Result<Vec<mm_dsl::MeerkatMachineEffect>, OpsLifecycleError> {
1641 let mut authority = crate::meerkat_machine::dsl_authority::new_initialized_authority(
1642 "ops stateless classifier Initialize must be accepted",
1643 );
1644 let transition =
1645 mm_dsl::MeerkatMachineMutator::apply(&mut authority, input).map_err(|err| {
1646 OpsLifecycleError::Internal(format!(
1647 "DSL rejected ops transition ({label}): {err:?}"
1648 ))
1649 })?;
1650 Ok(transition.into_effects())
1651 }
1652
1653 fn maybe_satisfy_wait(&mut self) -> Result<(), OpsLifecycleError> {
1677 let satisfied = match self.try_satisfy_wait_all_authority() {
1678 Ok(Some(satisfied)) => satisfied,
1679 Ok(None) => return Ok(()),
1680 Err(err) => {
1681 if let Some(pending) = self.pending_wait.take() {
1689 drop(pending.sender);
1690 }
1691 self.wait_request_id = None;
1692 return Err(err);
1693 }
1694 };
1695 let shell_wait_id = self.wait_request_id.take();
1696 if shell_wait_id
1697 .as_ref()
1698 .is_some_and(|id| id != &satisfied.wait_request_id)
1699 {
1700 tracing::error!(
1701 shell_wait_request_id = ?shell_wait_id,
1702 authority_wait_request_id = %satisfied.wait_request_id,
1703 "generated wait_all authority satisfied a different wait request"
1704 );
1705 }
1706 if let Some(pending) = self.pending_wait.take() {
1707 if pending.wait_request_id == satisfied.wait_request_id {
1708 let _ = pending.sender.send(satisfied);
1709 } else if let Some(shell_wait_id) = shell_wait_id {
1710 tracing::error!(
1711 shell_wait_request_id = %shell_wait_id,
1712 pending_wait_request_id = %pending.wait_request_id,
1713 authority_wait_request_id = %satisfied.wait_request_id,
1714 "generated wait_all authority satisfied without a matching pending waiter"
1715 );
1716 }
1717 }
1718 Ok(())
1719 }
1720
1721 fn maybe_persist(&self) -> Result<(), OpsLifecycleError> {
1727 let (tx, epoch_id, cursor_state) = match (
1728 &self.persist_tx,
1729 &self.persist_epoch_id,
1730 &self.persist_cursor_state,
1731 ) {
1732 (Some(tx), Some(epoch_id), Some(cs)) => (tx, epoch_id, cs),
1733 _ => return Ok(()),
1734 };
1735
1736 let snapshot = self.capture_snapshot(epoch_id.clone(), cursor_state)?;
1737 let (result_tx, result_rx) = std::sync::mpsc::sync_channel(1);
1738 let request = OpsLifecyclePersistenceRequest {
1739 snapshot,
1740 result_tx,
1741 };
1742
1743 tx.send(request).map_err(|_| {
1744 OpsLifecycleError::Internal(
1745 "ops lifecycle persistence channel closed before terminal snapshot could be queued"
1746 .into(),
1747 )
1748 })?;
1749 result_rx.recv().map_err(|_| {
1750 OpsLifecycleError::Internal(
1751 "ops lifecycle persistence worker dropped terminal snapshot before confirming durability"
1752 .into(),
1753 )
1754 })?
1755 }
1756
1757 fn capture_snapshot(
1759 &self,
1760 epoch_id: meerkat_core::RuntimeEpochId,
1761 _cursor_state: &meerkat_core::EpochCursorState,
1762 ) -> Result<PersistedOpsSnapshot, OpsLifecycleError> {
1763 let mut operations: HashMap<OperationId, OperationCanonicalState> = HashMap::new();
1764 for op_id in self.operation_ids()? {
1765 let status = self.require_status(&op_id)?;
1766 let kind = self.require_kind(&op_id)?;
1767 if Self::operation_durability_class(&op_id, kind)?
1768 != mm_dsl::OperationDurabilityClass::Retain
1769 {
1770 continue;
1771 }
1772 let peer_ready = self.require_peer_ready(&op_id)?;
1773 let progress_count = self.require_progress_count(&op_id)?;
1774 let operation_source = self.operation_source(&op_id)?;
1775 let terminal_outcome = self.terminal_outcome(&op_id)?;
1776 let completion_sequence = self.completion_sequence(&op_id);
1777 if terminal_outcome.is_some() && completion_sequence.is_none() {
1778 return Err(OpsLifecycleError::Internal(format!(
1779 "generated op terminal authority missing completion sequence for retained terminal {op_id}"
1780 )));
1781 }
1782 if terminal_outcome.is_none() && completion_sequence.is_some() {
1783 return Err(OpsLifecycleError::Internal(format!(
1784 "generated op terminal authority has completion sequence for non-terminal {op_id}"
1785 )));
1786 }
1787 let terminal_buffered = terminal_outcome.is_some();
1788 let watcher_count = self
1789 .records
1790 .get(&op_id)
1791 .map(|r| r.watchers.len() as u32)
1792 .unwrap_or(0);
1793 operations.insert(
1794 op_id,
1795 OperationCanonicalState {
1796 status,
1797 kind,
1798 operation_source,
1799 peer_ready,
1800 progress_count,
1801 watcher_count,
1802 terminal_outcome,
1803 completion_sequence,
1804 terminal_buffered,
1805 },
1806 );
1807 }
1808 let operation_specs: HashMap<OperationId, OperationSpec> = self
1809 .records
1810 .iter()
1811 .filter(|(id, _)| operations.contains_key(*id))
1812 .map(|(id, record)| {
1813 let mut spec = record.spec.clone();
1814 let operation_source = operations
1815 .get(id)
1816 .and_then(|state| state.operation_source.as_ref());
1817 Self::align_spec_child_session_id_to_source(&mut spec, operation_source);
1818 (id.clone(), spec)
1819 })
1820 .collect();
1821 let completed_order: VecDeque<OperationId> = self
1822 .completed_order
1823 .iter()
1824 .filter(|id| operations.contains_key(*id))
1825 .cloned()
1826 .collect();
1827 let active_count = operations
1828 .iter()
1829 .filter(|(id, state)| {
1830 matches!(
1831 Self::operation_status_is_terminal(id, state.status),
1832 Ok(false)
1833 )
1834 })
1835 .count();
1836 let completion_entries = {
1837 let inner = self
1838 .feed_buffer
1839 .inner
1840 .read()
1841 .unwrap_or_else(std::sync::PoisonError::into_inner);
1842 inner.entries.iter().cloned().collect()
1843 };
1844
1845 let authority_state = RegistryCanonicalState {
1846 operations,
1847 completion_feed_entries: self.completion_feed_authority_entries()?,
1848 completed_order,
1849 max_completed: self.max_completed,
1850 max_concurrent: self.max_concurrent,
1851 active_count,
1852 wait_request_id: self.wait_request_id.clone(),
1853 wait_operation_ids: self.wait_operation_ids()?,
1854 next_completion_seq: self.dsl.0.state().next_completion_seq,
1855 };
1856
1857 Ok(PersistedOpsSnapshot {
1858 epoch_id,
1859 authority_state,
1860 operation_specs,
1861 completion_entries,
1862 cursors: self.completion_cursor_snapshot(),
1863 })
1864 }
1865
1866 fn shell_record_mut(
1867 &mut self,
1868 id: &OperationId,
1869 ) -> Result<&mut ShellRecord, OpsLifecycleError> {
1870 self.records
1871 .get_mut(id)
1872 .ok_or_else(|| OpsLifecycleError::NotFound(id.clone()))
1873 }
1874
1875 fn collect_wait_outcomes(
1876 &self,
1877 operation_ids: &[OperationId],
1878 ) -> Result<Vec<(OperationId, OperationTerminalOutcome)>, OpsLifecycleError> {
1879 operation_ids
1880 .iter()
1881 .map(|operation_id| {
1882 let outcome = self.terminal_outcome(operation_id)?.ok_or_else(|| {
1883 OpsLifecycleError::Internal(format!(
1884 "wait_all completed without terminal outcome for {operation_id}"
1885 ))
1886 })?;
1887 Ok((operation_id.clone(), outcome))
1888 })
1889 .collect()
1890 }
1891}
1892
1893impl Default for ShellState {
1894 fn default() -> Self {
1895 Self::new(DEFAULT_MAX_COMPLETED, None)
1896 }
1897}
1898
/// Construction parameters accepted by `RuntimeOpsLifecycleRegistry::with_config`.
#[derive(Debug, Clone)]
pub struct OpsLifecycleConfig {
    /// Forwarded as the first argument of `ShellState::new`; defaults to
    /// `DEFAULT_MAX_COMPLETED`.
    // NOTE(review): presumably bounds how many completed operations are
    // retained — confirm against `ShellState`.
    pub max_completed: usize,
    /// Forwarded as the second argument of `ShellState::new`; defaults to `None`.
    // NOTE(review): presumably `None` means no concurrency limit — confirm.
    pub max_concurrent: Option<usize>,
}
1911
1912impl Default for OpsLifecycleConfig {
1913 fn default() -> Self {
1914 Self {
1915 max_completed: DEFAULT_MAX_COMPLETED,
1916 max_concurrent: None,
1917 }
1918 }
1919}
1920
/// Operations lifecycle registry that keeps its entire `ShellState` behind a
/// single `RwLock`.
#[derive(Debug)]
pub struct RuntimeOpsLifecycleRegistry {
    /// All registry state; read accessors in this file recover the guard from a
    /// poisoned lock rather than panicking.
    state: RwLock<ShellState>,
}
1931
/// Crate-internal diagnostic view of the registry, returned by
/// `RuntimeOpsLifecycleRegistry::diagnostic_snapshot`.
// NOTE(review): the field descriptions below are inferred from field names and
// types; confirm against `diagnostic_snapshot`.
#[derive(Debug, Clone)]
pub(crate) struct RuntimeOpsDiagnosticSnapshot {
    /// Presumably the number of operations known to the registry.
    pub operation_count: usize,
    /// Presumably the number of operations that are not yet terminal.
    pub active_count: usize,
    /// Presumably the shell's current wait request id, if any.
    pub wait_request_id: Option<WaitRequestId>,
    /// Presumably whether a pending waiter is registered.
    pub pending_wait_present: bool,
    /// Presumably the wait request id carried by the pending waiter, if any.
    pub pending_wait_request_id: Option<WaitRequestId>,
    /// Presumably the operation ids covered by the active wait.
    pub wait_operation_ids: Vec<OperationId>,
    /// Per-operation lifecycle snapshots.
    pub operations: Vec<OperationLifecycleSnapshot>,
}
1942
1943impl Default for RuntimeOpsLifecycleRegistry {
1944 fn default() -> Self {
1945 Self {
1946 state: RwLock::new(ShellState::default()),
1947 }
1948 }
1949}
1950
1951impl RuntimeOpsLifecycleRegistry {
1952 pub fn new() -> Self {
1953 let dsl = new_ops_dsl_authority();
1954 let feed_capacity = DEFAULT_MAX_COMPLETED.saturating_mul(4).max(1024);
1955 let feed_buffer = Arc::new(FeedBuffer::new(feed_capacity));
1956 Self {
1957 state: RwLock::new(ShellState {
1958 dsl,
1959 records: HashMap::new(),
1960 pending_wait: None,
1961 completed_order: VecDeque::new(),
1962 max_completed: DEFAULT_MAX_COMPLETED,
1963 max_concurrent: None,
1964 wait_request_id: None,
1965 feed_buffer,
1966 persist_tx: None,
1967 persist_epoch_id: None,
1968 persist_cursor_state: None,
1969 }),
1970 }
1971 }
1972
1973 pub fn with_config(config: OpsLifecycleConfig) -> Self {
1974 Self {
1975 state: RwLock::new(ShellState::new(config.max_completed, config.max_concurrent)),
1976 }
1977 }
1978
1979 fn recover_completion_feed_entry(
1980 shell: &mut ShellState,
1981 operation_id: &OperationId,
1982 entry: &CompletionFeedCanonicalState,
1983 ) -> Result<(), OpsLifecycleError> {
1984 let expected_operation_id = mm_dsl::OperationId::from_domain(operation_id).0;
1985 let terminal_outcome_kind =
1986 mm_dsl::OperationTerminalOutcomeKind::from(&entry.terminal_outcome);
1987 let effects = shell.dsl_apply_with_effects(
1988 mm_dsl::MeerkatMachineInput::RecoverCompletionFeedEntry {
1989 operation_id: expected_operation_id.clone(),
1990 kind: mm_dsl::OperationKind::from(entry.kind),
1991 terminal_outcome: terminal_outcome_kind,
1992 terminal_payload: entry.terminal_outcome.clone(),
1993 completion_sequence: entry.seq,
1994 },
1995 "RecoverCompletionFeedEntry",
1996 )?;
1997 let recovered = effects.iter().find_map(|effect| match effect {
1998 mm_dsl::MeerkatMachineEffect::CompletionFeedEntryRecovered {
1999 operation_id,
2000 seq,
2001 kind,
2002 terminal_outcome,
2003 terminal_payload,
2004 } => Some((
2005 operation_id,
2006 *seq,
2007 OperationKind::from(*kind),
2008 *terminal_outcome,
2009 terminal_payload,
2010 )),
2011 _ => None,
2012 });
2013 let Some((operation_id, seq, kind, terminal_outcome, terminal_payload)) = recovered else {
2014 return Err(OpsLifecycleError::Internal(
2015 "generated completion-feed recovery emitted no recovered entry".into(),
2016 ));
2017 };
2018 if operation_id != &expected_operation_id
2019 || seq != entry.seq
2020 || kind != entry.kind
2021 || terminal_outcome != terminal_outcome_kind
2022 || terminal_payload != &entry.terminal_outcome
2023 {
2024 return Err(OpsLifecycleError::Internal(format!(
2025 "generated completion-feed recovery drifted for {operation_id}"
2026 )));
2027 }
2028 Ok(())
2029 }
2030
2031 pub fn set_persistence_channel(
2037 &self,
2038 tx: crate::tokio::sync::mpsc::UnboundedSender<OpsLifecyclePersistenceRequest>,
2039 epoch_id: meerkat_core::RuntimeEpochId,
2040 cursor_state: Arc<meerkat_core::EpochCursorState>,
2041 ) {
2042 if let Ok(mut state) = self.state.write() {
2043 state.persist_tx = Some(tx);
2044 state.persist_epoch_id = Some(epoch_id);
2045 state.persist_cursor_state = Some(cursor_state);
2046 }
2047 }
2048
    /// Rebuilds a registry from a persisted snapshot by replaying it through
    /// the generated authority and cross-checking the result.
    ///
    /// Steps, in order:
    /// 1. Classify each persisted operation; discarded records are skipped,
    ///    retained ones are replayed with `RecoverOpRecord`.
    /// 2. Restore the completion sequence counter and the consumer cursors.
    /// 3. Rebuild `completed_order` from the recovered completion sequences.
    /// 4. Replay feed entries for operations that were not retained, then
    ///    verify every persisted feed entry against the authority.
    /// 5. Validate the persisted feed projection and refill the feed buffer.
    /// 6. Recreate shell records for retained operations.
    ///
    /// Snapshot fields not named in the destructuring are ignored.
    ///
    /// # Errors
    ///
    /// Fails when any authority transition is rejected or when persisted data
    /// disagrees with what the authority recovered.
    pub fn from_recovered(snapshot: PersistedOpsSnapshot) -> Result<Self, OpsLifecycleError> {
        let PersistedOpsSnapshot {
            authority_state,
            operation_specs,
            completion_entries,
            cursors,
            ..
        } = snapshot;
        let max_completed = authority_state.max_completed;
        let max_concurrent = authority_state.max_concurrent;
        let next_completion_seq = authority_state.next_completion_seq;
        let authority_completion_entries = authority_state.completion_feed_entries;
        let authority_operations = authority_state.operations;
        let mut shell = ShellState::new(max_completed, max_concurrent);

        // Ids of operations the recovery classifier decided to keep.
        let mut retained_ids: HashSet<OperationId> = HashSet::new();
        for (op_id, op_state) in authority_operations {
            let terminal_outcome = op_state
                .terminal_outcome
                .as_ref()
                .map(mm_dsl::OperationTerminalOutcomeKind::from);
            let terminal_payload = op_state.terminal_outcome.clone();
            let disposition = ShellState::recovered_operation_record_disposition(
                &op_id,
                op_state.status,
                op_state.kind,
                terminal_outcome.is_some(),
                terminal_payload.is_some(),
                op_state.completion_sequence.is_some(),
            )?;
            if disposition == RecoveredOperationRecordDisposition::Discard {
                continue;
            }
            // If the persisted spec carries an operation source, it must equal
            // the source recorded in the authority state.
            if let Some(spec_source) = operation_specs
                .get(&op_id)
                .and_then(|spec| spec.operation_source.as_ref())
                && op_state.operation_source.as_ref() != Some(spec_source)
            {
                return Err(OpsLifecycleError::Internal(format!(
                    "persisted operation source mirror for {op_id} drifted from generated authority"
                )));
            }
            let recovery = mm_dsl::MeerkatMachineInput::RecoverOpRecord {
                operation_id: mm_dsl::OperationId::from_domain(&op_id).0,
                status: mm_dsl::OperationStatus::from(op_state.status),
                kind: mm_dsl::OperationKind::from(op_state.kind),
                source: op_state
                    .operation_source
                    .as_ref()
                    .map(mm_dsl::OperationSource::from_domain),
                peer_ready: op_state.peer_ready,
                progress_count: u64::from(op_state.progress_count),
                terminal_outcome,
                terminal_payload,
                completion_sequence: op_state.completion_sequence,
            };
            shell.dsl_apply(recovery, "RecoverOpRecord")?;
            // Every retained record must come back with the same completion
            // sequence that was persisted.
            let recovered_seq = shell.completion_sequence(&op_id).ok_or_else(|| {
                OpsLifecycleError::Internal(format!(
                    "generated op recovery accepted {op_id} without completion sequence"
                ))
            })?;
            if op_state.completion_sequence != Some(recovered_seq) {
                return Err(OpsLifecycleError::Internal(format!(
                    "generated op recovery completion sequence mismatch for {op_id}"
                )));
            }
            retained_ids.insert(op_id);
        }
        shell.dsl_apply(
            mm_dsl::MeerkatMachineInput::RecoverOpsCompletionCursor {
                next_completion_seq,
            },
            "RecoverOpsCompletionCursor",
        )?;
        shell.dsl_apply(
            mm_dsl::MeerkatMachineInput::RecoverCompletionConsumerCursors {
                agent_applied_cursor: cursors.agent_applied_cursor,
                runtime_observed_cursor: cursors.runtime_observed_seq,
                runtime_injected_cursor: cursors.runtime_last_injected_seq,
            },
            "RecoverCompletionConsumerCursors",
        )?;

        // Rebuild the completion order from the recovered sequences rather than
        // from the persisted `completed_order`.
        let mut recovered_completed: Vec<(CompletionSeq, OperationId)> = retained_ids
            .iter()
            .filter_map(|id| shell.completion_sequence(id).map(|seq| (seq, id.clone())))
            .collect();
        recovered_completed.sort_by_key(|(seq, _)| *seq);
        shell.completed_order = recovered_completed.into_iter().map(|(_, id)| id).collect();

        // Feed entries for operations that were not retained are replayed
        // explicitly.
        // NOTE(review): presumably retained operations get their feed entries
        // from `RecoverOpRecord` — confirm against the generated authority.
        for (operation_id, entry) in &authority_completion_entries {
            if !retained_ids.contains(operation_id) {
                Self::recover_completion_feed_entry(&mut shell, operation_id, entry)?;
            }
        }

        // Every persisted feed entry must now exist in the authority, unchanged.
        let canonical_feed_entries = shell.completion_feed_authority_entries()?;
        for (operation_id, entry) in &authority_completion_entries {
            let Some(recovered_entry) = canonical_feed_entries.get(operation_id) else {
                return Err(OpsLifecycleError::Internal(format!(
                    "persisted completion feed authority for {operation_id} was not recovered"
                )));
            };
            if recovered_entry != entry {
                return Err(OpsLifecycleError::Internal(format!(
                    "persisted completion feed authority drifted from generated recovery for {operation_id}"
                )));
            }
        }

        // The persisted projection must agree with the authority on sequence,
        // kind and terminal outcome; it is kept only for display metadata.
        let mut projection_entries_by_id: HashMap<OperationId, CompletionEntry> = HashMap::new();
        for entry in completion_entries {
            let Some(authority_entry) = canonical_feed_entries.get(&entry.operation_id) else {
                return Err(OpsLifecycleError::Internal(format!(
                    "persisted completion feed projection for {} has no generated feed authority",
                    entry.operation_id
                )));
            };
            if authority_entry.seq != entry.seq
                || authority_entry.kind != entry.kind
                || authority_entry.terminal_outcome != entry.terminal_outcome
            {
                return Err(OpsLifecycleError::Internal(format!(
                    "persisted completion feed projection for {} drifted from generated feed authority",
                    entry.operation_id
                )));
            }
            projection_entries_by_id.insert(entry.operation_id.clone(), entry);
        }

        // Refill the feed buffer in ascending sequence order.
        let mut recovered_entries: Vec<(OperationId, CompletionFeedCanonicalState)> =
            canonical_feed_entries.into_iter().collect();
        recovered_entries.sort_by_key(|(_, entry)| entry.seq);
        for (operation_id, entry) in recovered_entries {
            let projection = projection_entries_by_id.get(&operation_id);
            // Prefer the spec's display name, then the projection's, else the
            // default value.
            let display_name = operation_specs
                .get(&operation_id)
                .map(|spec| spec.display_name.clone())
                .or_else(|| projection.map(|entry| entry.display_name.clone()))
                .unwrap_or_default();
            let completed_at_ms = projection.and_then(|entry| entry.completed_at_ms);
            shell.feed_buffer.push(CompletionEntry {
                seq: entry.seq,
                operation_id,
                kind: entry.kind,
                display_name,
                terminal_outcome: entry.terminal_outcome,
                completed_at_ms,
            });
        }

        // Recreate shell records for retained operations only. Timestamps are
        // reset to "now"; peer handles and watchers are not restored.
        for (op_id, spec) in operation_specs {
            if retained_ids.contains(&op_id) {
                let mut spec = spec;
                let operation_source = shell.operation_source(&op_id)?;
                ShellState::align_spec_child_session_id_to_source(
                    &mut spec,
                    operation_source.as_ref(),
                );
                shell.records.insert(
                    op_id,
                    ShellRecord {
                        spec,
                        peer_handle: None,
                        watchers: Vec::new(),
                        created_at: Instant::now(),
                        started_at: None,
                        completed_at: None,
                        created_at_wall: SystemTime::now(),
                    },
                );
            }
        }

        Ok(Self {
            state: RwLock::new(shell),
        })
    }
2249
2250 pub fn capture_persistence_snapshot(
2255 &self,
2256 epoch_id: meerkat_core::RuntimeEpochId,
2257 cursor_state: &meerkat_core::EpochCursorState,
2258 ) -> Result<PersistedOpsSnapshot, OpsLifecycleError> {
2259 let state = self
2260 .state
2261 .read()
2262 .unwrap_or_else(std::sync::PoisonError::into_inner);
2263 state.capture_snapshot(epoch_id, cursor_state)
2264 }
2265
2266 pub fn completion_cursor_snapshot(&self) -> meerkat_core::EpochCursorSnapshot {
2268 let state = self
2269 .state
2270 .read()
2271 .unwrap_or_else(std::sync::PoisonError::into_inner);
2272 state.completion_cursor_snapshot()
2273 }
2274
2275 pub fn completion_feed_handle(&self) -> Arc<dyn CompletionFeed> {
2277 let state = self
2278 .state
2279 .read()
2280 .unwrap_or_else(std::sync::PoisonError::into_inner);
2281 Arc::new(RuntimeCompletionFeed {
2282 buffer: Arc::clone(&state.feed_buffer),
2283 })
2284 }
2285
2286 pub(crate) fn diagnostic_snapshot(
2288 &self,
2289 ) -> Result<RuntimeOpsDiagnosticSnapshot, OpsLifecycleError> {
2290 let state = self
2291 .state
2292 .read()
2293 .unwrap_or_else(std::sync::PoisonError::into_inner);
2294 let mut operations = state
2295 .operation_ids()?
2296 .into_iter()
2297 .map(|id| state.snapshot(&id))
2298 .collect::<Result<Vec<_>, _>>()?
2299 .into_iter()
2300 .flatten()
2301 .collect::<Vec<_>>();
2302 operations.sort_by(|left, right| left.display_name.cmp(&right.display_name));
2303 Ok(RuntimeOpsDiagnosticSnapshot {
2304 operation_count: state.operation_count(),
2305 active_count: state.active_count(),
2306 wait_request_id: state.wait_request_id.clone(),
2307 pending_wait_present: state.pending_wait.is_some(),
2308 pending_wait_request_id: state
2309 .pending_wait
2310 .as_ref()
2311 .map(|pending_wait| pending_wait.wait_request_id.clone()),
2312 wait_operation_ids: state.wait_operation_ids()?,
2313 operations,
2314 })
2315 }
2316
2317 fn read_state(&self) -> Result<RwLockReadGuard<'_, ShellState>, OpsLifecycleError> {
2318 self.state
2319 .read()
2320 .map_err(|_| OpsLifecycleError::Internal("ops lifecycle registry poisoned".into()))
2321 }
2322
2323 fn write_state(&self) -> Result<RwLockWriteGuard<'_, ShellState>, OpsLifecycleError> {
2324 self.state
2325 .write()
2326 .map_err(|_| OpsLifecycleError::Internal("ops lifecycle registry poisoned".into()))
2327 }
2328
2329 fn cancel_wait_all_internal(
2330 &self,
2331 wait_request_id: &WaitRequestId,
2332 ) -> Result<(), OpsLifecycleError> {
2333 let mut state = self.write_state()?;
2334 match state.wait_request_id.as_ref() {
2335 Some(active) if active == wait_request_id => {
2336 state.dsl_apply(
2341 mm_dsl::MeerkatMachineInput::CancelWaitAll,
2342 "CancelWaitAll(cancel)",
2343 )?;
2344 state.wait_request_id = None;
2345 if state
2346 .pending_wait
2347 .as_ref()
2348 .is_some_and(|pending| pending.wait_request_id == *wait_request_id)
2349 {
2350 state.pending_wait = None;
2351 }
2352 Ok(())
2353 }
2354 _ => {
2355 if state
2356 .pending_wait
2357 .as_ref()
2358 .is_some_and(|pending| pending.wait_request_id == *wait_request_id)
2359 {
2360 state.pending_wait = None;
2361 }
2362 Ok(())
2363 }
2364 }
2365 }
2366}
2367
/// Progress of a [`WaitAllFuture`].
enum WaitAllFutureState {
    /// The result was already known when the future was built; it is handed
    /// out (and the slot emptied) on the first poll.
    Ready(Option<Result<WaitAllResult, OpsLifecycleError>>),
    /// The future is waiting on a oneshot channel for the satisfaction notice.
    Waiting(tokio::sync::oneshot::Receiver<WaitAllSatisfied>),
    /// The output has been returned; further polls yield an internal error.
    Done,
}
2373
/// Future returned by `wait_all`; cancels its wait request on drop while it
/// is still in the `Waiting` state.
struct WaitAllFuture<'a> {
    /// Registry used to read outcomes on completion and to cancel on drop.
    registry: &'a RuntimeOpsLifecycleRegistry,
    /// Identifier of the wait request this future belongs to.
    wait_request_id: WaitRequestId,
    /// Current progress of the future.
    state: WaitAllFutureState,
}
2379
2380impl Future for WaitAllFuture<'_> {
2381 type Output = Result<WaitAllResult, OpsLifecycleError>;
2382
2383 fn poll(mut self: std::pin::Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
2384 match &mut self.state {
2385 WaitAllFutureState::Ready(result) => {
2386 let ready = result.take().unwrap_or_else(|| {
2387 Err(OpsLifecycleError::Internal(
2388 "wait_all future polled after completion".into(),
2389 ))
2390 });
2391 self.state = WaitAllFutureState::Done;
2392 Poll::Ready(ready)
2393 }
2394 WaitAllFutureState::Waiting(receiver) => match std::pin::Pin::new(receiver).poll(cx) {
2395 Poll::Pending => Poll::Pending,
2396 Poll::Ready(Ok(satisfied)) => {
2397 let outcomes = match self.registry.read_state() {
2398 Ok(state) => state.collect_wait_outcomes(&satisfied.operation_ids),
2399 Err(err) => Err(err),
2400 };
2401 self.state = WaitAllFutureState::Done;
2402 Poll::Ready(outcomes.map(|outcomes| WaitAllResult {
2403 outcomes,
2404 satisfied,
2405 }))
2406 }
2407 Poll::Ready(Err(_)) => {
2408 self.state = WaitAllFutureState::Done;
2409 Poll::Ready(Err(OpsLifecycleError::Internal(
2410 "wait_all completion channel dropped".into(),
2411 )))
2412 }
2413 },
2414 WaitAllFutureState::Done => Poll::Ready(Err(OpsLifecycleError::Internal(
2415 "wait_all future polled after completion".into(),
2416 ))),
2417 }
2418 }
2419}
2420
2421impl Drop for WaitAllFuture<'_> {
2422 fn drop(&mut self) {
2423 if matches!(self.state, WaitAllFutureState::Waiting(_))
2424 && let Err(err) = self
2425 .registry
2426 .cancel_wait_all_internal(&self.wait_request_id)
2427 {
2428 tracing::error!(
2429 wait_request_id = %self.wait_request_id,
2430 error = %err,
2431 "generated wait_all authority rejected cancellation during drop"
2432 );
2433 }
2434 }
2435}
2436
2437fn op_lifecycle_action_label(action: mm_dsl::OpLifecycleActionKind) -> &'static str {
2442 match action {
2443 mm_dsl::OpLifecycleActionKind::Start => "provisioning_succeeded",
2444 mm_dsl::OpLifecycleActionKind::Fail => "fail_operation",
2445 mm_dsl::OpLifecycleActionKind::PeerReady => "peer_ready",
2446 mm_dsl::OpLifecycleActionKind::ProgressReported => "report_progress",
2447 mm_dsl::OpLifecycleActionKind::Complete => "complete_operation",
2448 mm_dsl::OpLifecycleActionKind::Abort => "abort_provisioning",
2449 mm_dsl::OpLifecycleActionKind::Cancel => "cancel_operation",
2450 mm_dsl::OpLifecycleActionKind::RetireRequested => "request_retire",
2451 mm_dsl::OpLifecycleActionKind::RetireCompleted => "mark_retired",
2452 mm_dsl::OpLifecycleActionKind::Terminate => "terminate_owner",
2453 }
2454}
2455
2456fn op_lifecycle_rejection_error_from_effects(
2457 id: &OperationId,
2458 requested_action: mm_dsl::OpLifecycleActionKind,
2459 effects: &[mm_dsl::MeerkatMachineEffect],
2460) -> Result<OpsLifecycleError, OpsLifecycleError> {
2461 let expected_id = mm_dsl::OperationId::from_domain(id).0;
2462 let mut rejection = None;
2463 for effect in effects {
2464 let mm_dsl::MeerkatMachineEffect::OpLifecycleTransitionRejected {
2465 operation_id,
2466 action,
2467 reason,
2468 status,
2469 } = effect
2470 else {
2471 continue;
2472 };
2473 if rejection.is_some() {
2474 return Err(OpsLifecycleError::Internal(
2475 "generated op lifecycle authority emitted multiple rejection results".into(),
2476 ));
2477 }
2478 if operation_id != &expected_id || *action != requested_action {
2479 return Err(OpsLifecycleError::Internal(format!(
2480 "generated op lifecycle authority resolved {operation_id}/{action:?} while shell requested {expected_id}/{requested_action:?}"
2481 )));
2482 }
2483 let error = match reason {
2484 mm_dsl::OpLifecycleRejectReasonKind::OperationNotFound => {
2485 if status.is_some() {
2486 return Err(OpsLifecycleError::Internal(
2487 "generated op lifecycle authority emitted not-found with status".into(),
2488 ));
2489 }
2490 OpsLifecycleError::NotFound(id.clone())
2491 }
2492 mm_dsl::OpLifecycleRejectReasonKind::InvalidTransition => {
2493 let status = status.ok_or_else(|| {
2494 OpsLifecycleError::Internal(
2495 "generated op lifecycle authority emitted invalid-transition without status"
2496 .into(),
2497 )
2498 })?;
2499 OpsLifecycleError::InvalidTransition {
2500 id: id.clone(),
2501 status: OperationStatus::from(status),
2502 action: op_lifecycle_action_label(requested_action),
2503 }
2504 }
2505 mm_dsl::OpLifecycleRejectReasonKind::PeerNotExpected => {
2506 if status.is_none() {
2507 return Err(OpsLifecycleError::Internal(
2508 "generated op lifecycle authority emitted peer-not-expected without status"
2509 .into(),
2510 ));
2511 }
2512 OpsLifecycleError::PeerNotExpected(id.clone())
2513 }
2514 mm_dsl::OpLifecycleRejectReasonKind::AlreadyPeerReady => {
2515 if status.is_none() {
2516 return Err(OpsLifecycleError::Internal(
2517 "generated op lifecycle authority emitted already-peer-ready without status"
2518 .into(),
2519 ));
2520 }
2521 OpsLifecycleError::AlreadyPeerReady(id.clone())
2522 }
2523 };
2524 rejection = Some(error);
2525 }
2526 rejection.ok_or_else(|| {
2527 OpsLifecycleError::Internal(
2528 "generated op lifecycle authority emitted no rejection result".into(),
2529 )
2530 })
2531}
2532
2533fn classify_generated_op_rejection(
2534 state: &mut ShellState,
2535 err: mm_dsl::MeerkatMachineTransitionError,
2536 id: &OperationId,
2537 action: mm_dsl::OpLifecycleActionKind,
2538) -> OpsLifecycleError {
2539 match err {
2540 mm_dsl::MeerkatMachineTransitionError::GuardRejected { .. } => {
2541 match state.dsl_apply_with_effects(
2542 mm_dsl::MeerkatMachineInput::ResolveOpLifecycleTransitionRejection {
2543 operation_id: mm_dsl::OperationId::from_domain(id).0,
2544 action,
2545 },
2546 "ResolveOpLifecycleTransitionRejection",
2547 ) {
2548 Ok(effects) => op_lifecycle_rejection_error_from_effects(id, action, &effects)
2549 .unwrap_or_else(|err| err),
2550 Err(err) => err,
2551 }
2552 }
2553 other => OpsLifecycleError::Internal(format!(
2554 "DSL rejected ops transition ({}): {other:?}",
2555 op_lifecycle_action_label(action)
2556 )),
2557 }
2558}
2559
2560fn apply_op_transition(
2561 state: &mut ShellState,
2562 id: &OperationId,
2563 input: mm_dsl::MeerkatMachineInput,
2564 action: mm_dsl::OpLifecycleActionKind,
2565) -> Result<(), OpsLifecycleError> {
2566 state
2567 .dsl_apply_raw(input)
2568 .map_err(|err| classify_generated_op_rejection(state, err, id, action))
2569}
2570
2571fn op_registration_error_from_effects(
2572 id: &OperationId,
2573 effects: &[mm_dsl::MeerkatMachineEffect],
2574) -> Result<Option<OpsLifecycleError>, OpsLifecycleError> {
2575 let expected_id = mm_dsl::OperationId::from_domain(id).0;
2576 let mut admission = None;
2577 for effect in effects {
2578 let mm_dsl::MeerkatMachineEffect::OpRegistrationAdmissionResolved {
2579 operation_id,
2580 result,
2581 reject_reason,
2582 max_concurrent_limit,
2583 active_op_count,
2584 } = effect
2585 else {
2586 continue;
2587 };
2588 if admission.is_some() {
2589 return Err(OpsLifecycleError::Internal(
2590 "generated op registration authority emitted multiple admission results".into(),
2591 ));
2592 }
2593 if operation_id != &expected_id {
2594 return Err(OpsLifecycleError::Internal(format!(
2595 "generated op registration authority resolved {operation_id} while shell requested {expected_id}"
2596 )));
2597 }
2598 admission = Some(match result {
2599 mm_dsl::OpRegistrationAdmissionResultKind::Accept => {
2600 if reject_reason.is_some() {
2601 return Err(OpsLifecycleError::Internal(
2602 "generated op registration authority accepted with rejection reason".into(),
2603 ));
2604 }
2605 None
2606 }
2607 mm_dsl::OpRegistrationAdmissionResultKind::Reject => {
2608 let reason = reject_reason.ok_or_else(|| {
2609 OpsLifecycleError::Internal(
2610 "generated op registration authority rejected without reason".into(),
2611 )
2612 })?;
2613 let error = match reason {
2614 mm_dsl::OpRegistrationRejectReasonKind::AlreadyRegistered => {
2615 OpsLifecycleError::AlreadyRegistered(id.clone())
2616 }
2617 mm_dsl::OpRegistrationRejectReasonKind::MaxConcurrentExceeded => {
2618 let limit = max_concurrent_limit.ok_or_else(|| {
2619 OpsLifecycleError::Internal(
2620 "generated op registration authority rejected capacity without limit"
2621 .into(),
2622 )
2623 })?;
2624 OpsLifecycleError::MaxConcurrentExceeded {
2625 limit: limit as usize,
2626 active: *active_op_count as usize,
2627 }
2628 }
2629 };
2630 Some(error)
2631 }
2632 });
2633 }
2634 admission.ok_or_else(|| {
2635 OpsLifecycleError::Internal(
2636 "generated op registration authority emitted no admission result".into(),
2637 )
2638 })
2639}
2640
2641impl OpsLifecycleRegistry for RuntimeOpsLifecycleRegistry {
/// Registers `spec` without a per-call admission limit (passes `None` to
/// `register_operation_with_admission_limit`).
fn register_operation(&self, spec: OperationSpec) -> Result<(), OpsLifecycleError> {
    self.register_operation_with_admission_limit(spec, None)
}
2645
2646 fn register_operation_with_admission_limit(
2647 &self,
2648 mut spec: OperationSpec,
2649 max_concurrent: Option<usize>,
2650 ) -> Result<(), OpsLifecycleError> {
2651 let mut state = self.write_state()?;
2652 let operation_id = spec.id.clone();
2653 let kind = spec.kind;
2654 let max_concurrent = max_concurrent
2655 .or(state.max_concurrent)
2656 .map(|limit| limit as u64);
2657
2658 let effects = state.dsl_apply_with_effects(
2659 mm_dsl::MeerkatMachineInput::RegisterOp {
2660 operation_id: mm_dsl::OperationId::from_domain(&operation_id).0,
2661 kind: mm_dsl::OperationKind::from_domain(&kind),
2662 source: spec
2663 .operation_source
2664 .as_ref()
2665 .map(mm_dsl::OperationSource::from_domain),
2666 max_concurrent,
2667 },
2668 "RegisterOp",
2669 )?;
2670 if let Some(error) = op_registration_error_from_effects(&operation_id, &effects)? {
2671 return Err(error);
2672 }
2673
2674 let authority_operation_source = state.operation_source(&operation_id)?;
2675 ShellState::align_spec_child_session_id_to_source(
2676 &mut spec,
2677 authority_operation_source.as_ref(),
2678 );
2679
2680 state.records.insert(operation_id, ShellRecord::new(spec));
2682 Ok(())
2683 }
2684
2685 fn provisioning_succeeded(&self, id: &OperationId) -> Result<(), OpsLifecycleError> {
2686 let mut state = self.write_state()?;
2687
2688 apply_op_transition(
2689 &mut state,
2690 id,
2691 mm_dsl::MeerkatMachineInput::StartOp {
2692 operation_id: mm_dsl::OperationId::from_domain(id).0,
2693 },
2694 mm_dsl::OpLifecycleActionKind::Start,
2695 )?;
2696
2697 if let Some(shell) = state.records.get_mut(id) {
2699 shell.started_at = Some(Instant::now());
2700 }
2701 Ok(())
2702 }
2703
2704 fn provisioning_failed(
2705 &self,
2706 id: &OperationId,
2707 error: String,
2708 ) -> Result<(), OpsLifecycleError> {
2709 let mut state = self.write_state()?;
2710
2711 let terminal_outcome = OperationTerminalOutcome::Failed { error };
2712 let outcome_kind = mm_dsl::OperationTerminalOutcomeKind::from(&terminal_outcome);
2713
2714 apply_op_transition(
2715 &mut state,
2716 id,
2717 mm_dsl::MeerkatMachineInput::FailOp {
2718 operation_id: mm_dsl::OperationId::from_domain(id).0,
2719 outcome: outcome_kind,
2720 payload: terminal_outcome,
2721 },
2722 mm_dsl::OpLifecycleActionKind::Fail,
2723 )?;
2724
2725 state.finalize_terminal(id)?;
2726 state.maybe_persist()?;
2727 Ok(())
2728 }
2729
2730 fn peer_ready(
2731 &self,
2732 id: &OperationId,
2733 peer: OperationPeerHandle,
2734 ) -> Result<(), OpsLifecycleError> {
2735 let mut state = self.write_state()?;
2736
2737 apply_op_transition(
2738 &mut state,
2739 id,
2740 mm_dsl::MeerkatMachineInput::PeerReadyOp {
2741 operation_id: mm_dsl::OperationId::from_domain(id).0,
2742 },
2743 mm_dsl::OpLifecycleActionKind::PeerReady,
2744 )?;
2745
2746 if let Some(shell) = state.records.get_mut(id) {
2748 shell.peer_handle = Some(peer);
2749 }
2750 Ok(())
2751 }
2752
2753 fn register_watcher(
2754 &self,
2755 id: &OperationId,
2756 ) -> Result<OperationCompletionWatch, OpsLifecycleError> {
2757 let mut state = self.write_state()?;
2758
2759 if !state.contains(id) {
2760 return Err(OpsLifecycleError::NotFound(id.clone()));
2761 }
2762
2763 if let Some(outcome) = state.terminal_outcome(id)? {
2765 return Ok(resolved_operation_completion_watch(outcome));
2766 }
2767
2768 let shell = state.shell_record_mut(id)?;
2770 let (tx, rx) = tokio::sync::oneshot::channel();
2771 let watch = operation_completion_watch_from_receiver(rx);
2772 shell.watchers.push(OperationCompletionNotifier::new(tx));
2773 Ok(watch)
2774 }
2775
2776 fn report_progress(
2777 &self,
2778 id: &OperationId,
2779 _update: OperationProgressUpdate,
2780 ) -> Result<(), OpsLifecycleError> {
2781 let mut state = self.write_state()?;
2782
2783 apply_op_transition(
2784 &mut state,
2785 id,
2786 mm_dsl::MeerkatMachineInput::ProgressReportedOp {
2787 operation_id: mm_dsl::OperationId::from_domain(id).0,
2788 },
2789 mm_dsl::OpLifecycleActionKind::ProgressReported,
2790 )?;
2791 Ok(())
2792 }
2793
2794 fn complete_operation(
2795 &self,
2796 id: &OperationId,
2797 result: OperationResult,
2798 ) -> Result<(), OpsLifecycleError> {
2799 let mut state = self.write_state()?;
2800
2801 let terminal_outcome = OperationTerminalOutcome::Completed(result);
2802 let outcome_kind = mm_dsl::OperationTerminalOutcomeKind::from(&terminal_outcome);
2803
2804 apply_op_transition(
2805 &mut state,
2806 id,
2807 mm_dsl::MeerkatMachineInput::CompleteOp {
2808 operation_id: mm_dsl::OperationId::from_domain(id).0,
2809 outcome: outcome_kind,
2810 payload: terminal_outcome,
2811 },
2812 mm_dsl::OpLifecycleActionKind::Complete,
2813 )?;
2814
2815 state.finalize_terminal(id)?;
2816 state.maybe_persist()?;
2817 Ok(())
2818 }
2819
2820 fn fail_operation(&self, id: &OperationId, error: String) -> Result<(), OpsLifecycleError> {
2821 let mut state = self.write_state()?;
2822
2823 let terminal_outcome = OperationTerminalOutcome::Failed { error };
2824 let outcome_kind = mm_dsl::OperationTerminalOutcomeKind::from(&terminal_outcome);
2825
2826 apply_op_transition(
2827 &mut state,
2828 id,
2829 mm_dsl::MeerkatMachineInput::FailOp {
2830 operation_id: mm_dsl::OperationId::from_domain(id).0,
2831 outcome: outcome_kind,
2832 payload: terminal_outcome,
2833 },
2834 mm_dsl::OpLifecycleActionKind::Fail,
2835 )?;
2836
2837 state.finalize_terminal(id)?;
2838 state.maybe_persist()?;
2839 Ok(())
2840 }
2841
2842 fn abort_provisioning(
2843 &self,
2844 id: &OperationId,
2845 reason: Option<String>,
2846 ) -> Result<(), OpsLifecycleError> {
2847 let mut state = self.write_state()?;
2848
2849 let terminal_outcome = OperationTerminalOutcome::Aborted { reason };
2850 let outcome_kind = mm_dsl::OperationTerminalOutcomeKind::from(&terminal_outcome);
2851
2852 apply_op_transition(
2853 &mut state,
2854 id,
2855 mm_dsl::MeerkatMachineInput::AbortOp {
2856 operation_id: mm_dsl::OperationId::from_domain(id).0,
2857 outcome: outcome_kind,
2858 payload: terminal_outcome,
2859 },
2860 mm_dsl::OpLifecycleActionKind::Abort,
2861 )?;
2862
2863 state.finalize_terminal(id)?;
2864 state.maybe_persist()?;
2865 Ok(())
2866 }
2867
2868 fn cancel_operation(
2869 &self,
2870 id: &OperationId,
2871 reason: Option<String>,
2872 ) -> Result<(), OpsLifecycleError> {
2873 let mut state = self.write_state()?;
2874
2875 let terminal_outcome = OperationTerminalOutcome::Cancelled { reason };
2876 let outcome_kind = mm_dsl::OperationTerminalOutcomeKind::from(&terminal_outcome);
2877
2878 apply_op_transition(
2879 &mut state,
2880 id,
2881 mm_dsl::MeerkatMachineInput::CancelOp {
2882 operation_id: mm_dsl::OperationId::from_domain(id).0,
2883 outcome: outcome_kind,
2884 payload: terminal_outcome,
2885 },
2886 mm_dsl::OpLifecycleActionKind::Cancel,
2887 )?;
2888
2889 state.finalize_terminal(id)?;
2890 state.maybe_persist()?;
2891 Ok(())
2892 }
2893
2894 fn request_retire(&self, id: &OperationId) -> Result<(), OpsLifecycleError> {
2895 let mut state = self.write_state()?;
2896
2897 apply_op_transition(
2898 &mut state,
2899 id,
2900 mm_dsl::MeerkatMachineInput::RetireRequestedOp {
2901 operation_id: mm_dsl::OperationId::from_domain(id).0,
2902 },
2903 mm_dsl::OpLifecycleActionKind::RetireRequested,
2904 )?;
2905 Ok(())
2906 }
2907
2908 fn mark_retired(&self, id: &OperationId) -> Result<(), OpsLifecycleError> {
2909 let mut state = self.write_state()?;
2910
2911 let terminal_outcome = OperationTerminalOutcome::Retired;
2912 let outcome_kind = mm_dsl::OperationTerminalOutcomeKind::from(&terminal_outcome);
2913
2914 apply_op_transition(
2915 &mut state,
2916 id,
2917 mm_dsl::MeerkatMachineInput::RetireCompletedOp {
2918 operation_id: mm_dsl::OperationId::from_domain(id).0,
2919 outcome: outcome_kind,
2920 payload: terminal_outcome,
2921 },
2922 mm_dsl::OpLifecycleActionKind::RetireCompleted,
2923 )?;
2924
2925 state.finalize_terminal(id)?;
2926 state.maybe_persist()?;
2927 Ok(())
2928 }
2929
2930 fn snapshot(
2931 &self,
2932 id: &OperationId,
2933 ) -> Result<Option<OperationLifecycleSnapshot>, OpsLifecycleError> {
2934 let state = self.read_state()?;
2935 state.snapshot(id)
2936 }
2937
2938 fn list_operations(&self) -> Result<Vec<OperationLifecycleSnapshot>, OpsLifecycleError> {
2939 let state = self.read_state()?;
2940 let mut snapshots = Vec::new();
2941 for id in state.operation_ids()? {
2942 let snapshot = state.snapshot(&id)?.ok_or_else(|| {
2943 OpsLifecycleError::Internal(format!(
2944 "operation {id} was present in generated lifecycle authority but produced no public snapshot"
2945 ))
2946 })?;
2947 snapshots.push(snapshot);
2948 }
2949 snapshots.sort_by(|left, right| left.display_name.cmp(&right.display_name));
2950 Ok(snapshots)
2951 }
2952
/// Reports whether `status` counts as terminal for operation `id`.
///
/// Delegates to `ShellState::operation_status_is_terminal`; the registry
/// state is not locked or read here.
fn classify_operation_terminality(
    &self,
    id: &OperationId,
    status: OperationStatus,
) -> Result<bool, OpsLifecycleError> {
    ShellState::operation_status_is_terminal(id, status)
}
2960
2961 fn classify_operation_public_result(
2962 &self,
2963 id: &OperationId,
2964 ) -> Result<OperationPublicResultClass, OpsLifecycleError> {
2965 let state = self.read_state()?;
2966 let status = match state.status(id) {
2967 Some(status) => status,
2968 None if state.records.contains_key(id)
2969 || state.has_generated_operation_record_fact(id) =>
2970 {
2971 return Err(OpsLifecycleError::Internal(format!(
2972 "generated op lifecycle authority missing status for {id}"
2973 )));
2974 }
2975 None => OperationStatus::Absent,
2976 };
2977 ShellState::operation_public_result_class(id, status)
2978 }
2979
/// Classifies how a completion of an operation of `kind` should wake.
///
/// Delegates to `ShellState::operation_completion_wake_class`; the registry
/// state is not locked or read here.
fn classify_operation_completion_wake(
    &self,
    id: &OperationId,
    kind: OperationKind,
) -> Result<OperationCompletionWakeClass, OpsLifecycleError> {
    ShellState::operation_completion_wake_class(id, kind)
}
2987
2988 fn classify_operation_transition_idempotence(
2989 &self,
2990 id: &OperationId,
2991 action: OperationLifecycleAction,
2992 ) -> Result<bool, OpsLifecycleError> {
2993 let state = self.read_state()?;
2994 let status = match state.status(id) {
2995 Some(status) => status,
2996 None if state.records.contains_key(id)
2997 || state.has_generated_operation_record_fact(id) =>
2998 {
2999 return Err(OpsLifecycleError::Internal(format!(
3000 "generated op lifecycle authority missing status for {id}"
3001 )));
3002 }
3003 None => OperationStatus::Absent,
3004 };
3005 ShellState::operation_transition_rejection_is_idempotent(id, action, status)
3006 }
3007
3008 fn terminate_owner(&self, reason: String) -> Result<(), OpsLifecycleError> {
3009 let mut state = self.write_state()?;
3010
3011 let to_terminate = state.owner_termination_targets()?;
3012
3013 for (op_id, _status) in &to_terminate {
3014 let terminal_outcome = OperationTerminalOutcome::Terminated {
3015 reason: reason.clone(),
3016 };
3017 let outcome_kind = mm_dsl::OperationTerminalOutcomeKind::from(&terminal_outcome);
3018
3019 apply_op_transition(
3020 &mut state,
3021 op_id,
3022 mm_dsl::MeerkatMachineInput::TerminateOp {
3023 operation_id: mm_dsl::OperationId::from_domain(op_id).0,
3024 outcome: outcome_kind,
3025 payload: terminal_outcome,
3026 },
3027 mm_dsl::OpLifecycleActionKind::Terminate,
3028 )?;
3029
3030 state.finalize_terminal(op_id)?;
3031 }
3032
3033 if !to_terminate.is_empty() {
3034 state.maybe_persist()?;
3035 }
3036 Ok(())
3037 }
3038
3039 fn collect_completed(
3040 &self,
3041 ) -> Result<Vec<(OperationId, OperationTerminalOutcome)>, OpsLifecycleError> {
3042 let mut state = self.write_state()?;
3043
3044 let ids: Vec<OperationId> = state.completed_order.iter().cloned().collect();
3045 let mut collected = Vec::with_capacity(ids.len());
3046 for id in ids {
3047 let outcome = state.terminal_outcome(&id)?;
3048 state.dsl_apply(
3049 mm_dsl::MeerkatMachineInput::CollectCompletedOp {
3050 operation_id: mm_dsl::OperationId::from_domain(&id).0,
3051 },
3052 "CollectCompletedOp",
3053 )?;
3054 state.completed_order.retain(|queued| queued != &id);
3055 state.records.remove(&id);
3056 if let Some(outcome) = outcome {
3057 collected.push((id, outcome));
3058 }
3059 }
3060 Ok(collected)
3061 }
3062
/// Always returns `Some` with a handle from `completion_feed_handle`.
fn completion_feed(&self) -> Option<Arc<dyn CompletionFeed>> {
    Some(self.completion_feed_handle())
}
3066
3067 fn completion_cursor(
3068 &self,
3069 consumer: CompletionCursorConsumer,
3070 ) -> Result<Option<CompletionSeq>, OpsLifecycleError> {
3071 let state = self.read_state()?;
3072 Ok(Some(state.completion_cursor(consumer)))
3073 }
3074
3075 fn advance_completion_cursor(
3076 &self,
3077 consumer: CompletionCursorConsumer,
3078 cursor: CompletionSeq,
3079 projection: Option<&meerkat_core::EpochCursorState>,
3080 ) -> Result<CompletionSeq, OpsLifecycleError> {
3081 let mut state = self.write_state()?;
3082 let input = match consumer {
3083 CompletionCursorConsumer::AgentApplied => {
3084 mm_dsl::MeerkatMachineInput::AdvanceAgentCompletionCursor { cursor }
3085 }
3086 CompletionCursorConsumer::RuntimeObserved => {
3087 mm_dsl::MeerkatMachineInput::AdvanceRuntimeObservedCompletionCursor { cursor }
3088 }
3089 CompletionCursorConsumer::RuntimeInjected => {
3090 mm_dsl::MeerkatMachineInput::AdvanceRuntimeInjectedCompletionCursor { cursor }
3091 }
3092 };
3093 let effects = state.dsl_apply_with_effects(input, "AdvanceCompletionCursor")?;
3094 let advanced = effects
3095 .iter()
3096 .find_map(|effect| match (consumer, effect) {
3097 (
3098 CompletionCursorConsumer::AgentApplied,
3099 mm_dsl::MeerkatMachineEffect::AgentCompletionCursorAdvanced { cursor },
3100 ) => Some(*cursor),
3101 (
3102 CompletionCursorConsumer::RuntimeObserved,
3103 mm_dsl::MeerkatMachineEffect::RuntimeObservedCompletionCursorAdvanced {
3104 cursor,
3105 },
3106 ) => Some(*cursor),
3107 (
3108 CompletionCursorConsumer::RuntimeInjected,
3109 mm_dsl::MeerkatMachineEffect::RuntimeInjectedCompletionCursorAdvanced {
3110 cursor,
3111 },
3112 ) => Some(*cursor),
3113 _ => None,
3114 })
3115 .ok_or_else(|| {
3116 OpsLifecycleError::Internal(format!(
3117 "generated completion cursor transition emitted no feedback for {consumer:?}"
3118 ))
3119 })?;
3120 if let Some(projection) = projection {
3121 projection.project_authorized_completion_cursor(consumer, advanced);
3122 }
3123 Ok(advanced)
3124 }
3125
3126 fn wait_all(
3127 &self,
3128 run_id: &RunId,
3129 ids: &[OperationId],
3130 ) -> std::pin::Pin<
3131 Box<dyn std::future::Future<Output = Result<WaitAllResult, OpsLifecycleError>> + Send + '_>,
3132 > {
3133 let wait_request_id = WaitRequestId::new();
3134 let owned_ids = ids.to_vec();
3135
3136 let state = match self.write_state() {
3137 Ok(mut state) => {
3138 match state.begin_wait_all_authority(run_id, &wait_request_id, &owned_ids) {
3139 Ok(WaitAllAuthorityPlan::AlreadySatisfied(satisfied)) => {
3140 let outcomes =
3141 state
3142 .collect_wait_outcomes(&satisfied.operation_ids)
3143 .map(|outcomes| WaitAllResult {
3144 outcomes,
3145 satisfied,
3146 });
3147 WaitAllFutureState::Ready(Some(outcomes))
3148 }
3149 Ok(WaitAllAuthorityPlan::ActivateBarrier) => {
3150 if state.pending_wait.is_some() {
3151 let rollback = state.dsl_apply(
3157 mm_dsl::MeerkatMachineInput::CancelWaitAll,
3158 "CancelWaitAll(rollback)",
3159 );
3160 return Box::pin(WaitAllFuture {
3161 registry: self,
3162 wait_request_id,
3163 state: WaitAllFutureState::Ready(Some(Err(match rollback {
3164 Ok(()) => OpsLifecycleError::Internal(
3165 "wait_all started while a pending wait sender already existed"
3166 .into(),
3167 ),
3168 Err(err) => err,
3169 }))),
3170 });
3171 }
3172 state.wait_request_id = Some(wait_request_id.clone());
3173 let (sender, receiver) = tokio::sync::oneshot::channel();
3174 state.pending_wait = Some(PendingWaitState {
3175 wait_request_id: wait_request_id.clone(),
3176 sender,
3177 });
3178 WaitAllFutureState::Waiting(receiver)
3179 }
3180 Err(err) => WaitAllFutureState::Ready(Some(Err(err))),
3181 }
3182 }
3183 Err(err) => WaitAllFutureState::Ready(Some(Err(err))),
3184 };
3185
3186 Box::pin(WaitAllFuture {
3187 registry: self,
3188 wait_request_id,
3189 state,
3190 })
3191 }
3192}
3193
3194#[cfg(test)]
3195#[allow(clippy::unwrap_used, clippy::panic)]
3196mod tests {
3197 use super::*;
3198 use meerkat_core::comms::{PeerId, TrustedPeerDescriptor};
3199 use meerkat_core::lifecycle::RunId;
3200 use meerkat_core::ops_lifecycle::{OperationKind, OpsLifecycleRegistry};
3201 use meerkat_core::types::SessionId;
3202 use std::sync::atomic::Ordering;
3203 use uuid::Uuid;
3204
3205 fn test_run_id() -> RunId {
3206 RunId(Uuid::from_u128(1))
3207 }
3208
3209 fn background_spec(name: &str) -> OperationSpec {
3210 OperationSpec {
3211 id: OperationId::new(),
3212 kind: OperationKind::BackgroundToolOp,
3213 owner_session_id: SessionId::new(),
3214 display_name: name.into(),
3215 source_label: "test".into(),
3216 operation_source: None,
3217 child_session_id: None,
3218 expect_peer_channel: false,
3219 }
3220 }
3221
3222 #[tokio::test]
3223 async fn late_watchers_resolve_immediately() {
3224 let registry = RuntimeOpsLifecycleRegistry::new();
3225 let spec = background_spec("late");
3226 let op_id = spec.id.clone();
3227 registry.register_operation(spec).unwrap();
3228 registry.provisioning_succeeded(&op_id).unwrap();
3229 registry
3230 .complete_operation(
3231 &op_id,
3232 OperationResult {
3233 id: op_id.clone(),
3234 content: "done".into(),
3235 is_error: false,
3236 duration_ms: 1,
3237 tokens_used: 0,
3238 },
3239 )
3240 .unwrap();
3241
3242 let watch = registry.register_watcher(&op_id).unwrap();
3243 match watch
3244 .await
3245 .expect("operation completion watch should resolve")
3246 {
3247 OperationTerminalOutcome::Completed(result) => assert_eq!(result.content, "done"),
3248 other => panic!("expected completed outcome, got {other:?}"),
3249 }
3250 }
3251
3252 #[tokio::test]
3253 async fn dropped_watch_sender_is_waiter_error_not_terminal_outcome() {
3254 let (tx, rx) = tokio::sync::oneshot::channel();
3255 let watch = operation_completion_watch_from_receiver(rx);
3256 drop(tx);
3257
3258 assert_eq!(
3259 watch.await,
3260 Err(meerkat_core::ops_lifecycle::OperationCompletionWatchError::ChannelClosed)
3261 );
3262 }
3263
3264 #[test]
3265 fn peer_ready_requires_peer_expectation() {
3266 let registry = RuntimeOpsLifecycleRegistry::new();
3267 let spec = background_spec("no-peer");
3268 let op_id = spec.id.clone();
3269 registry.register_operation(spec).unwrap();
3270 registry.provisioning_succeeded(&op_id).unwrap();
3271
3272 let result = registry.peer_ready(
3273 &op_id,
3274 OperationPeerHandle {
3275 peer_name: meerkat_core::comms::PeerName::new("peer").unwrap(),
3276 trusted_peer: TrustedPeerDescriptor::test_only_unsigned_typed(
3277 "peer",
3278 PeerId::new(),
3279 "inproc://peer",
3280 )
3281 .unwrap(),
3282 },
3283 );
3284 assert!(matches!(result, Err(OpsLifecycleError::PeerNotExpected(_))));
3285 }
3286
3287 #[test]
3293 fn typed_terminal_payload_classifies_and_reads_back_each_variant() {
3294 let op_id = OperationId::new();
3295 let outcomes = vec![
3296 (
3297 OperationTerminalOutcome::Completed(OperationResult {
3298 id: op_id.clone(),
3299 content: "done".into(),
3300 is_error: false,
3301 duration_ms: 7,
3302 tokens_used: 42,
3303 }),
3304 mm_dsl::OperationTerminalOutcomeKind::Completed,
3305 ),
3306 (
3307 OperationTerminalOutcome::Failed {
3308 error: "boom".into(),
3309 },
3310 mm_dsl::OperationTerminalOutcomeKind::Failed,
3311 ),
3312 (
3313 OperationTerminalOutcome::Aborted {
3314 reason: Some("user aborted".into()),
3315 },
3316 mm_dsl::OperationTerminalOutcomeKind::Aborted,
3317 ),
3318 (
3319 OperationTerminalOutcome::Aborted { reason: None },
3320 mm_dsl::OperationTerminalOutcomeKind::Aborted,
3321 ),
3322 (
3323 OperationTerminalOutcome::Cancelled {
3324 reason: Some("cancelled".into()),
3325 },
3326 mm_dsl::OperationTerminalOutcomeKind::Cancelled,
3327 ),
3328 (
3329 OperationTerminalOutcome::Cancelled { reason: None },
3330 mm_dsl::OperationTerminalOutcomeKind::Cancelled,
3331 ),
3332 (
3333 OperationTerminalOutcome::Retired,
3334 mm_dsl::OperationTerminalOutcomeKind::Retired,
3335 ),
3336 (
3337 OperationTerminalOutcome::Terminated {
3338 reason: "owner stopped".into(),
3339 },
3340 mm_dsl::OperationTerminalOutcomeKind::Terminated,
3341 ),
3342 ];
3343
3344 for (outcome, expected_kind) in &outcomes {
3345 assert_eq!(
3346 mm_dsl::OperationTerminalOutcomeKind::from(outcome),
3347 *expected_kind,
3348 "typed payload {outcome:?} must classify to {expected_kind:?}"
3349 );
3350 let read = ShellState::checked_terminal_payload(
3351 *expected_kind,
3352 outcome,
3353 "test authority",
3354 "test-op",
3355 )
3356 .expect("matching discriminant must read back the exact payload");
3357 assert_eq!(&read, outcome);
3358 }
3359
3360 let err = ShellState::checked_terminal_payload(
3362 mm_dsl::OperationTerminalOutcomeKind::Completed,
3363 &OperationTerminalOutcome::Retired,
3364 "test authority",
3365 "test-op",
3366 )
3367 .expect_err("variant mismatch must be rejected");
3368 assert!(matches!(err, OpsLifecycleError::Internal(_)));
3369 }
3370
3371 #[test]
3377 fn generated_guard_rejects_terminal_payload_variant_mismatch() {
3378 let registry = RuntimeOpsLifecycleRegistry::new();
3379 let spec = background_spec("variant-mismatch");
3380 let op_id = spec.id.clone();
3381 registry.register_operation(spec).unwrap();
3382 registry.provisioning_succeeded(&op_id).unwrap();
3383
3384 let mut state = registry.write_state().unwrap();
3385 let err = state
3386 .dsl_apply(
3387 mm_dsl::MeerkatMachineInput::CompleteOp {
3388 operation_id: mm_dsl::OperationId::from_domain(&op_id).0,
3389 outcome: mm_dsl::OperationTerminalOutcomeKind::Completed,
3390 payload: OperationTerminalOutcome::Retired,
3392 },
3393 "CompleteOp",
3394 )
3395 .expect_err("machine must reject payload variant mismatch");
3396 assert!(
3403 matches!(
3404 &err,
3405 OpsLifecycleError::Internal(message)
3406 if message.contains("GuardRejected") && message.contains("CompleteOp")
3407 ),
3408 "expected generated guard rejection for CompleteOp, got: {err:?}"
3409 );
3410 drop(state);
3411
3412 registry
3415 .complete_operation(
3416 &op_id,
3417 OperationResult {
3418 id: op_id.clone(),
3419 content: "done".into(),
3420 is_error: false,
3421 duration_ms: 1,
3422 tokens_used: 0,
3423 },
3424 )
3425 .expect("matching variant must complete");
3426 }
3427
3428 #[test]
3429 fn duplicate_registration_rejection_is_generated() {
3430 let registry = RuntimeOpsLifecycleRegistry::new();
3431 let spec = background_spec("duplicate");
3432 let op_id = spec.id.clone();
3433
3434 registry.register_operation(spec.clone()).unwrap();
3435 let result = registry.register_operation(spec);
3436
3437 assert!(matches!(
3438 result,
3439 Err(OpsLifecycleError::AlreadyRegistered(id)) if id == op_id
3440 ));
3441 }
3442
3443 #[test]
3444 fn invalid_transition_rejection_is_generated() {
3445 let registry = RuntimeOpsLifecycleRegistry::new();
3446 let spec = background_spec("invalid-transition");
3447 let op_id = spec.id.clone();
3448 registry.register_operation(spec).unwrap();
3449
3450 let result = registry.complete_operation(
3451 &op_id,
3452 OperationResult {
3453 id: op_id.clone(),
3454 content: "too-early".into(),
3455 is_error: false,
3456 duration_ms: 1,
3457 tokens_used: 0,
3458 },
3459 );
3460
3461 assert!(matches!(
3462 result,
3463 Err(OpsLifecycleError::InvalidTransition {
3464 id,
3465 status: OperationStatus::Provisioning,
3466 action: "complete_operation",
3467 }) if id == op_id
3468 ));
3469 }
3470
3471 #[tokio::test]
3472 async fn multi_listener_completion() {
3473 let registry = RuntimeOpsLifecycleRegistry::new();
3474 let spec = background_spec("multi");
3475 let op_id = spec.id.clone();
3476 registry.register_operation(spec).unwrap();
3477 registry.provisioning_succeeded(&op_id).unwrap();
3478
3479 let watch1 = registry.register_watcher(&op_id).unwrap();
3480 let watch2 = registry.register_watcher(&op_id).unwrap();
3481 let watch3 = registry.register_watcher(&op_id).unwrap();
3482
3483 registry
3484 .complete_operation(
3485 &op_id,
3486 OperationResult {
3487 id: op_id.clone(),
3488 content: "multi-done".into(),
3489 is_error: false,
3490 duration_ms: 1,
3491 tokens_used: 0,
3492 },
3493 )
3494 .unwrap();
3495
3496 for watch in [watch1, watch2, watch3] {
3497 match watch
3498 .await
3499 .expect("operation completion watch should resolve")
3500 {
3501 OperationTerminalOutcome::Completed(result) => {
3502 assert_eq!(result.content, "multi-done");
3503 }
3504 other => panic!("expected completed, got {other:?}"),
3505 }
3506 }
3507 }
3508
3509 #[tokio::test]
3510 async fn wait_all_returns_all_outcomes() {
3511 let registry = RuntimeOpsLifecycleRegistry::new();
3512
3513 let spec_a = background_spec("a");
3514 let id_a = spec_a.id.clone();
3515 registry.register_operation(spec_a).unwrap();
3516 registry.provisioning_succeeded(&id_a).unwrap();
3517
3518 let spec_b = background_spec("b");
3519 let id_b = spec_b.id.clone();
3520 registry.register_operation(spec_b).unwrap();
3521 registry.provisioning_succeeded(&id_b).unwrap();
3522
3523 registry
3524 .complete_operation(
3525 &id_a,
3526 OperationResult {
3527 id: id_a.clone(),
3528 content: "a-done".into(),
3529 is_error: false,
3530 duration_ms: 1,
3531 tokens_used: 0,
3532 },
3533 )
3534 .unwrap();
3535 registry.fail_operation(&id_b, "b-error".into()).unwrap();
3536
3537 let wait_result = registry
3538 .wait_all(&test_run_id(), &[id_a.clone(), id_b.clone()])
3539 .await
3540 .unwrap();
3541 assert_eq!(wait_result.outcomes.len(), 2);
3542 assert_eq!(wait_result.outcomes[0].0, id_a);
3543 assert!(matches!(
3544 wait_result.outcomes[0].1,
3545 OperationTerminalOutcome::Completed(_)
3546 ));
3547 assert_eq!(wait_result.outcomes[1].0, id_b);
3548 assert!(matches!(
3549 wait_result.outcomes[1].1,
3550 OperationTerminalOutcome::Failed { .. }
3551 ));
3552 assert_eq!(wait_result.satisfied.operation_ids.len(), 2);
3554 assert_ne!(wait_result.satisfied.wait_request_id.to_string(), "");
3555 }
3556
3557 #[tokio::test]
3560 async fn wait_all_trait_path_submits_through_authority() {
3561 let registry = RuntimeOpsLifecycleRegistry::new();
3562 let spec = background_spec("trait-wait");
3563 let op_id = spec.id.clone();
3564 registry.register_operation(spec).unwrap();
3565 registry.provisioning_succeeded(&op_id).unwrap();
3566 registry
3567 .complete_operation(
3568 &op_id,
3569 OperationResult {
3570 id: op_id.clone(),
3571 content: "done".into(),
3572 is_error: false,
3573 duration_ms: 1,
3574 tokens_used: 0,
3575 },
3576 )
3577 .unwrap();
3578
3579 let trait_ref: &dyn OpsLifecycleRegistry = ®istry;
3581 let wait_result = trait_ref
3582 .wait_all(&test_run_id(), std::slice::from_ref(&op_id))
3583 .await
3584 .unwrap();
3585 assert_eq!(wait_result.outcomes.len(), 1);
3586 assert!(matches!(
3587 wait_result.outcomes[0].1,
3588 OperationTerminalOutcome::Completed(_)
3589 ));
3590 assert_eq!(wait_result.satisfied.operation_ids, vec![op_id]);
3592 assert_ne!(wait_result.satisfied.wait_request_id.to_string(), "");
3593 let state = registry.read_state().unwrap();
3594 assert!(
3595 !state.wait_active(),
3596 "already-satisfied wait_all must be cleared by generated satisfaction authority"
3597 );
3598 assert!(state.wait_operation_ids().unwrap().is_empty());
3599 }
3600
3601 #[tokio::test]
3602 async fn wait_all_duplicate_rejection_is_generated() {
3603 let registry = RuntimeOpsLifecycleRegistry::new();
3604 let spec = background_spec("duplicate-wait");
3605 let op_id = spec.id.clone();
3606 registry.register_operation(spec).unwrap();
3607 registry.provisioning_succeeded(&op_id).unwrap();
3608
3609 let result = registry
3610 .wait_all(&test_run_id(), &[op_id.clone(), op_id.clone()])
3611 .await;
3612
3613 assert!(matches!(
3614 result,
3615 Err(OpsLifecycleError::DuplicateWaitOperation(id)) if id == op_id
3616 ));
3617 let state = registry.read_state().unwrap();
3618 assert!(
3619 !state.wait_active(),
3620 "duplicate wait rejection must not create a shell or machine barrier"
3621 );
3622 assert!(state.wait_operation_ids().unwrap().is_empty());
3623 }
3624
    /// While one `wait_all` future exists for the registry, a second `wait_all`
    /// request must be rejected with `WaitAlreadyActive`; once the first future
    /// is dropped, no wait may remain registered.
    #[tokio::test]
    async fn wait_all_active_rejection_is_generated() {
        let registry = RuntimeOpsLifecycleRegistry::new();
        let spec = background_spec("active-wait");
        let op_id = spec.id.clone();
        registry.register_operation(spec).unwrap();
        registry.provisioning_succeeded(&op_id).unwrap();

        // Created but deliberately never awaited: it only has to exist while
        // the second request below is issued.
        let active_wait = registry.wait_all(&test_run_id(), std::slice::from_ref(&op_id));
        let result = registry
            .wait_all(&test_run_id(), std::slice::from_ref(&op_id))
            .await;

        assert!(matches!(result, Err(OpsLifecycleError::WaitAlreadyActive)));
        // Dropping the first future is expected to clear the wait checked below.
        drop(active_wait);
        let state = registry.read_state().unwrap();
        assert!(!state.wait_active());
        assert!(state.wait_operation_ids().unwrap().is_empty());
    }
3644
3645 #[tokio::test]
3646 async fn wait_all_unknown_operation_rejection_is_generated() {
3647 let registry = RuntimeOpsLifecycleRegistry::new();
3648 let op_id = OperationId::new();
3649
3650 let result = registry
3651 .wait_all(&test_run_id(), std::slice::from_ref(&op_id))
3652 .await;
3653
3654 assert!(matches!(result, Err(OpsLifecycleError::NotFound(id)) if id == op_id));
3655 let state = registry.read_state().unwrap();
3656 assert!(!state.wait_active());
3657 assert!(state.wait_operation_ids().unwrap().is_empty());
3658 }
3659
3660 #[tokio::test]
3661 async fn wait_all_resolves_from_authority_owned_wait_request() {
3662 let registry = RuntimeOpsLifecycleRegistry::new();
3663 let run_id = test_run_id();
3664
3665 let spec = background_spec("pending");
3666 let op_id = spec.id.clone();
3667 registry.register_operation(spec).unwrap();
3668 registry.provisioning_succeeded(&op_id).unwrap();
3669
3670 let wait_fut = registry.wait_all(&run_id, std::slice::from_ref(&op_id));
3671 tokio::pin!(wait_fut);
3672 assert!(
3673 tokio::time::timeout(std::time::Duration::from_millis(10), &mut wait_fut)
3674 .await
3675 .is_err()
3676 );
3677
3678 let active_wait_request_id = {
3679 let state = registry.read_state().unwrap();
3680 let wait_request_id = match state.wait_request_id.clone() {
3681 Some(wait_request_id) => wait_request_id,
3682 None => panic!("wait request should be active"),
3683 };
3684 assert_eq!(
3685 state.wait_operation_ids().unwrap().as_slice(),
3686 std::slice::from_ref(&op_id)
3687 );
3688 wait_request_id
3689 };
3690
3691 registry
3692 .complete_operation(
3693 &op_id,
3694 OperationResult {
3695 id: op_id.clone(),
3696 content: "done".into(),
3697 is_error: false,
3698 duration_ms: 1,
3699 tokens_used: 0,
3700 },
3701 )
3702 .unwrap();
3703
3704 let wait_result = wait_fut.await.unwrap();
3705 assert_eq!(
3706 wait_result.satisfied.wait_request_id,
3707 active_wait_request_id
3708 );
3709 assert_eq!(wait_result.satisfied.operation_ids, vec![op_id.clone()]);
3710 assert!(matches!(
3711 wait_result.outcomes.as_slice(),
3712 [(returned_id, OperationTerminalOutcome::Completed(_))] if *returned_id == op_id
3713 ));
3714 assert!(registry.read_state().unwrap().wait_request_id.is_none());
3715 }
3716
3717 #[tokio::test]
3718 async fn dropping_wait_all_future_cancels_active_wait_request() {
3719 let registry = RuntimeOpsLifecycleRegistry::new();
3720 let run_id = test_run_id();
3721
3722 let spec = background_spec("cancelled-wait");
3723 let op_id = spec.id.clone();
3724 registry.register_operation(spec).unwrap();
3725 registry.provisioning_succeeded(&op_id).unwrap();
3726
3727 let wait_fut = registry.wait_all(&run_id, std::slice::from_ref(&op_id));
3728 drop(wait_fut);
3729
3730 let state = registry.read_state().unwrap();
3731 assert!(state.wait_request_id.is_none());
3732 assert!(state.wait_operation_ids().unwrap().is_empty());
3733 assert!(!state.wait_active());
3734 }
3735
    /// With a `wait_all` pending, the machine state is rebuilt with
    /// `wait_run_id` cleared. Completing the awaited operation must then fail
    /// with an `Internal` error mentioning "active wait without run id", and
    /// the pending waiter must resolve (rather than hang) with an `Internal`
    /// error mentioning "wait_all completion channel dropped".
    #[tokio::test]
    async fn satisfy_wait_authority_fault_fails_terminal_and_unblocks_waiter() {
        let registry = RuntimeOpsLifecycleRegistry::new();
        let run_id = test_run_id();

        let spec = background_spec("corrupt-barrier");
        let op_id = spec.id.clone();
        registry.register_operation(spec).unwrap();
        registry.provisioning_succeeded(&op_id).unwrap();

        let wait_fut = registry.wait_all(&run_id, std::slice::from_ref(&op_id));
        tokio::pin!(wait_fut);
        // The operation is still running, so a short poll must time out.
        assert!(
            tokio::time::timeout(std::time::Duration::from_millis(10), &mut wait_fut)
                .await
                .is_err(),
            "barrier waiter must still be pending before corruption"
        );
        assert!(registry.read_state().unwrap().wait_request_id.is_some());

        {
            // Corrupt the authority: clone the machine state, clear its
            // `wait_run_id`, and swap in an authority recovered from that state.
            let mut state = registry.write_state().unwrap();
            let mut machine_state = state.dsl.0.state().clone();
            machine_state.wait_run_id = None;
            state.dsl = DslAuthority(Box::new(
                mm_dsl::MeerkatMachineAuthority::recover_from_state(machine_state).unwrap(),
            ));
        }

        // The terminal transition itself must surface the fault.
        let err = registry
            .complete_operation(
                &op_id,
                OperationResult {
                    id: op_id.clone(),
                    content: "done".into(),
                    is_error: false,
                    duration_ms: 1,
                    tokens_used: 0,
                },
            )
            .expect_err("corrupt wait authority must fail the terminal transition");
        assert!(
            matches!(&err, OpsLifecycleError::Internal(message) if message.contains("active wait without run id")),
            "unexpected terminal error: {err:?}"
        );

        // The one-second timeout turns a hung waiter into a test failure.
        let waiter_result = tokio::time::timeout(std::time::Duration::from_secs(1), &mut wait_fut)
            .await
            .expect("waiter must resolve, not hang, after authority corruption");
        match waiter_result {
            Err(OpsLifecycleError::Internal(message)) => assert!(
                message.contains("wait_all completion channel dropped"),
                "unexpected waiter error message: {message}"
            ),
            other => panic!("expected dropped-channel Internal error, got {other:?}"),
        }
    }
3805
3806 #[test]
3810 fn completion_cursor_propagates_poison_not_none() {
3811 let registry = std::sync::Arc::new(RuntimeOpsLifecycleRegistry::new());
3812
3813 let poison_registry = std::sync::Arc::clone(®istry);
3815 let join = std::thread::spawn(move || {
3816 let _guard = poison_registry.write_state().unwrap();
3817 panic!("intentional panic to poison ops lifecycle registry lock");
3818 });
3819 assert!(
3820 join.join().is_err(),
3821 "poisoning thread must have panicked while holding the write guard"
3822 );
3823
3824 let trait_ref: &dyn OpsLifecycleRegistry = registry.as_ref();
3825 let result = trait_ref.completion_cursor(CompletionCursorConsumer::AgentApplied);
3826 match result {
3827 Err(OpsLifecycleError::Internal(message)) => assert!(
3828 message.contains("ops lifecycle registry poisoned"),
3829 "unexpected cursor error message: {message}"
3830 ),
3831 other => panic!("poisoned registry must surface typed Internal fault, got {other:?}"),
3832 }
3833 }
3834
3835 #[test]
3836 fn terminate_owner_only_targets_non_terminal_operations() {
3837 let registry = RuntimeOpsLifecycleRegistry::new();
3838
3839 let running_spec = background_spec("running");
3840 let running_id = running_spec.id.clone();
3841 registry.register_operation(running_spec).unwrap();
3842 registry.provisioning_succeeded(&running_id).unwrap();
3843
3844 let completed_spec = background_spec("completed");
3845 let completed_id = completed_spec.id.clone();
3846 registry.register_operation(completed_spec).unwrap();
3847 registry.provisioning_succeeded(&completed_id).unwrap();
3848 registry
3849 .complete_operation(
3850 &completed_id,
3851 OperationResult {
3852 id: completed_id.clone(),
3853 content: "done".into(),
3854 is_error: false,
3855 duration_ms: 1,
3856 tokens_used: 0,
3857 },
3858 )
3859 .unwrap();
3860
3861 registry.terminate_owner("shutdown".into()).unwrap();
3862
3863 assert!(matches!(
3864 registry.snapshot(&running_id).unwrap().unwrap().status,
3865 OperationStatus::Terminated
3866 ));
3867 assert!(matches!(
3868 registry.snapshot(&completed_id).unwrap().unwrap().status,
3869 OperationStatus::Completed
3870 ));
3871 }
3872
3873 #[test]
3874 fn collect_completed_drains_terminal_operations() {
3875 let registry = RuntimeOpsLifecycleRegistry::new();
3876
3877 let spec_a = background_spec("a");
3878 let id_a = spec_a.id.clone();
3879 registry.register_operation(spec_a).unwrap();
3880 registry.provisioning_succeeded(&id_a).unwrap();
3881 registry
3882 .complete_operation(
3883 &id_a,
3884 OperationResult {
3885 id: id_a.clone(),
3886 content: "done".into(),
3887 is_error: false,
3888 duration_ms: 1,
3889 tokens_used: 0,
3890 },
3891 )
3892 .unwrap();
3893
3894 let spec_b = background_spec("b");
3895 let id_b = spec_b.id.clone();
3896 registry.register_operation(spec_b).unwrap();
3897
3898 let collected = registry.collect_completed().unwrap();
3899 assert_eq!(collected.len(), 1);
3900 assert_eq!(collected[0].0, id_a);
3901
3902 assert!(registry.snapshot(&id_a).unwrap().is_none());
3903 assert!(registry.snapshot(&id_b).unwrap().is_some());
3904
3905 let collected2 = registry.collect_completed().unwrap();
3906 assert!(collected2.is_empty());
3907 }
3908
3909 #[test]
3910 fn bounded_completed_retention_evicts_oldest() {
3911 let registry = RuntimeOpsLifecycleRegistry::with_config(OpsLifecycleConfig {
3912 max_completed: 3,
3913 max_concurrent: None,
3914 });
3915
3916 let mut ids = Vec::new();
3917 for i in 0..5 {
3918 let spec = background_spec(&format!("op-{i}"));
3919 let id = spec.id.clone();
3920 registry.register_operation(spec).unwrap();
3921 registry.provisioning_succeeded(&id).unwrap();
3922 registry
3923 .complete_operation(
3924 &id,
3925 OperationResult {
3926 id: id.clone(),
3927 content: format!("done-{i}"),
3928 is_error: false,
3929 duration_ms: 1,
3930 tokens_used: 0,
3931 },
3932 )
3933 .unwrap();
3934 ids.push(id);
3935 }
3936
3937 assert!(registry.snapshot(&ids[0]).unwrap().is_none());
3938 assert!(registry.snapshot(&ids[1]).unwrap().is_none());
3939 assert!(registry.snapshot(&ids[2]).unwrap().is_some());
3940 assert!(registry.snapshot(&ids[3]).unwrap().is_some());
3941 assert!(registry.snapshot(&ids[4]).unwrap().is_some());
3942 }
3943
3944 #[test]
3945 fn recovered_snapshot_retains_only_machine_accepted_terminal_records() {
3946 let registry = RuntimeOpsLifecycleRegistry::new();
3947
3948 let completed_spec = background_spec("completed");
3949 let completed_id = completed_spec.id.clone();
3950 registry.register_operation(completed_spec).unwrap();
3951 registry.provisioning_succeeded(&completed_id).unwrap();
3952 registry
3953 .complete_operation(
3954 &completed_id,
3955 OperationResult {
3956 id: completed_id.clone(),
3957 content: "done".into(),
3958 is_error: false,
3959 duration_ms: 1,
3960 tokens_used: 0,
3961 },
3962 )
3963 .unwrap();
3964
3965 let running_spec = background_spec("running");
3966 let running_id = running_spec.id.clone();
3967 registry.register_operation(running_spec).unwrap();
3968 registry.provisioning_succeeded(&running_id).unwrap();
3969
3970 let cursor_state = meerkat_core::EpochCursorState::new();
3971 let snapshot = registry
3972 .capture_persistence_snapshot(meerkat_core::RuntimeEpochId::new(), &cursor_state)
3973 .unwrap();
3974 let recovered = RuntimeOpsLifecycleRegistry::from_recovered(snapshot).unwrap();
3975
3976 assert!(recovered.snapshot(&completed_id).unwrap().is_some());
3977 assert!(recovered.snapshot(&running_id).unwrap().is_none());
3978
3979 let collected = recovered.collect_completed().unwrap();
3980 assert_eq!(collected.len(), 1);
3981 assert_eq!(collected[0].0, completed_id);
3982 }
3983
3984 #[test]
3985 fn capacity_slot_terminal_is_not_persisted_or_recovered() {
3986 let registry = RuntimeOpsLifecycleRegistry::new();
3987
3988 let mut spec = background_spec("capacity");
3989 spec.kind = OperationKind::BackgroundToolCapacitySlot;
3990 let operation_id = spec.id.clone();
3991 registry.register_operation(spec).unwrap();
3992 registry.provisioning_succeeded(&operation_id).unwrap();
3993 registry.mark_retired(&operation_id).unwrap();
3994
3995 assert!(registry.snapshot(&operation_id).unwrap().is_none());
3996
3997 let cursor_state = meerkat_core::EpochCursorState::new();
3998 let snapshot = registry
3999 .capture_persistence_snapshot(meerkat_core::RuntimeEpochId::new(), &cursor_state)
4000 .unwrap();
4001 assert!(
4002 !snapshot
4003 .authority_state
4004 .operations
4005 .contains_key(&operation_id)
4006 );
4007 assert!(!snapshot.operation_specs.contains_key(&operation_id));
4008 assert!(snapshot.completion_entries.is_empty());
4009
4010 let recovered = RuntimeOpsLifecycleRegistry::from_recovered(snapshot).unwrap();
4011 assert!(recovered.snapshot(&operation_id).unwrap().is_none());
4012 }
4013
4014 #[test]
4015 fn recovered_snapshot_uses_authority_operation_source() {
4016 let registry = RuntimeOpsLifecycleRegistry::new();
4017 let child_session_id = SessionId::new();
4018 let operation_source = OperationSource::session_child(child_session_id.clone());
4019 let spec = OperationSpec {
4020 id: OperationId::new(),
4021 kind: OperationKind::MobMemberChild,
4022 owner_session_id: SessionId::new(),
4023 display_name: "source-recovery".into(),
4024 source_label: "test".into(),
4025 operation_source: Some(operation_source.clone()),
4026 child_session_id: Some(child_session_id),
4027 expect_peer_channel: true,
4028 };
4029 let operation_id = spec.id.clone();
4030
4031 registry.register_operation(spec).unwrap();
4032 registry.provisioning_succeeded(&operation_id).unwrap();
4033 registry.mark_retired(&operation_id).unwrap();
4034
4035 let cursor_state = meerkat_core::EpochCursorState::new();
4036 let mut snapshot = registry
4037 .capture_persistence_snapshot(meerkat_core::RuntimeEpochId::new(), &cursor_state)
4038 .unwrap();
4039 assert_eq!(
4040 snapshot
4041 .authority_state
4042 .operations
4043 .get(&operation_id)
4044 .and_then(|state| state.operation_source.as_ref()),
4045 Some(&operation_source)
4046 );
4047
4048 snapshot
4049 .operation_specs
4050 .get_mut(&operation_id)
4051 .expect("persisted spec")
4052 .operation_source = None;
4053 let recovered = RuntimeOpsLifecycleRegistry::from_recovered(snapshot).unwrap();
4054 assert_eq!(
4055 recovered
4056 .snapshot(&operation_id)
4057 .unwrap()
4058 .unwrap()
4059 .operation_source,
4060 Some(operation_source)
4061 );
4062 }
4063
4064 #[test]
4065 fn recovered_snapshot_rejects_operation_source_mirror_drift() {
4066 let registry = RuntimeOpsLifecycleRegistry::new();
4067 let child_session_id = SessionId::new();
4068 let operation_source = OperationSource::session_child(child_session_id.clone());
4069 let spec = OperationSpec {
4070 id: OperationId::new(),
4071 kind: OperationKind::MobMemberChild,
4072 owner_session_id: SessionId::new(),
4073 display_name: "source-drift".into(),
4074 source_label: "test".into(),
4075 operation_source: Some(operation_source),
4076 child_session_id: Some(child_session_id),
4077 expect_peer_channel: true,
4078 };
4079 let operation_id = spec.id.clone();
4080
4081 registry.register_operation(spec).unwrap();
4082 registry.provisioning_succeeded(&operation_id).unwrap();
4083 registry.mark_retired(&operation_id).unwrap();
4084
4085 let cursor_state = meerkat_core::EpochCursorState::new();
4086 let mut snapshot = registry
4087 .capture_persistence_snapshot(meerkat_core::RuntimeEpochId::new(), &cursor_state)
4088 .unwrap();
4089 snapshot
4090 .operation_specs
4091 .get_mut(&operation_id)
4092 .expect("persisted spec")
4093 .operation_source = Some(OperationSource::session_child(SessionId::new()));
4094
4095 let err = RuntimeOpsLifecycleRegistry::from_recovered(snapshot)
4096 .expect_err("source mirror drift must fail recovery");
4097 assert!(
4098 matches!(&err, OpsLifecycleError::Internal(message) if message.contains("operation source mirror")),
4099 "unexpected recovery error: {err:?}"
4100 );
4101 }
4102
4103 #[test]
4104 fn persisted_authority_state_serializes_explicit_no_operation_source() {
4105 let registry = RuntimeOpsLifecycleRegistry::new();
4106
4107 let spec = background_spec("explicit-no-source");
4108 let operation_id = spec.id.clone();
4109 registry.register_operation(spec).unwrap();
4110 registry.provisioning_succeeded(&operation_id).unwrap();
4111
4112 let cursor_state = meerkat_core::EpochCursorState::new();
4113 let snapshot = registry
4114 .capture_persistence_snapshot(meerkat_core::RuntimeEpochId::new(), &cursor_state)
4115 .unwrap();
4116 let value = serde_json::to_value(&snapshot).unwrap();
4117 let operations = value
4118 .get("authority_state")
4119 .and_then(|state| state.get("operations"))
4120 .and_then(serde_json::Value::as_object)
4121 .expect("serialized authority operations");
4122 let persisted_state = operations
4123 .values()
4124 .next()
4125 .and_then(serde_json::Value::as_object)
4126 .expect("serialized operation state");
4127
4128 assert!(
4129 persisted_state
4130 .get("operation_source")
4131 .is_some_and(serde_json::Value::is_null),
4132 "generated explicit no-source fact must be serialized as present null: {persisted_state:?}"
4133 );
4134
4135 let recovered_snapshot = serde_json::from_value::<PersistedOpsSnapshot>(value).unwrap();
4136 assert_eq!(
4137 recovered_snapshot
4138 .authority_state
4139 .operations
4140 .get(&operation_id)
4141 .expect("round-tripped operation")
4142 .operation_source,
4143 None
4144 );
4145 }
4146
4147 #[test]
4148 fn persisted_authority_state_rejects_missing_operation_source_fact() {
4149 let registry = RuntimeOpsLifecycleRegistry::new();
4150
4151 let spec = background_spec("missing-source-fact");
4152 registry.register_operation(spec).unwrap();
4153
4154 let cursor_state = meerkat_core::EpochCursorState::new();
4155 let snapshot = registry
4156 .capture_persistence_snapshot(meerkat_core::RuntimeEpochId::new(), &cursor_state)
4157 .unwrap();
4158 let mut value = serde_json::to_value(&snapshot).unwrap();
4159 let operations = value
4160 .get_mut("authority_state")
4161 .and_then(|state| state.get_mut("operations"))
4162 .and_then(serde_json::Value::as_object_mut)
4163 .expect("serialized authority operations");
4164 let operation_state = operations
4165 .values_mut()
4166 .next()
4167 .and_then(serde_json::Value::as_object_mut)
4168 .expect("serialized operation state");
4169 assert!(operation_state.remove("operation_source").is_some());
4170
4171 let err = serde_json::from_value::<PersistedOpsSnapshot>(value)
4172 .expect_err("missing generated source fact must fail recovery snapshot decoding");
4173 assert!(
4174 err.to_string().contains("operation_source"),
4175 "unexpected decode error: {err}"
4176 );
4177 }
4178
4179 #[test]
4180 fn persisted_authority_state_rejects_missing_completion_feed_authority() {
4181 let registry = RuntimeOpsLifecycleRegistry::new();
4182
4183 let spec = background_spec("missing-feed-authority");
4184 let operation_id = spec.id.clone();
4185 registry.register_operation(spec).unwrap();
4186 registry.provisioning_succeeded(&operation_id).unwrap();
4187 registry
4188 .complete_operation(
4189 &operation_id,
4190 OperationResult {
4191 id: operation_id.clone(),
4192 content: "done".into(),
4193 is_error: false,
4194 duration_ms: 1,
4195 tokens_used: 0,
4196 },
4197 )
4198 .unwrap();
4199
4200 let cursor_state = meerkat_core::EpochCursorState::new();
4201 let snapshot = registry
4202 .capture_persistence_snapshot(meerkat_core::RuntimeEpochId::new(), &cursor_state)
4203 .unwrap();
4204 let mut value = serde_json::to_value(&snapshot).unwrap();
4205 let authority_state = value
4206 .get_mut("authority_state")
4207 .and_then(serde_json::Value::as_object_mut)
4208 .expect("serialized authority state");
4209 assert!(authority_state.remove("completion_feed_entries").is_some());
4210
4211 let err = serde_json::from_value::<PersistedOpsSnapshot>(value)
4212 .expect_err("missing generated feed authority must fail recovery snapshot decoding");
4213 assert!(
4214 err.to_string().contains("completion_feed_entries"),
4215 "unexpected decode error: {err}"
4216 );
4217 }
4218
4219 #[test]
4220 fn public_child_session_projection_uses_authority_operation_source() {
4221 let registry = RuntimeOpsLifecycleRegistry::new();
4222 let authority_child_session_id = SessionId::new();
4223 let stale_shell_child_session_id = SessionId::new();
4224 let operation_source = OperationSource::session_child(authority_child_session_id.clone());
4225 let spec = OperationSpec {
4226 id: OperationId::new(),
4227 kind: OperationKind::MobMemberChild,
4228 owner_session_id: SessionId::new(),
4229 display_name: "child-projection".into(),
4230 source_label: "test".into(),
4231 operation_source: Some(operation_source),
4232 child_session_id: Some(stale_shell_child_session_id),
4233 expect_peer_channel: true,
4234 };
4235 let operation_id = spec.id.clone();
4236
4237 registry.register_operation(spec).unwrap();
4238
4239 assert_eq!(
4240 registry
4241 .snapshot(&operation_id)
4242 .unwrap()
4243 .unwrap()
4244 .child_session_id,
4245 Some(authority_child_session_id.clone())
4246 );
4247
4248 let cursor_state = meerkat_core::EpochCursorState::new();
4249 let snapshot = registry
4250 .capture_persistence_snapshot(meerkat_core::RuntimeEpochId::new(), &cursor_state)
4251 .unwrap();
4252 assert_eq!(
4253 snapshot
4254 .operation_specs
4255 .get(&operation_id)
4256 .expect("persisted spec")
4257 .child_session_id,
4258 Some(authority_child_session_id)
4259 );
4260 }
4261
/// Checks that a terminal payload which disagrees with the operation's
/// terminal outcome is rejected by every projection instead of being served.
///
/// After a normal completion, the cloned machine state gets its
/// `op_terminal_payload` entry replaced with `Retired` and is installed again
/// through `recover_from_state`.
#[test]
fn generated_terminal_payload_projection_fails_closed() {
    let registry = RuntimeOpsLifecycleRegistry::new();

    let spec = background_spec("terminal-payload-drift");
    let op_id = spec.id.clone();
    registry.register_operation(spec).unwrap();
    registry.provisioning_succeeded(&op_id).unwrap();
    let done = OperationResult {
        id: op_id.clone(),
        content: "done".into(),
        is_error: false,
        duration_ms: 1,
        tokens_used: 0,
    };
    registry.complete_operation(&op_id, done).unwrap();

    // The block scope ends the `write_state()` borrow before the projections run.
    {
        let mut guard = registry.write_state().unwrap();
        let mut tampered = guard.dsl.0.state().clone();
        let key = mm_dsl::OperationId::from_domain(&op_id).0;
        tampered
            .op_terminal_payload
            .insert(key, OperationTerminalOutcome::Retired);
        let authority = mm_dsl::MeerkatMachineAuthority::recover_from_state(tampered).unwrap();
        guard.dsl = DslAuthority(Box::new(authority));
    }

    // Shared predicate for the three call sites that must report both fragments.
    let is_variant_mismatch = |err: &OpsLifecycleError| {
        matches!(
            err,
            OpsLifecycleError::Internal(message)
                if message.contains("payload variant")
                    && message.contains("does not match terminal outcome discriminant")
        )
    };

    let Err(err) = registry.register_watcher(&op_id) else {
        panic!("invalid generated terminal payload must reject watcher projection");
    };
    assert!(is_variant_mismatch(&err), "unexpected watcher error: {err:?}");

    // The public snapshot is only required to mention the discriminant mismatch.
    let err = registry
        .snapshot(&op_id)
        .expect_err("invalid generated terminal payload must reject public snapshot");
    assert!(
        matches!(
            &err,
            OpsLifecycleError::Internal(message)
                if message.contains("does not match terminal outcome discriminant")
        ),
        "unexpected public snapshot error: {err:?}"
    );

    let cursor_state = meerkat_core::EpochCursorState::new();
    let Err(err) = registry
        .capture_persistence_snapshot(meerkat_core::RuntimeEpochId::new(), &cursor_state)
    else {
        panic!("invalid generated terminal payload must reject persistence snapshot");
    };
    assert!(is_variant_mismatch(&err), "unexpected snapshot error: {err:?}");

    let Err(err) = registry.collect_completed() else {
        panic!("invalid generated terminal payload must reject collection");
    };
    assert!(is_variant_mismatch(&err), "unexpected collection error: {err:?}");
}
4332
/// Checks that a failed operation whose terminal payload entry has been
/// removed from the machine state is rejected by watcher registration and by
/// the public snapshot.
#[test]
fn generated_terminal_payload_missing_projection_fails_closed() {
    let registry = RuntimeOpsLifecycleRegistry::new();

    let spec = background_spec("terminal-payload-missing");
    let op_id = spec.id.clone();
    registry.register_operation(spec).unwrap();
    registry.provisioning_succeeded(&op_id).unwrap();
    registry.fail_operation(&op_id, "boom".into()).unwrap();

    // The block scope ends the `write_state()` borrow before the projections run.
    {
        let mut guard = registry.write_state().unwrap();
        let mut tampered = guard.dsl.0.state().clone();
        let key = mm_dsl::OperationId::from_domain(&op_id).0;
        tampered.op_terminal_payload.remove(&key);
        let authority = mm_dsl::MeerkatMachineAuthority::recover_from_state(tampered).unwrap();
        guard.dsl = DslAuthority(Box::new(authority));
    }

    let is_missing_payload = |err: &OpsLifecycleError| {
        matches!(
            err,
            OpsLifecycleError::Internal(message) if message.contains("missing terminal payload")
        )
    };

    let Err(err) = registry.register_watcher(&op_id) else {
        panic!("missing generated terminal payload must reject watcher projection");
    };
    assert!(is_missing_payload(&err), "unexpected watcher error: {err:?}");

    let err = registry
        .snapshot(&op_id)
        .expect_err("missing generated terminal payload must reject public snapshot");
    assert!(
        is_missing_payload(&err),
        "unexpected public snapshot error: {err:?}"
    );
}
4371
/// Checks that a failed operation whose `op_terminal_outcomes` entry has been
/// removed is rejected by the public snapshot and by `collect_completed`.
#[test]
fn generated_terminal_status_without_outcome_fails_closed() {
    let registry = RuntimeOpsLifecycleRegistry::new();

    let spec = background_spec("terminal-outcome-missing");
    let op_id = spec.id.clone();
    registry.register_operation(spec).unwrap();
    registry.provisioning_succeeded(&op_id).unwrap();
    registry.fail_operation(&op_id, "boom".into()).unwrap();

    // The block scope ends the `write_state()` borrow before the projections run.
    {
        let mut guard = registry.write_state().unwrap();
        let mut tampered = guard.dsl.0.state().clone();
        let key = mm_dsl::OperationId::from_domain(&op_id).0;
        tampered.op_terminal_outcomes.remove(&key);
        let authority = mm_dsl::MeerkatMachineAuthority::recover_from_state(tampered).unwrap();
        guard.dsl = DslAuthority(Box::new(authority));
    }

    let is_missing_outcome = |err: &OpsLifecycleError| {
        matches!(
            err,
            OpsLifecycleError::Internal(message) if message.contains("missing terminal outcome")
        )
    };

    let err = registry
        .snapshot(&op_id)
        .expect_err("terminal status without outcome must reject public snapshot");
    assert!(
        is_missing_outcome(&err),
        "unexpected public snapshot error: {err:?}"
    );

    let Err(err) = registry.collect_completed() else {
        panic!("terminal status without outcome must reject collection");
    };
    assert!(is_missing_outcome(&err), "unexpected collection error: {err:?}");
}
4411
/// Checks that an operation source whose `session_id` was cleared in the
/// machine state is rejected by the snapshot, the operation list and the
/// persistence snapshot.
#[test]
fn generated_operation_source_projection_fails_closed() {
    let registry = RuntimeOpsLifecycleRegistry::new();
    let child_session_id = SessionId::new();
    let op_id = OperationId::new();
    let spec = OperationSpec {
        id: op_id.clone(),
        kind: OperationKind::MobMemberChild,
        owner_session_id: SessionId::new(),
        display_name: "source-authority-drift".into(),
        source_label: "test".into(),
        operation_source: Some(OperationSource::session_child(child_session_id.clone())),
        child_session_id: Some(child_session_id),
        expect_peer_channel: true,
    };

    registry.register_operation(spec).unwrap();

    // The block scope ends the `write_state()` borrow before the projections run.
    {
        let mut guard = registry.write_state().unwrap();
        let mut tampered = guard.dsl.0.state().clone();
        let key = mm_dsl::OperationId::from_domain(&op_id).0;
        let source = tampered
            .op_sources
            .get_mut(&key)
            .expect("generated operation source");
        source.session_id = None;
        let authority = mm_dsl::MeerkatMachineAuthority::recover_from_state(tampered).unwrap();
        guard.dsl = DslAuthority(Box::new(authority));
    }

    let is_invalid_source = |err: &OpsLifecycleError| {
        matches!(
            err,
            OpsLifecycleError::Internal(message)
                if message.contains("generated operation source authority has invalid source")
        )
    };

    let err = registry
        .snapshot(&op_id)
        .expect_err("invalid generated operation source must reject public snapshot");
    assert!(
        is_invalid_source(&err),
        "unexpected public snapshot error: {err:?}"
    );

    let err = registry
        .list_operations()
        .expect_err("invalid generated operation source must reject public operation list");
    assert!(
        is_invalid_source(&err),
        "unexpected operation list error: {err:?}"
    );

    let cursor_state = meerkat_core::EpochCursorState::new();
    let Err(err) = registry
        .capture_persistence_snapshot(meerkat_core::RuntimeEpochId::new(), &cursor_state)
    else {
        panic!("invalid generated operation source must reject persistence snapshot");
    };
    assert!(is_invalid_source(&err), "unexpected snapshot error: {err:?}");
}
4472
/// Checks that a status entry keyed by the string `"not-json-operation-id"`
/// makes the operation list and the persistence snapshot fail with an
/// "invalid operation id key" internal error.
#[test]
fn generated_operation_id_projection_fails_closed() {
    let registry = RuntimeOpsLifecycleRegistry::new();

    // The block scope ends the `write_state()` borrow before the projections run.
    {
        let mut guard = registry.write_state().unwrap();
        let mut tampered = guard.dsl.0.state().clone();
        tampered.op_statuses.insert(
            "not-json-operation-id".into(),
            mm_dsl::OperationStatus::Running,
        );
        let authority = mm_dsl::MeerkatMachineAuthority::recover_from_state(tampered).unwrap();
        guard.dsl = DslAuthority(Box::new(authority));
    }

    let is_invalid_key = |err: &OpsLifecycleError| {
        matches!(
            err,
            OpsLifecycleError::Internal(message) if message.contains("invalid operation id key")
        )
    };

    let err = registry
        .list_operations()
        .expect_err("invalid generated operation id must reject public operation list");
    assert!(
        is_invalid_key(&err),
        "unexpected operation list error: {err:?}"
    );

    let cursor_state = meerkat_core::EpochCursorState::new();
    let Err(err) = registry
        .capture_persistence_snapshot(meerkat_core::RuntimeEpochId::new(), &cursor_state)
    else {
        panic!("invalid generated operation id must reject persistence snapshot");
    };
    assert!(
        is_invalid_key(&err),
        "unexpected persistence snapshot error: {err:?}"
    );
}
4509
/// Checks that removing the `op_kinds` entry of a registered operation makes
/// the snapshot, the operation list and the persistence snapshot fail with a
/// "missing kind" internal error.
#[test]
fn generated_missing_kind_projection_fails_closed() {
    let registry = RuntimeOpsLifecycleRegistry::new();
    let spec = background_spec("missing-kind");
    let op_id = spec.id.clone();
    registry.register_operation(spec).unwrap();

    // The block scope ends the `write_state()` borrow before the projections run.
    {
        let mut guard = registry.write_state().unwrap();
        let mut tampered = guard.dsl.0.state().clone();
        let key = mm_dsl::OperationId::from_domain(&op_id).0;
        tampered.op_kinds.remove(&key);
        let authority = mm_dsl::MeerkatMachineAuthority::recover_from_state(tampered).unwrap();
        guard.dsl = DslAuthority(Box::new(authority));
    }

    let is_missing_kind = |err: &OpsLifecycleError| {
        matches!(
            err,
            OpsLifecycleError::Internal(message) if message.contains("missing kind")
        )
    };

    let err = registry
        .snapshot(&op_id)
        .expect_err("missing generated kind must reject public snapshot");
    assert!(
        is_missing_kind(&err),
        "unexpected public snapshot error: {err:?}"
    );

    let err = registry
        .list_operations()
        .expect_err("missing generated kind must reject public list");
    assert!(is_missing_kind(&err), "unexpected public list error: {err:?}");

    let cursor_state = meerkat_core::EpochCursorState::new();
    let err = registry
        .capture_persistence_snapshot(meerkat_core::RuntimeEpochId::new(), &cursor_state)
        .expect_err("missing generated kind must reject persistence snapshot");
    assert!(
        is_missing_kind(&err),
        "unexpected persistence snapshot error: {err:?}"
    );
}
4551
/// Checks that removing the `op_statuses` entry of a registered operation
/// makes the snapshot, the operation list and the public-result
/// classification fail with a "missing status" internal error.
#[test]
fn generated_missing_status_projection_fails_closed() {
    let registry = RuntimeOpsLifecycleRegistry::new();
    let spec = background_spec("missing-status");
    let op_id = spec.id.clone();
    registry.register_operation(spec).unwrap();

    // The block scope ends the `write_state()` borrow before the projections run.
    {
        let mut guard = registry.write_state().unwrap();
        let mut tampered = guard.dsl.0.state().clone();
        let key = mm_dsl::OperationId::from_domain(&op_id).0;
        tampered.op_statuses.remove(&key);
        let authority = mm_dsl::MeerkatMachineAuthority::recover_from_state(tampered).unwrap();
        guard.dsl = DslAuthority(Box::new(authority));
    }

    let is_missing_status = |err: &OpsLifecycleError| {
        matches!(
            err,
            OpsLifecycleError::Internal(message) if message.contains("missing status")
        )
    };

    let err = registry
        .snapshot(&op_id)
        .expect_err("missing generated status must reject public snapshot");
    assert!(
        is_missing_status(&err),
        "unexpected public snapshot error: {err:?}"
    );

    let err = registry
        .list_operations()
        .expect_err("missing generated status must reject public list");
    assert!(
        is_missing_status(&err),
        "unexpected public list error: {err:?}"
    );

    let err = registry
        .classify_operation_public_result(&op_id)
        .expect_err("missing generated status must reject public-result classification");
    assert!(
        is_missing_status(&err),
        "unexpected public-result error: {err:?}"
    );
}
4591
/// Checks that after `request_retire` the operation is in `Retiring` status,
/// has no terminal outcome, is not terminal, and is still classified as
/// `Running` both in the snapshot and by the classifier.
#[test]
fn generated_retiring_public_result_remains_running_until_terminal() {
    let registry = RuntimeOpsLifecycleRegistry::new();
    let spec = background_spec("retiring-public-result");
    let op_id = spec.id.clone();
    registry.register_operation(spec).unwrap();
    registry.provisioning_succeeded(&op_id).unwrap();
    registry.request_retire(&op_id).unwrap();

    let retiring = registry.snapshot(&op_id).unwrap().unwrap();
    assert_eq!(retiring.status, OperationStatus::Retiring);
    assert!(retiring.terminal_outcome.is_none());
    assert!(!retiring.terminal);
    assert_eq!(
        retiring.public_result_class,
        OperationPublicResultClass::Running
    );

    // The standalone classifier must agree with the class embedded in the snapshot.
    let classified = registry.classify_operation_public_result(&op_id).unwrap();
    assert_eq!(classified, OperationPublicResultClass::Running);
}
4616
/// Checks that removing the `op_peer_ready` entry of a registered operation
/// makes the snapshot and the persistence snapshot fail with a
/// "missing peer-ready" internal error.
#[test]
fn generated_missing_peer_ready_projection_fails_closed() {
    let registry = RuntimeOpsLifecycleRegistry::new();
    let spec = background_spec("missing-peer-ready");
    let op_id = spec.id.clone();
    registry.register_operation(spec).unwrap();

    // The block scope ends the `write_state()` borrow before the projections run.
    {
        let mut guard = registry.write_state().unwrap();
        let mut tampered = guard.dsl.0.state().clone();
        let key = mm_dsl::OperationId::from_domain(&op_id).0;
        tampered.op_peer_ready.remove(&key);
        let authority = mm_dsl::MeerkatMachineAuthority::recover_from_state(tampered).unwrap();
        guard.dsl = DslAuthority(Box::new(authority));
    }

    let is_missing_peer_ready = |err: &OpsLifecycleError| {
        matches!(
            err,
            OpsLifecycleError::Internal(message) if message.contains("missing peer-ready")
        )
    };

    let err = registry
        .snapshot(&op_id)
        .expect_err("missing generated peer-ready fact must reject public snapshot");
    assert!(
        is_missing_peer_ready(&err),
        "unexpected public snapshot error: {err:?}"
    );

    let cursor_state = meerkat_core::EpochCursorState::new();
    let err = registry
        .capture_persistence_snapshot(meerkat_core::RuntimeEpochId::new(), &cursor_state)
        .expect_err("missing generated peer-ready fact must reject persistence snapshot");
    assert!(
        is_missing_peer_ready(&err),
        "unexpected persistence snapshot error: {err:?}"
    );
}
4651
/// Checks that removing the `op_progress_counts` entry of a registered
/// operation makes the snapshot and the persistence snapshot fail with a
/// "missing progress count" internal error.
#[test]
fn generated_missing_progress_count_projection_fails_closed() {
    let registry = RuntimeOpsLifecycleRegistry::new();
    let spec = background_spec("missing-progress-count");
    let op_id = spec.id.clone();
    registry.register_operation(spec).unwrap();

    // The block scope ends the `write_state()` borrow before the projections run.
    {
        let mut guard = registry.write_state().unwrap();
        let mut tampered = guard.dsl.0.state().clone();
        let key = mm_dsl::OperationId::from_domain(&op_id).0;
        tampered.op_progress_counts.remove(&key);
        let authority = mm_dsl::MeerkatMachineAuthority::recover_from_state(tampered).unwrap();
        guard.dsl = DslAuthority(Box::new(authority));
    }

    let is_missing_progress = |err: &OpsLifecycleError| {
        matches!(
            err,
            OpsLifecycleError::Internal(message) if message.contains("missing progress count")
        )
    };

    let err = registry
        .snapshot(&op_id)
        .expect_err("missing generated progress count must reject public snapshot");
    assert!(
        is_missing_progress(&err),
        "unexpected public snapshot error: {err:?}"
    );

    let cursor_state = meerkat_core::EpochCursorState::new();
    let err = registry
        .capture_persistence_snapshot(meerkat_core::RuntimeEpochId::new(), &cursor_state)
        .expect_err("missing generated progress count must reject persistence snapshot");
    assert!(
        is_missing_progress(&err),
        "unexpected persistence snapshot error: {err:?}"
    );
}
4686
/// Checks that a completed operation whose `op_completion_seq` entry has been
/// removed makes the persistence snapshot fail with a
/// "missing completion sequence" internal error.
#[test]
fn generated_terminal_sequence_missing_persistence_fails_closed() {
    let registry = RuntimeOpsLifecycleRegistry::new();
    let spec = background_spec("terminal-sequence-missing");
    let op_id = spec.id.clone();
    registry.register_operation(spec).unwrap();
    registry.provisioning_succeeded(&op_id).unwrap();
    let done = OperationResult {
        id: op_id.clone(),
        content: "done".into(),
        is_error: false,
        duration_ms: 1,
        tokens_used: 0,
    };
    registry.complete_operation(&op_id, done).unwrap();

    // The block scope ends the `write_state()` borrow before the snapshot is captured.
    {
        let mut guard = registry.write_state().unwrap();
        let mut tampered = guard.dsl.0.state().clone();
        let key = mm_dsl::OperationId::from_domain(&op_id).0;
        tampered.op_completion_seq.remove(&key);
        let authority = mm_dsl::MeerkatMachineAuthority::recover_from_state(tampered).unwrap();
        guard.dsl = DslAuthority(Box::new(authority));
    }

    let cursor_state = meerkat_core::EpochCursorState::new();
    let err = registry
        .capture_persistence_snapshot(meerkat_core::RuntimeEpochId::new(), &cursor_state)
        .expect_err("missing generated terminal sequence must reject persistence snapshot");
    assert!(
        matches!(
            &err,
            OpsLifecycleError::Internal(message) if message.contains("missing completion sequence")
        ),
        "unexpected persistence snapshot error: {err:?}"
    );
}
4726
/// Checks that removing the operation's entry from `records`, while the
/// machine state is left untouched, makes the snapshot and the operation list
/// fail with a "without shell projection record" internal error.
#[test]
fn generated_record_without_shell_projection_fails_closed() {
    let registry = RuntimeOpsLifecycleRegistry::new();
    let spec = background_spec("missing-shell-record");
    let op_id = spec.id.clone();
    registry.register_operation(spec).unwrap();

    // The block scope ends the `write_state()` borrow before the projections run.
    {
        let mut guard = registry.write_state().unwrap();
        guard.records.remove(&op_id);
    }

    let is_missing_shell = |err: &OpsLifecycleError| {
        matches!(
            err,
            OpsLifecycleError::Internal(message)
                if message.contains("without shell projection record")
        )
    };

    let err = registry
        .snapshot(&op_id)
        .expect_err("generated operation without shell record must reject public snapshot");
    assert!(
        is_missing_shell(&err),
        "unexpected public snapshot error: {err:?}"
    );

    let err = registry
        .list_operations()
        .expect_err("generated operation without shell record must reject public list");
    assert!(
        is_missing_shell(&err),
        "unexpected public list error: {err:?}"
    );
}
4754
/// Checks that recovery reports the original child session id even when the
/// persisted spec carries a different one.
///
/// The captured snapshot's spec has its `child_session_id` swapped for a fresh
/// `SessionId` before `from_recovered` is called.
#[test]
fn recovered_snapshot_rebuilds_child_session_mirror_from_authority() {
    let registry = RuntimeOpsLifecycleRegistry::new();
    let child_session_id = SessionId::new();
    let op_id = OperationId::new();
    let spec = OperationSpec {
        id: op_id.clone(),
        kind: OperationKind::MobMemberChild,
        owner_session_id: SessionId::new(),
        display_name: "child-drift".into(),
        source_label: "test".into(),
        operation_source: Some(OperationSource::session_child(child_session_id.clone())),
        child_session_id: Some(child_session_id.clone()),
        expect_peer_channel: true,
    };

    registry.register_operation(spec).unwrap();
    registry.provisioning_succeeded(&op_id).unwrap();
    registry.mark_retired(&op_id).unwrap();

    let cursor_state = meerkat_core::EpochCursorState::new();
    let mut persisted = registry
        .capture_persistence_snapshot(meerkat_core::RuntimeEpochId::new(), &cursor_state)
        .unwrap();

    // Make the persisted spec disagree with the child session recorded at registration.
    let persisted_spec = persisted
        .operation_specs
        .get_mut(&op_id)
        .expect("persisted spec");
    persisted_spec.child_session_id = Some(SessionId::new());

    let recovered = RuntimeOpsLifecycleRegistry::from_recovered(persisted).unwrap();
    let recovered_snapshot = recovered.snapshot(&op_id).unwrap().unwrap();
    assert_eq!(recovered_snapshot.child_session_id, Some(child_session_id));
}
4796
/// Checks the wake class for three operation kinds: `BackgroundToolOp` is
/// `Wake`, while `MobMemberChild` and `BackgroundToolCapacitySlot` are
/// `Ignore`. The operation id used is fresh and never registered.
#[test]
fn completion_wake_class_is_generated_by_operation_kind() {
    let registry = RuntimeOpsLifecycleRegistry::new();
    let operation_id = OperationId::new();

    let wake_class_for = |kind: OperationKind| {
        registry
            .classify_operation_completion_wake(&operation_id, kind)
            .unwrap()
    };

    assert_eq!(
        wake_class_for(OperationKind::BackgroundToolOp),
        OperationCompletionWakeClass::Wake
    );
    assert_eq!(
        wake_class_for(OperationKind::MobMemberChild),
        OperationCompletionWakeClass::Ignore
    );
    assert_eq!(
        wake_class_for(OperationKind::BackgroundToolCapacitySlot),
        OperationCompletionWakeClass::Ignore
    );
}
4824
/// Checks that recovery rejects a completion entry appended to the snapshot
/// for an operation that was only provisioned, never completed.
///
/// The expected failure is an internal error mentioning
/// "no generated feed authority".
#[test]
fn recovered_snapshot_rejects_completion_feed_without_generated_record() {
    let registry = RuntimeOpsLifecycleRegistry::new();

    let running_spec = background_spec("running");
    let running_id = running_spec.id.clone();
    registry.register_operation(running_spec.clone()).unwrap();
    registry.provisioning_succeeded(&running_id).unwrap();

    let cursor_state = meerkat_core::EpochCursorState::new();
    let mut persisted = registry
        .capture_persistence_snapshot(meerkat_core::RuntimeEpochId::new(), &cursor_state)
        .unwrap();

    // Forge a completed outcome that the registry never produced.
    let phantom_outcome = OperationTerminalOutcome::Completed(OperationResult {
        id: running_id.clone(),
        content: "phantom".into(),
        is_error: false,
        duration_ms: 1,
        tokens_used: 0,
    });
    persisted.completion_entries.push(CompletionEntry {
        seq: 1,
        operation_id: running_id,
        kind: running_spec.kind,
        display_name: running_spec.display_name,
        terminal_outcome: phantom_outcome,
        completed_at_ms: None,
    });

    let Err(err) = RuntimeOpsLifecycleRegistry::from_recovered(persisted) else {
        panic!("public completion feed must not recover without generated op truth");
    };
    assert!(
        matches!(
            &err,
            OpsLifecycleError::Internal(message)
                if message.contains("no generated feed authority")
        ),
        "unexpected recovery error: {err:?}"
    );
}
4862
/// Checks that recovery rejects a feed entry whose sequence is
/// `next_completion_seq + 1` for an operation that never existed.
///
/// The phantom entry is added to both `completion_feed_entries` and
/// `completion_entries`; the expected failure is an internal error mentioning
/// "RecoverCompletionFeedEntry".
#[test]
fn recovered_snapshot_rejects_feed_authority_beyond_completion_cursor() {
    let registry = RuntimeOpsLifecycleRegistry::new();

    let spec = background_spec("terminal");
    let op_id = spec.id.clone();
    registry.register_operation(spec).unwrap();
    registry.provisioning_succeeded(&op_id).unwrap();
    let done = OperationResult {
        id: op_id.clone(),
        content: "done".into(),
        is_error: false,
        duration_ms: 1,
        tokens_used: 0,
    };
    registry.complete_operation(&op_id, done).unwrap();

    let cursor_state = meerkat_core::EpochCursorState::new();
    let mut persisted = registry
        .capture_persistence_snapshot(meerkat_core::RuntimeEpochId::new(), &cursor_state)
        .unwrap();

    let phantom_id = OperationId::new();
    let phantom_result = OperationResult {
        id: phantom_id.clone(),
        content: "phantom".into(),
        is_error: false,
        duration_ms: 1,
        tokens_used: 0,
    };
    // Both forged entries use the same sequence, one past the persisted cursor.
    let phantom_seq = persisted.authority_state.next_completion_seq + 1;

    let feed_entry = CompletionFeedCanonicalState {
        seq: phantom_seq,
        kind: OperationKind::BackgroundToolOp,
        terminal_outcome: OperationTerminalOutcome::Completed(phantom_result.clone()),
    };
    persisted
        .authority_state
        .completion_feed_entries
        .insert(phantom_id.clone(), feed_entry);

    let public_entry = CompletionEntry {
        seq: phantom_seq,
        operation_id: phantom_id.clone(),
        kind: OperationKind::BackgroundToolOp,
        display_name: "phantom".into(),
        terminal_outcome: OperationTerminalOutcome::Completed(phantom_result),
        completed_at_ms: None,
    };
    persisted.completion_entries.push(public_entry);

    let Err(err) = RuntimeOpsLifecycleRegistry::from_recovered(persisted) else {
        panic!("feed authority must not advance the recovered completion cursor");
    };
    assert!(
        matches!(
            &err,
            OpsLifecycleError::Internal(message) if message.contains("RecoverCompletionFeedEntry")
        ),
        "unexpected recovery error: {err:?}"
    );
}
4923
/// Checks that a recovered registry collects completions in the order they
/// were completed, even when the persisted `completed_order` is reversed.
///
/// Operations "a" and "b" are completed in that order, the snapshot's
/// `completed_order` is rewritten to `[b, a]`, and `collect_completed` must
/// still yield `a` before `b`.
#[test]
fn recovered_completed_order_uses_generated_completion_sequences() {
    let registry = RuntimeOpsLifecycleRegistry::new();

    // Registers, provisions and completes one background operation whose
    // result content equals its label, returning the operation id.
    let run_to_completion = |label: &'static str| {
        let spec = background_spec(label);
        let id = spec.id.clone();
        registry.register_operation(spec).unwrap();
        registry.provisioning_succeeded(&id).unwrap();
        let result = OperationResult {
            id: id.clone(),
            content: label.into(),
            is_error: false,
            duration_ms: 1,
            tokens_used: 0,
        };
        registry.complete_operation(&id, result).unwrap();
        id
    };

    let id_a = run_to_completion("a");
    let id_b = run_to_completion("b");

    let cursor_state = meerkat_core::EpochCursorState::new();
    let mut persisted = registry
        .capture_persistence_snapshot(meerkat_core::RuntimeEpochId::new(), &cursor_state)
        .unwrap();
    persisted.authority_state.completed_order = VecDeque::from([id_b.clone(), id_a.clone()]);

    let recovered = RuntimeOpsLifecycleRegistry::from_recovered(persisted).unwrap();
    let collected = recovered.collect_completed().unwrap();

    assert_eq!(collected[0].0, id_a);
    assert_eq!(collected[1].0, id_b);
}
4974
/// Checks the `max_concurrent` limit: with a limit of two, a third
/// registration fails with `MaxConcurrentExceeded { limit: 2, active: 2 }`,
/// and a new registration succeeds once one operation has completed.
#[test]
fn max_concurrent_enforcement() {
    let config = OpsLifecycleConfig {
        max_completed: DEFAULT_MAX_COMPLETED,
        max_concurrent: Some(2),
    };
    let registry = RuntimeOpsLifecycleRegistry::with_config(config);

    let first = background_spec("a");
    let first_id = first.id.clone();
    registry.register_operation(first).unwrap();
    registry.register_operation(background_spec("b")).unwrap();

    // Both slots are taken, so the third registration must be refused.
    let rejected = registry.register_operation(background_spec("c"));
    assert!(matches!(
        rejected,
        Err(OpsLifecycleError::MaxConcurrentExceeded {
            limit: 2,
            active: 2,
        })
    ));

    // Completing the first operation makes room for another registration.
    registry.provisioning_succeeded(&first_id).unwrap();
    let done = OperationResult {
        id: first_id.clone(),
        content: "done".into(),
        is_error: false,
        duration_ms: 1,
        tokens_used: 0,
    };
    registry.complete_operation(&first_id, done).unwrap();

    assert!(registry.register_operation(background_spec("d")).is_ok());
}
5016
/// Checks which timestamp fields a snapshot exposes at each lifecycle stage:
/// only `created_at_ms` after registration, `started_at_ms` after
/// provisioning, and `completed_at_ms` plus `elapsed_ms` after completion.
#[test]
fn snapshot_includes_timestamps() {
    let registry = RuntimeOpsLifecycleRegistry::new();
    let spec = background_spec("timed");
    let operation_id = spec.id.clone();
    registry.register_operation(spec).unwrap();

    // Registered only: creation time is set, everything else is absent.
    let registered = registry.snapshot(&operation_id).unwrap().unwrap();
    assert!(registered.created_at_ms > 0);
    assert!(registered.started_at_ms.is_none());
    assert!(registered.completed_at_ms.is_none());
    assert!(registered.elapsed_ms.is_none());

    // Provisioned: the start time appears and is not earlier than creation.
    registry.provisioning_succeeded(&operation_id).unwrap();
    let started = registry.snapshot(&operation_id).unwrap().unwrap();
    assert!(started.started_at_ms.is_some());
    assert!(started.started_at_ms.unwrap() >= started.created_at_ms);

    // Completed: completion and elapsed times appear, ordered after the start.
    let done = OperationResult {
        id: operation_id.clone(),
        content: "done".into(),
        is_error: false,
        duration_ms: 1,
        tokens_used: 0,
    };
    registry.complete_operation(&operation_id, done).unwrap();
    let finished = registry.snapshot(&operation_id).unwrap().unwrap();
    assert!(finished.completed_at_ms.is_some());
    assert!(finished.elapsed_ms.is_some());
    assert!(finished.completed_at_ms.unwrap() >= finished.started_at_ms.unwrap());
}
5052
/// Checks that a snapshot has no peer handle after provisioning and exposes
/// the handle's peer name once `peer_ready` has been called with it.
#[test]
fn snapshot_includes_peer_handle() {
    let registry = RuntimeOpsLifecycleRegistry::new();
    let child_session_id = SessionId::new();
    let operation_id = OperationId::new();
    let spec = OperationSpec {
        id: operation_id.clone(),
        kind: OperationKind::MobMemberChild,
        owner_session_id: SessionId::new(),
        display_name: "peer-test".into(),
        source_label: "test".into(),
        operation_source: Some(OperationSource::session_child(child_session_id.clone())),
        child_session_id: Some(child_session_id),
        expect_peer_channel: true,
    };
    registry.register_operation(spec).unwrap();
    registry.provisioning_succeeded(&operation_id).unwrap();

    let before_ready = registry.snapshot(&operation_id).unwrap().unwrap();
    assert!(before_ready.peer_handle.is_none());

    let peer_name = meerkat_core::comms::PeerName::new("member-x").unwrap();
    let trusted_peer =
        TrustedPeerDescriptor::test_only_unsigned_typed("member-x", PeerId::new(), "inproc://x")
            .unwrap();
    let handle = OperationPeerHandle {
        peer_name,
        trusted_peer,
    };
    registry.peer_ready(&operation_id, handle).unwrap();

    let after_ready = registry.snapshot(&operation_id).unwrap().unwrap();
    let exposed_handle = after_ready.peer_handle.as_ref().unwrap();
    assert_eq!(exposed_handle.peer_name.as_str(), "member-x");
}
5091}