1use std::collections::HashMap;
9use std::future::Future;
10use std::sync::{RwLock, RwLockReadGuard, RwLockWriteGuard};
11use std::task::{Context, Poll};
12
13#[cfg(target_arch = "wasm32")]
14use crate::tokio;
15use meerkat_core::lifecycle::{RunId, WaitRequestId};
16use meerkat_core::ops_lifecycle::{
17 DEFAULT_MAX_COMPLETED, OperationCompletionWatch, OperationId, OperationLifecycleSnapshot,
18 OperationPeerHandle, OperationProgressUpdate, OperationResult, OperationSpec,
19 OperationTerminalOutcome, OpsLifecycleError, OpsLifecycleRegistry, WaitAllResult,
20 WaitAllSatisfied,
21};
22use meerkat_core::time_compat::{Instant, SystemTime, UNIX_EPOCH};
23
24use crate::ops_lifecycle_authority::{
25 OpsLifecycleAuthority, OpsLifecycleEffect, OpsLifecycleInput, OpsLifecycleMutator,
26};
27
/// Shell-side state for one registered operation.
///
/// `ShellState::snapshot` takes kind, status, peer readiness, progress and the
/// terminal outcome from the authority, and the remaining fields from here.
#[derive(Debug)]
struct ShellRecord {
    /// Spec supplied to `register_operation`; source of the id, display name
    /// and child session id reported in snapshots.
    spec: OperationSpec,
    /// Set by `peer_ready`.
    peer_handle: Option<OperationPeerHandle>,
    /// Completion senders; drained and sent the terminal outcome by
    /// `notify_watchers`.
    watchers: Vec<tokio::sync::oneshot::Sender<OperationTerminalOutcome>>,
    /// Monotonic creation time; the base for converting later `Instant`s.
    created_at: Instant,
    /// Set by `provisioning_succeeded`.
    started_at: Option<Instant>,
    /// Set by `mark_completed` when watchers are notified.
    completed_at: Option<Instant>,
    /// Wall-clock time captured together with `created_at`, used to map
    /// `Instant`s onto Unix-epoch milliseconds.
    created_at_wall: SystemTime,
}
47
/// The single in-flight `wait_all` request and the sender that resolves it.
///
/// `ShellState::execute_effects` sends on `sender` only when a
/// `WaitAllSatisfied` effect carries the same `wait_request_id`.
#[derive(Debug)]
struct PendingWaitState {
    /// Id generated by `wait_all` for this request.
    wait_request_id: WaitRequestId,
    /// Resolves the `WaitAllFuture` parked on the paired receiver.
    sender: tokio::sync::oneshot::Sender<WaitAllSatisfied>,
}
53
54impl ShellRecord {
55 fn new(spec: OperationSpec) -> Self {
56 Self {
57 spec,
58 peer_handle: None,
59 watchers: Vec::new(),
60 created_at: Instant::now(),
61 started_at: None,
62 completed_at: None,
63 created_at_wall: SystemTime::now(),
64 }
65 }
66
67 fn epoch_millis(wall_anchor: &SystemTime) -> u64 {
68 wall_anchor
69 .duration_since(UNIX_EPOCH)
70 .map(|d| d.as_millis() as u64)
71 .unwrap_or(0)
72 }
73
74 fn epoch_millis_for_instant(&self, instant: Instant) -> u64 {
75 let offset = instant.saturating_duration_since(self.created_at);
78 let wall = self.created_at_wall + offset;
79 Self::epoch_millis(&wall)
80 }
81
82 fn notify_watchers(&mut self, outcome: &OperationTerminalOutcome) {
84 for watcher in std::mem::take(&mut self.watchers) {
85 let _ = watcher.send(outcome.clone());
86 }
87 }
88
89 fn mark_completed(&mut self) {
91 self.completed_at = Some(Instant::now());
92 }
93}
94
/// Lock-protected state of [`RuntimeOpsLifecycleRegistry`].
///
/// The authority owns the canonical lifecycle state. `records` holds shell-side
/// data keyed by the same operation ids. `pending_wait` holds at most one
/// outstanding `wait_all` sender.
#[derive(Debug)]
struct ShellState {
    authority: OpsLifecycleAuthority,
    records: HashMap<OperationId, ShellRecord>,
    pending_wait: Option<PendingWaitState>,
}
105
106impl ShellState {
107 fn new(max_completed: usize, max_concurrent: Option<usize>) -> Self {
108 Self {
109 authority: OpsLifecycleAuthority::new(max_completed, max_concurrent),
110 records: HashMap::new(),
111 pending_wait: None,
112 }
113 }
114
115 fn snapshot(&self, id: &OperationId) -> Option<OperationLifecycleSnapshot> {
117 let canonical = self.authority.operation(id)?;
118 let shell = self.records.get(id)?;
119
120 let created_at_ms = ShellRecord::epoch_millis(&shell.created_at_wall);
121 let started_at_ms = shell.started_at.map(|i| shell.epoch_millis_for_instant(i));
122 let completed_at_ms = shell
123 .completed_at
124 .map(|i| shell.epoch_millis_for_instant(i));
125 let elapsed_ms = shell.completed_at.map(|completed| {
126 completed
127 .saturating_duration_since(shell.created_at)
128 .as_millis() as u64
129 });
130
131 Some(OperationLifecycleSnapshot {
132 id: shell.spec.id.clone(),
133 kind: canonical.kind(),
134 display_name: shell.spec.display_name.clone(),
135 status: canonical.status(),
136 peer_ready: canonical.peer_ready(),
137 progress_count: canonical.progress_count(),
138 watcher_count: shell.watchers.len() as u32,
139 terminal_outcome: canonical.terminal_outcome().cloned(),
140 child_session_id: shell.spec.child_session_id.clone(),
141 peer_handle: shell.peer_handle.clone(),
142 created_at_ms,
143 started_at_ms,
144 completed_at_ms,
145 elapsed_ms,
146 })
147 }
148
149 fn execute_effects(&mut self, effects: &[OpsLifecycleEffect]) {
156 for effect in effects {
157 match effect {
158 OpsLifecycleEffect::NotifyOpWatcher { operation_id, .. } => {
159 let outcome = self
161 .authority
162 .operation(operation_id)
163 .and_then(|op| op.terminal_outcome().cloned());
164 if let Some(outcome) = outcome
165 && let Some(shell) = self.records.get_mut(operation_id)
166 {
167 let watcher_count = shell.watchers.len() as u32;
168 shell.notify_watchers(&outcome);
169 shell.mark_completed();
170 self.authority.watchers_drained(operation_id, watcher_count);
171 }
172 }
173 OpsLifecycleEffect::ExposeOperationPeer { .. } => {
174 }
177 OpsLifecycleEffect::RetainTerminalRecord { .. } => {
178 }
181 OpsLifecycleEffect::EvictCompletedRecord { operation_id } => {
182 self.records.remove(operation_id);
183 self.authority.remove_operation(operation_id);
184 }
185 OpsLifecycleEffect::SubmitOpEvent { .. } => {
186 }
188 OpsLifecycleEffect::WaitAllSatisfied {
189 wait_request_id,
190 operation_ids,
191 } => {
192 if let Some(pending_wait) = self.pending_wait.take() {
193 if pending_wait.wait_request_id == *wait_request_id {
194 let _ = pending_wait.sender.send(WaitAllSatisfied {
195 wait_request_id: wait_request_id.clone(),
196 operation_ids: operation_ids.clone(),
197 });
198 } else {
199 self.pending_wait = Some(pending_wait);
200 }
201 }
202 }
203 }
204 }
205 }
206
207 fn shell_record_mut(
208 &mut self,
209 id: &OperationId,
210 ) -> Result<&mut ShellRecord, OpsLifecycleError> {
211 self.records
212 .get_mut(id)
213 .ok_or_else(|| OpsLifecycleError::NotFound(id.clone()))
214 }
215
216 fn collect_wait_outcomes(
217 &self,
218 operation_ids: &[OperationId],
219 ) -> Result<Vec<(OperationId, OperationTerminalOutcome)>, OpsLifecycleError> {
220 operation_ids
221 .iter()
222 .map(|operation_id| {
223 let outcome = self
224 .authority
225 .operation(operation_id)
226 .and_then(|op| op.terminal_outcome().cloned())
227 .ok_or_else(|| {
228 OpsLifecycleError::Internal(format!(
229 "wait_all completed without terminal outcome for {operation_id}"
230 ))
231 })?;
232 Ok((operation_id.clone(), outcome))
233 })
234 .collect()
235 }
236}
237
238impl Default for ShellState {
239 fn default() -> Self {
240 Self::new(DEFAULT_MAX_COMPLETED, None)
241 }
242}
243
/// Limits for a registry built with [`RuntimeOpsLifecycleRegistry::with_config`].
#[derive(Debug, Clone)]
pub struct OpsLifecycleConfig {
    /// Maximum number of completed operations kept; handed to the authority as
    /// its `max_completed` limit. Once exceeded, the oldest completed records
    /// are evicted.
    pub max_completed: usize,
    /// Optional cap on concurrently active operations, handed to the authority.
    /// When set, `register_operation` can fail with
    /// `OpsLifecycleError::MaxConcurrentExceeded`.
    pub max_concurrent: Option<usize>,
}
256
257impl Default for OpsLifecycleConfig {
258 fn default() -> Self {
259 Self {
260 max_completed: DEFAULT_MAX_COMPLETED,
261 max_concurrent: None,
262 }
263 }
264}
265
/// [`OpsLifecycleRegistry`] implementation backed by an
/// `OpsLifecycleAuthority` plus shell-side records, all guarded by one
/// `RwLock`.
#[derive(Debug)]
pub struct RuntimeOpsLifecycleRegistry {
    state: RwLock<ShellState>,
}
275
276impl Default for RuntimeOpsLifecycleRegistry {
277 fn default() -> Self {
278 Self {
279 state: RwLock::new(ShellState::default()),
280 }
281 }
282}
283
284impl RuntimeOpsLifecycleRegistry {
285 pub fn new() -> Self {
286 Self::default()
287 }
288
289 pub fn with_config(config: OpsLifecycleConfig) -> Self {
290 Self {
291 state: RwLock::new(ShellState::new(config.max_completed, config.max_concurrent)),
292 }
293 }
294
295 fn read_state(&self) -> Result<RwLockReadGuard<'_, ShellState>, OpsLifecycleError> {
296 self.state
297 .read()
298 .map_err(|_| OpsLifecycleError::Internal("ops lifecycle registry poisoned".into()))
299 }
300
301 fn write_state(&self) -> Result<RwLockWriteGuard<'_, ShellState>, OpsLifecycleError> {
302 self.state
303 .write()
304 .map_err(|_| OpsLifecycleError::Internal("ops lifecycle registry poisoned".into()))
305 }
306
307 fn cancel_wait_all_internal(
308 &self,
309 wait_request_id: &WaitRequestId,
310 ) -> Result<(), OpsLifecycleError> {
311 let mut state = self.write_state()?;
312 match state.authority.apply(OpsLifecycleInput::CancelWaitAll {
313 wait_request_id: wait_request_id.clone(),
314 }) {
315 Ok(_) => {
316 state.pending_wait = None;
317 Ok(())
318 }
319 Err(OpsLifecycleError::WaitNotActive(_)) => {
320 state.pending_wait = None;
321 Ok(())
322 }
323 Err(err) => Err(err),
324 }
325 }
326}
327
/// Progress of a [`WaitAllFuture`].
enum WaitAllFutureState {
    /// Result already known when the future was created (an error, or a wait
    /// satisfied immediately); taken on the first poll.
    Ready(Option<Result<WaitAllResult, OpsLifecycleError>>),
    /// Parked on the receiver whose sender sits in `ShellState::pending_wait`.
    Waiting(tokio::sync::oneshot::Receiver<WaitAllSatisfied>),
    /// The result has been handed out.
    Done,
}
333
/// Future returned by `RuntimeOpsLifecycleRegistry::wait_all`.
///
/// Dropping it while still `Waiting` cancels the wait request (see the `Drop`
/// impl).
struct WaitAllFuture<'a> {
    /// Read for terminal outcomes on completion; used to cancel on drop.
    registry: &'a RuntimeOpsLifecycleRegistry,
    /// Request id registered with the authority.
    wait_request_id: WaitRequestId,
    /// Operations whose outcomes are returned, in this order.
    operation_ids: Vec<OperationId>,
    state: WaitAllFutureState,
}
340
341impl Future for WaitAllFuture<'_> {
342 type Output = Result<WaitAllResult, OpsLifecycleError>;
343
344 fn poll(mut self: std::pin::Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
345 match &mut self.state {
346 WaitAllFutureState::Ready(result) => {
347 let ready = result.take().unwrap_or_else(|| {
348 Err(OpsLifecycleError::Internal(
349 "wait_all future polled after completion".into(),
350 ))
351 });
352 self.state = WaitAllFutureState::Done;
353 Poll::Ready(ready)
354 }
355 WaitAllFutureState::Waiting(receiver) => match std::pin::Pin::new(receiver).poll(cx) {
356 Poll::Pending => Poll::Pending,
357 Poll::Ready(Ok(satisfied)) => {
358 let outcomes = match self.registry.read_state() {
359 Ok(state) => state.collect_wait_outcomes(&self.operation_ids),
360 Err(err) => Err(err),
361 };
362 self.state = WaitAllFutureState::Done;
363 Poll::Ready(outcomes.map(|outcomes| WaitAllResult {
364 outcomes,
365 satisfied,
366 }))
367 }
368 Poll::Ready(Err(_)) => {
369 self.state = WaitAllFutureState::Done;
370 Poll::Ready(Err(OpsLifecycleError::Internal(
371 "wait_all completion channel dropped".into(),
372 )))
373 }
374 },
375 WaitAllFutureState::Done => Poll::Ready(Err(OpsLifecycleError::Internal(
376 "wait_all future polled after completion".into(),
377 ))),
378 }
379 }
380}
381
382impl Drop for WaitAllFuture<'_> {
383 fn drop(&mut self) {
384 if matches!(self.state, WaitAllFutureState::Waiting(_)) {
385 let _ = self
386 .registry
387 .cancel_wait_all_internal(&self.wait_request_id);
388 }
389 }
390}
391
392impl OpsLifecycleRegistry for RuntimeOpsLifecycleRegistry {
393 fn register_operation(&self, spec: OperationSpec) -> Result<(), OpsLifecycleError> {
394 let mut state = self.write_state()?;
395 let operation_id = spec.id.clone();
396 let kind = spec.kind;
397
398 let transition = state
400 .authority
401 .apply(OpsLifecycleInput::RegisterOperation {
402 operation_id: operation_id.clone(),
403 kind,
404 })?;
405
406 state.records.insert(operation_id, ShellRecord::new(spec));
408
409 state.execute_effects(&transition.effects);
411 Ok(())
412 }
413
414 fn provisioning_succeeded(&self, id: &OperationId) -> Result<(), OpsLifecycleError> {
415 let mut state = self.write_state()?;
416
417 let transition = state
418 .authority
419 .apply(OpsLifecycleInput::ProvisioningSucceeded {
420 operation_id: id.clone(),
421 })?;
422
423 if let Some(shell) = state.records.get_mut(id) {
425 shell.started_at = Some(Instant::now());
426 }
427
428 state.execute_effects(&transition.effects);
429 Ok(())
430 }
431
432 fn provisioning_failed(
433 &self,
434 id: &OperationId,
435 error: String,
436 ) -> Result<(), OpsLifecycleError> {
437 let mut state = self.write_state()?;
438
439 let transition = state
440 .authority
441 .apply(OpsLifecycleInput::ProvisioningFailed {
442 operation_id: id.clone(),
443 })?;
444
445 state
447 .authority
448 .patch_terminal_outcome(id, OperationTerminalOutcome::Failed { error });
449
450 state.execute_effects(&transition.effects);
451 Ok(())
452 }
453
454 fn peer_ready(
455 &self,
456 id: &OperationId,
457 peer: OperationPeerHandle,
458 ) -> Result<(), OpsLifecycleError> {
459 let mut state = self.write_state()?;
460
461 let transition = state.authority.apply(OpsLifecycleInput::PeerReady {
462 operation_id: id.clone(),
463 })?;
464
465 if let Some(shell) = state.records.get_mut(id) {
467 shell.peer_handle = Some(peer);
468 }
469
470 state.execute_effects(&transition.effects);
471 Ok(())
472 }
473
474 fn register_watcher(
475 &self,
476 id: &OperationId,
477 ) -> Result<OperationCompletionWatch, OpsLifecycleError> {
478 let mut state = self.write_state()?;
479
480 let canonical = state
482 .authority
483 .operation(id)
484 .ok_or_else(|| OpsLifecycleError::NotFound(id.clone()))?;
485
486 if let Some(outcome) = canonical.terminal_outcome() {
487 return Ok(OperationCompletionWatch::already_resolved(outcome.clone()));
488 }
489
490 let _transition = state.authority.apply(OpsLifecycleInput::RegisterWatcher {
492 operation_id: id.clone(),
493 })?;
494
495 let shell = state.shell_record_mut(id)?;
497 let (tx, watch) = OperationCompletionWatch::channel();
498 shell.watchers.push(tx);
499 Ok(watch)
500 }
501
502 fn report_progress(
503 &self,
504 id: &OperationId,
505 _update: OperationProgressUpdate,
506 ) -> Result<(), OpsLifecycleError> {
507 let mut state = self.write_state()?;
508
509 let transition = state.authority.apply(OpsLifecycleInput::ProgressReported {
510 operation_id: id.clone(),
511 })?;
512
513 state.execute_effects(&transition.effects);
514 Ok(())
515 }
516
517 fn complete_operation(
518 &self,
519 id: &OperationId,
520 result: OperationResult,
521 ) -> Result<(), OpsLifecycleError> {
522 let mut state = self.write_state()?;
523
524 let transition = state
525 .authority
526 .apply(OpsLifecycleInput::CompleteOperation {
527 operation_id: id.clone(),
528 })?;
529
530 state
532 .authority
533 .patch_terminal_outcome(id, OperationTerminalOutcome::Completed(result));
534
535 state.execute_effects(&transition.effects);
536 Ok(())
537 }
538
539 fn fail_operation(&self, id: &OperationId, error: String) -> Result<(), OpsLifecycleError> {
540 let mut state = self.write_state()?;
541
542 let transition = state.authority.apply(OpsLifecycleInput::FailOperation {
543 operation_id: id.clone(),
544 })?;
545
546 state
548 .authority
549 .patch_terminal_outcome(id, OperationTerminalOutcome::Failed { error });
550
551 state.execute_effects(&transition.effects);
552 Ok(())
553 }
554
555 fn abort_provisioning(
556 &self,
557 id: &OperationId,
558 reason: Option<String>,
559 ) -> Result<(), OpsLifecycleError> {
560 let mut state = self.write_state()?;
561
562 let transition = state
563 .authority
564 .apply(OpsLifecycleInput::AbortProvisioning {
565 operation_id: id.clone(),
566 })?;
567
568 state
569 .authority
570 .patch_terminal_outcome(id, OperationTerminalOutcome::Aborted { reason });
571
572 state.execute_effects(&transition.effects);
573 Ok(())
574 }
575
576 fn cancel_operation(
577 &self,
578 id: &OperationId,
579 reason: Option<String>,
580 ) -> Result<(), OpsLifecycleError> {
581 let mut state = self.write_state()?;
582
583 let transition = state.authority.apply(OpsLifecycleInput::CancelOperation {
584 operation_id: id.clone(),
585 })?;
586
587 state
589 .authority
590 .patch_terminal_outcome(id, OperationTerminalOutcome::Cancelled { reason });
591
592 state.execute_effects(&transition.effects);
593 Ok(())
594 }
595
596 fn request_retire(&self, id: &OperationId) -> Result<(), OpsLifecycleError> {
597 let mut state = self.write_state()?;
598
599 let transition = state.authority.apply(OpsLifecycleInput::RetireRequested {
600 operation_id: id.clone(),
601 })?;
602
603 state.execute_effects(&transition.effects);
604 Ok(())
605 }
606
607 fn mark_retired(&self, id: &OperationId) -> Result<(), OpsLifecycleError> {
608 let mut state = self.write_state()?;
609
610 let transition = state.authority.apply(OpsLifecycleInput::RetireCompleted {
611 operation_id: id.clone(),
612 })?;
613
614 state
616 .authority
617 .patch_terminal_outcome(id, OperationTerminalOutcome::Retired);
618
619 state.execute_effects(&transition.effects);
620 Ok(())
621 }
622
623 fn snapshot(&self, id: &OperationId) -> Option<OperationLifecycleSnapshot> {
624 self.read_state().ok().and_then(|state| state.snapshot(id))
625 }
626
627 fn list_operations(&self) -> Vec<OperationLifecycleSnapshot> {
628 let mut snapshots = self
629 .read_state()
630 .map(|state| {
631 state
632 .authority
633 .operations()
634 .filter_map(|(id, _)| state.snapshot(id))
635 .collect::<Vec<_>>()
636 })
637 .unwrap_or_default();
638 snapshots.sort_by(|left, right| left.display_name.cmp(&right.display_name));
639 snapshots
640 }
641
642 fn terminate_owner(&self, reason: String) -> Result<(), OpsLifecycleError> {
643 let mut state = self.write_state()?;
644
645 let transition = state.authority.apply(OpsLifecycleInput::OwnerTerminated)?;
646
647 for effect in &transition.effects {
651 if let OpsLifecycleEffect::NotifyOpWatcher { operation_id, .. } = effect {
652 state.authority.patch_terminal_outcome(
653 operation_id,
654 OperationTerminalOutcome::Terminated {
655 reason: reason.clone(),
656 },
657 );
658 }
659 }
660
661 state.execute_effects(&transition.effects);
662 Ok(())
663 }
664
665 fn collect_completed(
666 &self,
667 ) -> Result<Vec<(OperationId, OperationTerminalOutcome)>, OpsLifecycleError> {
668 let mut state = self.write_state()?;
669
670 let collected = state.authority.drain_completed();
671
672 for (id, _) in &collected {
674 state.records.remove(id);
675 }
676
677 Ok(collected)
678 }
679
680 fn wait_all(
681 &self,
682 _run_id: &RunId,
683 ids: &[OperationId],
684 ) -> std::pin::Pin<
685 Box<dyn std::future::Future<Output = Result<WaitAllResult, OpsLifecycleError>> + Send + '_>,
686 > {
687 let wait_request_id = WaitRequestId::new();
688 let owned_ids = ids.to_vec();
689
690 let state = match self.write_state() {
691 Ok(mut state) => {
692 let transition = match state.authority.apply(OpsLifecycleInput::BeginWaitAll {
693 wait_request_id: wait_request_id.clone(),
694 operation_ids: owned_ids.clone(),
695 }) {
696 Ok(transition) => transition,
697 Err(err) => {
698 return Box::pin(WaitAllFuture {
699 registry: self,
700 wait_request_id,
701 operation_ids: owned_ids,
702 state: WaitAllFutureState::Ready(Some(Err(err))),
703 });
704 }
705 };
706
707 let satisfied = transition.effects.iter().find_map(|effect| match effect {
708 OpsLifecycleEffect::WaitAllSatisfied {
709 wait_request_id,
710 operation_ids,
711 } => Some(WaitAllSatisfied {
712 wait_request_id: wait_request_id.clone(),
713 operation_ids: operation_ids.clone(),
714 }),
715 _ => None,
716 });
717
718 state.execute_effects(&transition.effects);
719
720 if let Some(satisfied) = satisfied {
721 WaitAllFutureState::Ready(Some(state.collect_wait_outcomes(&owned_ids).map(
722 |outcomes| WaitAllResult {
723 outcomes,
724 satisfied,
725 },
726 )))
727 } else {
728 if state.pending_wait.is_some() {
729 return Box::pin(WaitAllFuture {
730 registry: self,
731 wait_request_id,
732 operation_ids: owned_ids,
733 state: WaitAllFutureState::Ready(Some(Err(
734 OpsLifecycleError::Internal(
735 "wait_all started while a pending wait sender already existed"
736 .into(),
737 ),
738 ))),
739 });
740 }
741 let (sender, receiver) = tokio::sync::oneshot::channel();
742 state.pending_wait = Some(PendingWaitState {
743 wait_request_id: wait_request_id.clone(),
744 sender,
745 });
746 WaitAllFutureState::Waiting(receiver)
747 }
748 }
749 Err(err) => WaitAllFutureState::Ready(Some(Err(err))),
750 };
751
752 Box::pin(WaitAllFuture {
753 registry: self,
754 wait_request_id,
755 operation_ids: owned_ids,
756 state,
757 })
758 }
759}
760
#[cfg(test)]
#[allow(clippy::unwrap_used, clippy::panic)]
mod tests {
    use super::*;
    use meerkat_core::comms::TrustedPeerSpec;
    use meerkat_core::lifecycle::RunId;
    use meerkat_core::ops_lifecycle::{OperationKind, OpsLifecycleRegistry};
    use meerkat_core::types::SessionId;
    use uuid::Uuid;

    /// Fixed run id; `wait_all` ignores it, so any value works.
    fn test_run_id() -> RunId {
        RunId(Uuid::from_u128(1))
    }

    /// Spec for a background tool operation that does not expect a peer channel.
    fn background_spec(name: &str) -> OperationSpec {
        OperationSpec {
            id: OperationId::new(),
            kind: OperationKind::BackgroundToolOp,
            owner_session_id: SessionId::new(),
            display_name: name.into(),
            source_label: "test".into(),
            child_session_id: None,
            expect_peer_channel: false,
        }
    }

    /// Successful, non-error result for `id` carrying `content`.
    fn success_result(id: &OperationId, content: &str) -> OperationResult {
        OperationResult {
            id: id.clone(),
            content: content.into(),
            is_error: false,
            duration_ms: 1,
            tokens_used: 0,
        }
    }

    #[tokio::test]
    async fn late_watchers_resolve_immediately() {
        let registry = RuntimeOpsLifecycleRegistry::new();
        let spec = background_spec("late");
        let op_id = spec.id.clone();
        registry.register_operation(spec).unwrap();
        registry.provisioning_succeeded(&op_id).unwrap();
        registry
            .complete_operation(&op_id, success_result(&op_id, "done"))
            .unwrap();

        let watch = registry.register_watcher(&op_id).unwrap();
        match watch.wait().await {
            OperationTerminalOutcome::Completed(result) => assert_eq!(result.content, "done"),
            other => panic!("expected completed outcome, got {other:?}"),
        }
    }

    #[test]
    fn peer_ready_requires_peer_expectation() {
        let registry = RuntimeOpsLifecycleRegistry::new();
        let spec = background_spec("no-peer");
        let op_id = spec.id.clone();
        registry.register_operation(spec).unwrap();
        registry.provisioning_succeeded(&op_id).unwrap();

        let result = registry.peer_ready(
            &op_id,
            OperationPeerHandle {
                peer_name: "peer".into(),
                trusted_peer: TrustedPeerSpec::new("peer", "peer-id", "inproc://peer").unwrap(),
            },
        );
        assert!(matches!(result, Err(OpsLifecycleError::PeerNotExpected(_))));
    }

    #[tokio::test]
    async fn multi_listener_completion() {
        let registry = RuntimeOpsLifecycleRegistry::new();
        let spec = background_spec("multi");
        let op_id = spec.id.clone();
        registry.register_operation(spec).unwrap();
        registry.provisioning_succeeded(&op_id).unwrap();

        let watch1 = registry.register_watcher(&op_id).unwrap();
        let watch2 = registry.register_watcher(&op_id).unwrap();
        let watch3 = registry.register_watcher(&op_id).unwrap();

        registry
            .complete_operation(&op_id, success_result(&op_id, "multi-done"))
            .unwrap();

        for watch in [watch1, watch2, watch3] {
            match watch.wait().await {
                OperationTerminalOutcome::Completed(result) => {
                    assert_eq!(result.content, "multi-done");
                }
                other => panic!("expected completed, got {other:?}"),
            }
        }
    }

    #[tokio::test]
    async fn wait_all_returns_all_outcomes() {
        let registry = RuntimeOpsLifecycleRegistry::new();

        let spec_a = background_spec("a");
        let id_a = spec_a.id.clone();
        registry.register_operation(spec_a).unwrap();
        registry.provisioning_succeeded(&id_a).unwrap();

        let spec_b = background_spec("b");
        let id_b = spec_b.id.clone();
        registry.register_operation(spec_b).unwrap();
        registry.provisioning_succeeded(&id_b).unwrap();

        registry
            .complete_operation(&id_a, success_result(&id_a, "a-done"))
            .unwrap();
        registry.fail_operation(&id_b, "b-error".into()).unwrap();

        let wait_result = registry
            .wait_all(&test_run_id(), &[id_a.clone(), id_b.clone()])
            .await
            .unwrap();
        assert_eq!(wait_result.outcomes.len(), 2);
        assert_eq!(wait_result.outcomes[0].0, id_a);
        assert!(matches!(
            wait_result.outcomes[0].1,
            OperationTerminalOutcome::Completed(_)
        ));
        assert_eq!(wait_result.outcomes[1].0, id_b);
        assert!(matches!(
            wait_result.outcomes[1].1,
            OperationTerminalOutcome::Failed { .. }
        ));
        assert_eq!(wait_result.satisfied.operation_ids.len(), 2);
        assert_ne!(wait_result.satisfied.wait_request_id.to_string(), "");
    }

    /// `wait_all` reached through a `dyn OpsLifecycleRegistry` resolves from
    /// the authority's satisfaction effect.
    #[tokio::test]
    async fn wait_all_trait_path_submits_through_authority() {
        let registry = RuntimeOpsLifecycleRegistry::new();
        let spec = background_spec("trait-wait");
        let op_id = spec.id.clone();
        registry.register_operation(spec).unwrap();
        registry.provisioning_succeeded(&op_id).unwrap();
        registry
            .complete_operation(&op_id, success_result(&op_id, "done"))
            .unwrap();

        let trait_ref: &dyn OpsLifecycleRegistry = &registry;
        let wait_result = trait_ref
            .wait_all(&test_run_id(), std::slice::from_ref(&op_id))
            .await
            .unwrap();
        assert_eq!(wait_result.outcomes.len(), 1);
        assert!(matches!(
            wait_result.outcomes[0].1,
            OperationTerminalOutcome::Completed(_)
        ));
        assert_eq!(wait_result.satisfied.operation_ids, vec![op_id]);
        assert_ne!(wait_result.satisfied.wait_request_id.to_string(), "");
    }

    #[tokio::test]
    async fn wait_all_resolves_from_authority_owned_wait_request() {
        let registry = RuntimeOpsLifecycleRegistry::new();
        let run_id = test_run_id();

        let spec = background_spec("pending");
        let op_id = spec.id.clone();
        registry.register_operation(spec).unwrap();
        registry.provisioning_succeeded(&op_id).unwrap();

        let wait_fut = registry.wait_all(&run_id, std::slice::from_ref(&op_id));
        tokio::pin!(wait_fut);
        assert!(
            tokio::time::timeout(std::time::Duration::from_millis(10), &mut wait_fut)
                .await
                .is_err()
        );

        let active_wait_request_id = {
            let state = registry.read_state().unwrap();
            let Some(wait_request_id) = state.authority.wait_request_id().cloned() else {
                panic!("wait request should be active");
            };
            assert_eq!(
                state.authority.wait_operation_ids(),
                std::slice::from_ref(&op_id)
            );
            wait_request_id
        };

        registry
            .complete_operation(&op_id, success_result(&op_id, "done"))
            .unwrap();

        let wait_result = wait_fut.await.unwrap();
        assert_eq!(
            wait_result.satisfied.wait_request_id,
            active_wait_request_id
        );
        assert_eq!(wait_result.satisfied.operation_ids, vec![op_id.clone()]);
        assert!(matches!(
            wait_result.outcomes.as_slice(),
            [(returned_id, OperationTerminalOutcome::Completed(_))] if *returned_id == op_id
        ));
        assert!(
            registry
                .read_state()
                .unwrap()
                .authority
                .wait_request_id()
                .is_none()
        );
    }

    #[tokio::test]
    async fn dropping_wait_all_future_cancels_active_wait_request() {
        let registry = RuntimeOpsLifecycleRegistry::new();
        let run_id = test_run_id();

        let spec = background_spec("cancelled-wait");
        let op_id = spec.id.clone();
        registry.register_operation(spec).unwrap();
        registry.provisioning_succeeded(&op_id).unwrap();

        let wait_fut = registry.wait_all(&run_id, std::slice::from_ref(&op_id));
        drop(wait_fut);

        let state = registry.read_state().unwrap();
        assert!(state.authority.wait_request_id().is_none());
        assert!(state.authority.wait_operation_ids().is_empty());
    }

    #[test]
    fn collect_completed_drains_terminal_operations() {
        let registry = RuntimeOpsLifecycleRegistry::new();

        let spec_a = background_spec("a");
        let id_a = spec_a.id.clone();
        registry.register_operation(spec_a).unwrap();
        registry.provisioning_succeeded(&id_a).unwrap();
        registry
            .complete_operation(&id_a, success_result(&id_a, "done"))
            .unwrap();

        let spec_b = background_spec("b");
        let id_b = spec_b.id.clone();
        registry.register_operation(spec_b).unwrap();

        let collected = registry.collect_completed().unwrap();
        assert_eq!(collected.len(), 1);
        assert_eq!(collected[0].0, id_a);

        assert!(registry.snapshot(&id_a).is_none());
        assert!(registry.snapshot(&id_b).is_some());

        let collected2 = registry.collect_completed().unwrap();
        assert!(collected2.is_empty());
    }

    #[test]
    fn bounded_completed_retention_evicts_oldest() {
        let registry = RuntimeOpsLifecycleRegistry::with_config(OpsLifecycleConfig {
            max_completed: 3,
            max_concurrent: None,
        });

        let mut ids = Vec::new();
        for i in 0..5 {
            let spec = background_spec(&format!("op-{i}"));
            let id = spec.id.clone();
            registry.register_operation(spec).unwrap();
            registry.provisioning_succeeded(&id).unwrap();
            registry
                .complete_operation(&id, success_result(&id, &format!("done-{i}")))
                .unwrap();
            ids.push(id);
        }

        assert!(registry.snapshot(&ids[0]).is_none());
        assert!(registry.snapshot(&ids[1]).is_none());
        assert!(registry.snapshot(&ids[2]).is_some());
        assert!(registry.snapshot(&ids[3]).is_some());
        assert!(registry.snapshot(&ids[4]).is_some());
    }

    #[test]
    fn max_concurrent_enforcement() {
        let registry = RuntimeOpsLifecycleRegistry::with_config(OpsLifecycleConfig {
            max_completed: DEFAULT_MAX_COMPLETED,
            max_concurrent: Some(2),
        });

        let spec_a = background_spec("a");
        let id_a = spec_a.id.clone();
        registry.register_operation(spec_a).unwrap();

        let spec_b = background_spec("b");
        registry.register_operation(spec_b).unwrap();

        let spec_c = background_spec("c");
        let result = registry.register_operation(spec_c);
        assert!(matches!(
            result,
            Err(OpsLifecycleError::MaxConcurrentExceeded {
                limit: 2,
                active: 2,
            })
        ));

        registry.provisioning_succeeded(&id_a).unwrap();
        registry
            .complete_operation(&id_a, success_result(&id_a, "done"))
            .unwrap();

        let spec_d = background_spec("d");
        assert!(registry.register_operation(spec_d).is_ok());
    }

    #[test]
    fn snapshot_includes_timestamps() {
        let registry = RuntimeOpsLifecycleRegistry::new();
        let spec = background_spec("timed");
        let op_id = spec.id.clone();
        registry.register_operation(spec).unwrap();

        let snap1 = registry.snapshot(&op_id).unwrap();
        assert!(snap1.created_at_ms > 0);
        assert!(snap1.started_at_ms.is_none());
        assert!(snap1.completed_at_ms.is_none());
        assert!(snap1.elapsed_ms.is_none());

        registry.provisioning_succeeded(&op_id).unwrap();
        let snap2 = registry.snapshot(&op_id).unwrap();
        assert!(snap2.started_at_ms.is_some());
        assert!(snap2.started_at_ms.unwrap() >= snap2.created_at_ms);

        registry
            .complete_operation(&op_id, success_result(&op_id, "done"))
            .unwrap();
        let snap3 = registry.snapshot(&op_id).unwrap();
        assert!(snap3.completed_at_ms.is_some());
        assert!(snap3.elapsed_ms.is_some());
        assert!(snap3.completed_at_ms.unwrap() >= snap3.started_at_ms.unwrap());
    }

    #[test]
    fn snapshot_includes_peer_handle() {
        let registry = RuntimeOpsLifecycleRegistry::new();
        let spec = OperationSpec {
            id: OperationId::new(),
            kind: OperationKind::MobMemberChild,
            owner_session_id: SessionId::new(),
            display_name: "peer-test".into(),
            source_label: "test".into(),
            child_session_id: Some(SessionId::new()),
            expect_peer_channel: true,
        };
        let op_id = spec.id.clone();
        registry.register_operation(spec).unwrap();
        registry.provisioning_succeeded(&op_id).unwrap();

        let snap1 = registry.snapshot(&op_id).unwrap();
        assert!(snap1.peer_handle.is_none());

        let handle = OperationPeerHandle {
            peer_name: "member-x".into(),
            trusted_peer: TrustedPeerSpec::new("member-x", "peer-id", "inproc://x").unwrap(),
        };
        registry.peer_ready(&op_id, handle).unwrap();

        let snap2 = registry.snapshot(&op_id).unwrap();
        assert_eq!(snap2.peer_handle.as_ref().unwrap().peer_name, "member-x");
    }
}