1use std::collections::HashMap;
13use std::future::Future;
14
15use meerkat_core::lifecycle::InputId;
16#[cfg(test)]
17use meerkat_core::lifecycle::RunId;
18use meerkat_core::types::{RunResult, SessionId};
19use meerkat_core::{TurnErrorMetadata, TurnTerminalCauseKind, TurnTerminalOutcome};
20use serde_json::Value;
21
22use crate::meerkat_machine::driver::RuntimeCompletionResultAuthority;
23use crate::meerkat_machine::dsl::RuntimeCompletionResultClass;
24use crate::tokio::sync::oneshot;
25
26#[derive(Debug, Clone, PartialEq, Eq, thiserror::Error)]
32pub enum CompletionWaitError {
33 #[error("completion channel closed without an authorized result")]
34 ChannelClosed,
35 #[error("{0}")]
36 AuthorityUnavailable(String),
37}
38
39impl CompletionWaitError {
40 pub fn wait_failure_observation(
41 &self,
42 ) -> crate::meerkat_machine::dsl::RuntimeCompletionWaitFailureObservation {
43 match self {
44 Self::ChannelClosed => {
45 crate::meerkat_machine::dsl::RuntimeCompletionWaitFailureObservation::ChannelClosed
46 }
47 Self::AuthorityUnavailable(_) => {
48 crate::meerkat_machine::dsl::RuntimeCompletionWaitFailureObservation::AuthorityUnavailable
49 }
50 }
51 }
52}
53
54#[derive(Debug)]
56pub enum CompletionOutcome {
57 Completed(Box<RunResult>),
59 CompletedWithoutResult,
61 CallbackPending { tool_name: String, args: Value },
64 Cancelled,
66 Abandoned {
70 reason: String,
71 error: TurnErrorMetadata,
72 },
73 AbandonedWithError {
75 reason: String,
76 error: TurnErrorMetadata,
77 },
78 CompletedWithFinalizationFailure { error: TurnErrorMetadata },
84 RuntimeTerminated {
87 reason: String,
88 error: TurnErrorMetadata,
89 },
90}
91
92#[derive(Debug, Clone, PartialEq, Eq)]
97pub struct CompletionCleanupObservation {
98 owner_session_id: SessionId,
99 owner_agent_runtime_id: Option<crate::meerkat_machine::dsl::AgentRuntimeId>,
100 owner_fence_token: Option<crate::meerkat_machine::dsl::FenceToken>,
101 owner_runtime_generation: Option<crate::meerkat_machine::dsl::Generation>,
102 owner_runtime_epoch_id: Option<crate::meerkat_machine::dsl::RuntimeEpochId>,
103 observed_outcome: crate::meerkat_machine::dsl::RuntimeCompletionObservedOutcome,
104}
105
106impl CompletionCleanupObservation {
107 fn from_authority(authority: RuntimeCompletionResultAuthority) -> Self {
108 Self {
109 owner_session_id: authority.session_id().clone(),
110 owner_agent_runtime_id: authority.agent_runtime_id().cloned(),
111 owner_fence_token: authority.fence_token(),
112 owner_runtime_generation: authority.runtime_generation(),
113 owner_runtime_epoch_id: authority.runtime_epoch_id().cloned(),
114 observed_outcome: authority.cleanup_observation(),
115 }
116 }
117
118 pub(crate) fn owner_session_id(&self) -> &SessionId {
119 &self.owner_session_id
120 }
121
122 pub(crate) fn owner_agent_runtime_id(
123 &self,
124 ) -> Option<&crate::meerkat_machine::dsl::AgentRuntimeId> {
125 self.owner_agent_runtime_id.as_ref()
126 }
127
128 pub(crate) fn owner_fence_token(&self) -> Option<crate::meerkat_machine::dsl::FenceToken> {
129 self.owner_fence_token
130 }
131
132 pub(crate) fn owner_runtime_generation(
133 &self,
134 ) -> Option<crate::meerkat_machine::dsl::Generation> {
135 self.owner_runtime_generation
136 }
137
138 pub(crate) fn owner_runtime_epoch_id(
139 &self,
140 ) -> Option<&crate::meerkat_machine::dsl::RuntimeEpochId> {
141 self.owner_runtime_epoch_id.as_ref()
142 }
143
144 pub(crate) fn observed_outcome(
145 &self,
146 ) -> crate::meerkat_machine::dsl::RuntimeCompletionObservedOutcome {
147 self.observed_outcome
148 }
149}
150
151#[derive(Debug)]
154struct CompletionDelivery {
155 outcome: CompletionOutcome,
156 cleanup_observation: CompletionCleanupObservation,
157}
158
159#[derive(Debug, Clone, PartialEq, Eq)]
164pub struct CompletionWaiterEntrySnapshot {
165 pub input_id: InputId,
166 pub waiter_count: usize,
167}
168
169#[derive(Debug, Clone, PartialEq, Eq, Default)]
174pub struct CompletionRegistrySnapshot {
175 pub input_count: usize,
176 pub waiter_count: usize,
177 pub waiting_inputs: Vec<CompletionWaiterEntrySnapshot>,
178}
179
180#[derive(Debug)]
182pub struct CompletionHandle {
183 rx: oneshot::Receiver<Result<CompletionDelivery, CompletionWaitError>>,
184}
185
186impl CompletionHandle {
187 async fn try_wait_delivery(self) -> Result<CompletionDelivery, CompletionWaitError> {
188 self.rx
189 .await
190 .unwrap_or(Err(CompletionWaitError::ChannelClosed))
191 }
192
193 pub async fn try_wait(self) -> Result<CompletionOutcome, CompletionWaitError> {
195 self.try_wait_delivery()
196 .await
197 .map(|delivery| delivery.outcome)
198 }
199
200 pub async fn try_wait_with_cleanup_observation(
203 self,
204 ) -> Result<(CompletionOutcome, CompletionCleanupObservation), CompletionWaitError> {
205 let delivery = self.try_wait_delivery().await?;
206 Ok((delivery.outcome, delivery.cleanup_observation))
207 }
208
209 pub async fn wait(self) -> Result<CompletionOutcome, CompletionWaitError> {
211 self.try_wait().await
212 }
213
214 #[cfg(test)]
216 pub(crate) async fn wait_authorized(self) -> CompletionOutcome {
217 self.wait()
218 .await
219 .expect("completion waiter closed without an authorized result")
220 }
221
222 pub fn with_cleanup<F, Fut>(self, cleanup: F) -> Self
226 where
227 F: FnOnce() -> Fut + Send + 'static,
228 Fut: Future<Output = ()> + Send + 'static,
229 {
230 let (tx, rx) = oneshot::channel();
231 crate::tokio::spawn(async move {
232 let outcome = self.try_wait_delivery().await;
233 cleanup().await;
234 let _ = tx.send(outcome);
235 });
236 Self { rx }
237 }
238
239 pub fn with_outcome_cleanup<F, Fut>(self, cleanup: F) -> Self
244 where
245 F: FnOnce(CompletionCleanupObservation) -> Fut + Send + 'static,
246 Fut: Future<Output = ()> + Send + 'static,
247 {
248 let (tx, rx) = oneshot::channel();
249 crate::tokio::spawn(async move {
250 let outcome = match self.try_wait_delivery().await {
251 Ok(delivery) => {
252 cleanup(delivery.cleanup_observation.clone()).await;
253 Ok(delivery)
254 }
255 Err(error) => Err(error),
256 };
257 let _ = tx.send(outcome);
258 });
259 Self { rx }
260 }
261
262 pub fn with_completion_cleanup<F, Fut>(self, cleanup: F) -> Self
265 where
266 F: FnOnce(Result<CompletionCleanupObservation, CompletionWaitError>) -> Fut
267 + Send
268 + 'static,
269 Fut: Future<Output = ()> + Send + 'static,
270 {
271 let (tx, rx) = oneshot::channel();
272 crate::tokio::spawn(async move {
273 let outcome = self.try_wait_delivery().await;
274 match &outcome {
275 Ok(delivery) => cleanup(Ok(delivery.cleanup_observation.clone())).await,
276 Err(error) => cleanup(Err(error.clone())).await,
277 }
278 let _ = tx.send(outcome);
279 });
280 Self { rx }
281 }
282
283 #[cfg(test)]
284 fn already_resolved_internal(
285 outcome: CompletionOutcome,
286 authority: RuntimeCompletionResultAuthority,
287 ) -> Self {
288 let (tx, rx) = oneshot::channel();
289 let _ = tx.send(Ok(CompletionDelivery {
290 outcome,
291 cleanup_observation: CompletionCleanupObservation::from_authority(authority),
292 }));
293 Self { rx }
294 }
295
296 #[cfg(test)]
297 pub(crate) fn already_resolved_with_generated_class(
298 outcome: CompletionOutcome,
299 expected_class: crate::meerkat_machine::dsl::RuntimeCompletionResultClass,
300 terminal: crate::meerkat_machine::dsl::RuntimeCompletionTerminalObservation,
301 finalization: crate::meerkat_machine::dsl::RuntimeCompletionFinalizationObservation,
302 ) -> Result<Self, crate::RuntimeDriverError> {
303 let run_id = if terminal
304 == crate::meerkat_machine::dsl::RuntimeCompletionTerminalObservation::RuntimeTerminated
305 {
306 None
307 } else {
308 Some(RunId::new())
309 };
310 let authority =
311 crate::meerkat_machine::driver::machine_resolve_pre_resolved_runtime_completion_result(
312 run_id.as_ref(),
313 terminal,
314 finalization,
315 )?;
316 if !authority.allows(expected_class) {
317 return Err(crate::RuntimeDriverError::Internal(format!(
318 "generated runtime completion authority returned {:?}, expected {expected_class:?}",
319 authority.class()
320 )));
321 }
322 Ok(Self::already_resolved_internal(outcome, authority))
323 }
324
325 #[cfg(test)]
326 pub(crate) fn already_completed_without_result() -> Result<Self, crate::RuntimeDriverError> {
327 Self::already_resolved_with_generated_class(
328 CompletionOutcome::CompletedWithoutResult,
329 crate::meerkat_machine::dsl::RuntimeCompletionResultClass::CompletedWithoutResult,
330 crate::meerkat_machine::dsl::RuntimeCompletionTerminalObservation::NoResult,
331 crate::meerkat_machine::dsl::RuntimeCompletionFinalizationObservation::Succeeded,
332 )
333 }
334
335 #[cfg(test)]
336 pub(crate) fn already_runtime_apply_failed(
337 reason: String,
338 error: TurnErrorMetadata,
339 ) -> Result<Self, crate::RuntimeDriverError> {
340 Self::already_resolved_with_generated_class(
341 CompletionOutcome::AbandonedWithError { reason, error },
342 crate::meerkat_machine::dsl::RuntimeCompletionResultClass::AbandonedWithError,
343 crate::meerkat_machine::dsl::RuntimeCompletionTerminalObservation::NoResult,
344 crate::meerkat_machine::dsl::RuntimeCompletionFinalizationObservation::Failed,
345 )
346 }
347
348 #[cfg(test)]
349 pub(crate) fn already_runtime_terminated(
350 reason: String,
351 ) -> Result<Self, crate::RuntimeDriverError> {
352 Self::already_resolved_with_generated_class(
353 CompletionOutcome::runtime_terminated(&reason),
354 crate::meerkat_machine::dsl::RuntimeCompletionResultClass::RuntimeTerminated,
355 crate::meerkat_machine::dsl::RuntimeCompletionTerminalObservation::RuntimeTerminated,
356 crate::meerkat_machine::dsl::RuntimeCompletionFinalizationObservation::Succeeded,
357 )
358 }
359
360 #[cfg(test)]
361 pub(crate) fn already_callback_pending(
362 tool_name: String,
363 args: Value,
364 ) -> Result<Self, crate::RuntimeDriverError> {
365 Self::already_resolved_with_generated_class(
366 CompletionOutcome::CallbackPending { tool_name, args },
367 crate::meerkat_machine::dsl::RuntimeCompletionResultClass::CallbackPending,
368 crate::meerkat_machine::dsl::RuntimeCompletionTerminalObservation::CallbackPending,
369 crate::meerkat_machine::dsl::RuntimeCompletionFinalizationObservation::Succeeded,
370 )
371 }
372}
373
374impl CompletionOutcome {
375 fn runtime_terminated(reason: &str) -> Self {
379 Self::RuntimeTerminated {
380 reason: reason.to_string(),
381 error: TurnErrorMetadata::terminal(
382 TurnTerminalCauseKind::FatalFailure,
383 TurnTerminalOutcome::Failed,
384 reason,
385 ),
386 }
387 }
388
389 pub fn abandoned_reason(&self) -> Option<&str> {
390 match self {
391 Self::Abandoned { reason, .. } | Self::AbandonedWithError { reason, .. } => {
392 Some(reason)
393 }
394 _ => None,
395 }
396 }
397
398 pub fn error_metadata(&self) -> Option<&TurnErrorMetadata> {
399 match self {
400 Self::Abandoned { error, .. }
401 | Self::AbandonedWithError { error, .. }
402 | Self::CompletedWithFinalizationFailure { error, .. }
403 | Self::RuntimeTerminated { error, .. } => Some(error),
404 _ => None,
405 }
406 }
407}
408
409#[derive(Default)]
414pub(crate) struct CompletionRegistry {
415 waiters:
416 HashMap<InputId, Vec<oneshot::Sender<Result<CompletionDelivery, CompletionWaitError>>>>,
417}
418
419impl CompletionRegistry {
420 pub(crate) fn new() -> Self {
421 Self::default()
422 }
423
424 fn take_waiters(
425 &mut self,
426 input_id: &InputId,
427 ) -> Option<Vec<oneshot::Sender<Result<CompletionDelivery, CompletionWaitError>>>> {
428 self.waiters.remove(input_id)
429 }
430
431 fn send_outcome(
432 senders: Vec<oneshot::Sender<Result<CompletionDelivery, CompletionWaitError>>>,
433 outcome: CompletionOutcome,
434 cleanup_observation: CompletionCleanupObservation,
435 ) {
436 for tx in senders {
437 let outcome = match &outcome {
438 CompletionOutcome::Completed(result) => {
439 CompletionOutcome::Completed(Box::new(result.as_ref().clone()))
440 }
441 CompletionOutcome::CompletedWithoutResult => {
442 CompletionOutcome::CompletedWithoutResult
443 }
444 CompletionOutcome::CallbackPending { tool_name, args } => {
445 CompletionOutcome::CallbackPending {
446 tool_name: tool_name.clone(),
447 args: args.clone(),
448 }
449 }
450 CompletionOutcome::Cancelled => CompletionOutcome::Cancelled,
451 CompletionOutcome::Abandoned { reason, error } => CompletionOutcome::Abandoned {
452 reason: reason.clone(),
453 error: error.clone(),
454 },
455 CompletionOutcome::AbandonedWithError { reason, error } => {
456 CompletionOutcome::AbandonedWithError {
457 reason: reason.clone(),
458 error: error.clone(),
459 }
460 }
461 CompletionOutcome::CompletedWithFinalizationFailure { error } => {
462 CompletionOutcome::CompletedWithFinalizationFailure {
463 error: error.clone(),
464 }
465 }
466 CompletionOutcome::RuntimeTerminated { reason, error } => {
467 CompletionOutcome::RuntimeTerminated {
468 reason: reason.clone(),
469 error: error.clone(),
470 }
471 }
472 };
473 let _ = tx.send(Ok(CompletionDelivery {
474 outcome,
475 cleanup_observation: cleanup_observation.clone(),
476 }));
477 }
478 }
479
480 fn send_error(
481 senders: Vec<oneshot::Sender<Result<CompletionDelivery, CompletionWaitError>>>,
482 error: CompletionWaitError,
483 ) {
484 for tx in senders {
485 let _ = tx.send(Err(error.clone()));
486 }
487 }
488
489 fn authority_mismatch_error(
490 authority: &RuntimeCompletionResultAuthority,
491 expected: RuntimeCompletionResultClass,
492 ) -> CompletionWaitError {
493 CompletionWaitError::AuthorityUnavailable(format!(
494 "generated runtime completion authority returned {:?}, expected {expected:?}",
495 authority.class()
496 ))
497 }
498
499 fn fail_input_authority_mismatch(
500 &mut self,
501 input_id: &InputId,
502 authority: &RuntimeCompletionResultAuthority,
503 expected: RuntimeCompletionResultClass,
504 ) {
505 if let Some(senders) = self.take_waiters(input_id) {
506 Self::send_error(senders, Self::authority_mismatch_error(authority, expected));
507 }
508 }
509
510 pub(crate) fn register(&mut self, input_id: InputId) -> CompletionHandle {
515 let (tx, rx) = oneshot::channel();
516 self.waiters.entry(input_id).or_default().push(tx);
517 CompletionHandle { rx }
518 }
519
520 fn resolve_completed(
522 &mut self,
523 input_id: &InputId,
524 result: RunResult,
525 cleanup_observation: CompletionCleanupObservation,
526 ) {
527 if let Some(senders) = self.take_waiters(input_id) {
528 Self::send_outcome(
529 senders,
530 CompletionOutcome::Completed(Box::new(result)),
531 cleanup_observation,
532 );
533 }
534 }
535
536 pub(crate) fn resolve_completed_authorized(
537 &mut self,
538 input_id: &InputId,
539 result: RunResult,
540 authority: RuntimeCompletionResultAuthority,
541 ) {
542 let expected = RuntimeCompletionResultClass::Completed;
543 if !authority.allows(expected) {
544 self.fail_input_authority_mismatch(input_id, &authority, expected);
545 return;
546 }
547 self.resolve_completed(
548 input_id,
549 result,
550 CompletionCleanupObservation::from_authority(authority),
551 );
552 }
553
554 fn resolve_without_result(
556 &mut self,
557 input_id: &InputId,
558 cleanup_observation: CompletionCleanupObservation,
559 ) {
560 if let Some(senders) = self.take_waiters(input_id) {
561 Self::send_outcome(
562 senders,
563 CompletionOutcome::CompletedWithoutResult,
564 cleanup_observation,
565 );
566 }
567 }
568
569 pub(crate) fn resolve_without_result_authorized(
570 &mut self,
571 input_id: &InputId,
572 authority: RuntimeCompletionResultAuthority,
573 ) {
574 let expected = RuntimeCompletionResultClass::CompletedWithoutResult;
575 if !authority.allows(expected) {
576 self.fail_input_authority_mismatch(input_id, &authority, expected);
577 return;
578 }
579 self.resolve_without_result(
580 input_id,
581 CompletionCleanupObservation::from_authority(authority),
582 );
583 }
584
585 fn resolve_callback_pending(
587 &mut self,
588 input_id: &InputId,
589 tool_name: String,
590 args: Value,
591 cleanup_observation: CompletionCleanupObservation,
592 ) {
593 if let Some(senders) = self.take_waiters(input_id) {
594 Self::send_outcome(
595 senders,
596 CompletionOutcome::CallbackPending { tool_name, args },
597 cleanup_observation,
598 );
599 }
600 }
601
602 pub(crate) fn resolve_callback_pending_authorized(
603 &mut self,
604 input_id: &InputId,
605 tool_name: String,
606 args: Value,
607 authority: RuntimeCompletionResultAuthority,
608 ) {
609 let expected = RuntimeCompletionResultClass::CallbackPending;
610 if !authority.allows(expected) {
611 self.fail_input_authority_mismatch(input_id, &authority, expected);
612 return;
613 }
614 self.resolve_callback_pending(
615 input_id,
616 tool_name,
617 args,
618 CompletionCleanupObservation::from_authority(authority),
619 );
620 }
621
622 fn resolve_cancelled(
624 &mut self,
625 input_id: &InputId,
626 cleanup_observation: CompletionCleanupObservation,
627 ) {
628 if let Some(senders) = self.take_waiters(input_id) {
629 Self::send_outcome(senders, CompletionOutcome::Cancelled, cleanup_observation);
630 }
631 }
632
633 pub(crate) fn resolve_cancelled_authorized(
634 &mut self,
635 input_id: &InputId,
636 authority: RuntimeCompletionResultAuthority,
637 ) {
638 let expected = RuntimeCompletionResultClass::Cancelled;
639 if !authority.allows(expected) {
640 self.fail_input_authority_mismatch(input_id, &authority, expected);
641 return;
642 }
643 self.resolve_cancelled(
644 input_id,
645 CompletionCleanupObservation::from_authority(authority),
646 );
647 }
648
649 fn resolve_abandoned_with_error(
651 &mut self,
652 input_id: &InputId,
653 reason: String,
654 error: TurnErrorMetadata,
655 cleanup_observation: CompletionCleanupObservation,
656 ) {
657 if let Some(senders) = self.take_waiters(input_id) {
658 Self::send_outcome(
659 senders,
660 CompletionOutcome::AbandonedWithError { reason, error },
661 cleanup_observation,
662 );
663 }
664 }
665
666 pub(crate) fn resolve_abandoned_with_error_authorized(
667 &mut self,
668 input_id: &InputId,
669 reason: String,
670 error: TurnErrorMetadata,
671 authority: RuntimeCompletionResultAuthority,
672 ) {
673 let expected = RuntimeCompletionResultClass::AbandonedWithError;
674 if !authority.allows(expected) {
675 self.fail_input_authority_mismatch(input_id, &authority, expected);
676 return;
677 }
678 self.resolve_abandoned_with_error(
679 input_id,
680 reason,
681 error,
682 CompletionCleanupObservation::from_authority(authority),
683 );
684 }
685
686 fn resolve_completed_with_finalization_failure(
689 &mut self,
690 input_id: &InputId,
691 error: TurnErrorMetadata,
692 cleanup_observation: CompletionCleanupObservation,
693 ) {
694 if let Some(senders) = self.take_waiters(input_id) {
695 Self::send_outcome(
696 senders,
697 CompletionOutcome::CompletedWithFinalizationFailure { error },
698 cleanup_observation,
699 );
700 }
701 }
702
703 pub(crate) fn resolve_completed_with_finalization_failure_authorized(
704 &mut self,
705 input_id: &InputId,
706 error: TurnErrorMetadata,
707 authority: RuntimeCompletionResultAuthority,
708 ) {
709 let expected = RuntimeCompletionResultClass::CompletedWithFinalizationFailure;
710 if !authority.allows(expected) {
711 self.fail_input_authority_mismatch(input_id, &authority, expected);
712 return;
713 }
714 self.resolve_completed_with_finalization_failure(
715 input_id,
716 error,
717 CompletionCleanupObservation::from_authority(authority),
718 );
719 }
720
721 pub(crate) fn resolve_all_runtime_terminated(
727 &mut self,
728 reason: &str,
729 authority: RuntimeCompletionResultAuthority,
730 ) {
731 let expected = RuntimeCompletionResultClass::RuntimeTerminated;
732 if !authority.allows(expected) {
733 let error = Self::authority_mismatch_error(&authority, expected);
734 self.fail_all_waiters(error);
735 return;
736 }
737 let cleanup_observation = CompletionCleanupObservation::from_authority(authority);
738 for (_, senders) in self.waiters.drain() {
739 Self::send_outcome(
740 senders,
741 CompletionOutcome::runtime_terminated(reason),
742 cleanup_observation.clone(),
743 );
744 }
745 }
746
747 pub(crate) fn resolve_inputs_runtime_terminated<I>(
748 &mut self,
749 input_ids: I,
750 reason: &str,
751 authority: RuntimeCompletionResultAuthority,
752 ) where
753 I: IntoIterator<Item = InputId>,
754 {
755 let input_ids: Vec<InputId> = input_ids.into_iter().collect();
756 let expected = RuntimeCompletionResultClass::RuntimeTerminated;
757 if !authority.allows(expected) {
758 let error = Self::authority_mismatch_error(&authority, expected);
759 self.fail_inputs(input_ids, error);
760 return;
761 }
762 let cleanup_observation = CompletionCleanupObservation::from_authority(authority);
763 for input_id in input_ids {
764 if let Some(senders) = self.take_waiters(&input_id) {
765 Self::send_outcome(
766 senders,
767 CompletionOutcome::runtime_terminated(reason),
768 cleanup_observation.clone(),
769 );
770 }
771 }
772 }
773
774 pub(crate) fn fail_all_waiters(&mut self, error: CompletionWaitError) {
775 for (_, senders) in self.waiters.drain() {
776 Self::send_error(senders, error.clone());
777 }
778 }
779
780 pub(crate) fn fail_inputs<I>(&mut self, input_ids: I, error: CompletionWaitError)
781 where
782 I: IntoIterator<Item = InputId>,
783 {
784 for input_id in input_ids {
785 if let Some(senders) = self.take_waiters(&input_id) {
786 Self::send_error(senders, error.clone());
787 }
788 }
789 }
790
791 pub(crate) fn resolve_not_pending_runtime_terminated<F>(
794 &mut self,
795 mut is_still_pending: F,
796 reason: &str,
797 authority: RuntimeCompletionResultAuthority,
798 ) where
799 F: FnMut(&InputId) -> bool,
800 {
801 let expected = RuntimeCompletionResultClass::RuntimeTerminated;
802 if !authority.allows(expected) {
803 let error = Self::authority_mismatch_error(&authority, expected);
804 self.waiters.retain(|input_id, senders| {
805 if is_still_pending(input_id) {
806 return true;
807 }
808
809 Self::send_error(std::mem::take(senders), error.clone());
810 false
811 });
812 return;
813 }
814 let cleanup_observation = CompletionCleanupObservation::from_authority(authority);
815 self.waiters.retain(|input_id, senders| {
816 if is_still_pending(input_id) {
817 return true;
818 }
819
820 Self::send_outcome(
821 std::mem::take(senders),
822 CompletionOutcome::runtime_terminated(reason),
823 cleanup_observation.clone(),
824 );
825 false
826 });
827 }
828
829 pub(crate) fn diagnostic_snapshot(&self) -> CompletionRegistrySnapshot {
831 let mut waiting_inputs: Vec<_> = self
832 .waiters
833 .iter()
834 .map(|(input_id, senders)| CompletionWaiterEntrySnapshot {
835 input_id: input_id.clone(),
836 waiter_count: senders.len(),
837 })
838 .collect();
839 waiting_inputs
840 .sort_by(|left, right| left.input_id.to_string().cmp(&right.input_id.to_string()));
841
842 CompletionRegistrySnapshot {
843 input_count: waiting_inputs.len(),
844 waiter_count: waiting_inputs.iter().map(|entry| entry.waiter_count).sum(),
845 waiting_inputs,
846 }
847 }
848
849 #[cfg(test)]
854 pub fn debug_has_waiters(&self) -> bool {
855 !self.waiters.is_empty()
856 }
857
858 #[cfg(test)]
863 pub fn debug_waiter_count(&self) -> usize {
864 self.waiters.values().map(Vec::len).sum()
865 }
866}
867
868#[cfg(test)]
869#[allow(clippy::unwrap_used, clippy::panic)]
870mod tests {
871 use super::*;
872 use crate::meerkat_machine::dsl::{
873 RuntimeCompletionObservedOutcome, RuntimeCompletionResultClass,
874 };
875 use meerkat_core::types::{SessionId, Usage};
876
877 fn make_run_result() -> RunResult {
878 RunResult {
879 text: "hello".into(),
880 session_id: SessionId::new(),
881 usage: Usage::default(),
882 turns: 1,
883 tool_calls: 0,
884 terminal_cause_kind: None,
885 structured_output: None,
886 extraction_error: None,
887 schema_warnings: None,
888 skill_diagnostics: None,
889 }
890 }
891
892 fn authority(
893 result_class: RuntimeCompletionResultClass,
894 cleanup_observation: RuntimeCompletionObservedOutcome,
895 ) -> RuntimeCompletionResultAuthority {
896 crate::meerkat_machine::driver::test_runtime_completion_authority(
897 result_class,
898 cleanup_observation,
899 )
900 }
901
902 #[tokio::test]
903 async fn register_and_complete() {
904 let mut registry = CompletionRegistry::new();
905 let input_id = InputId::new();
906 let handle = registry.register(input_id.clone());
907
908 assert!(registry.debug_has_waiters());
909 assert_eq!(registry.debug_waiter_count(), 1);
910
911 let result = make_run_result();
912 registry.resolve_completed_authorized(
913 &input_id,
914 result,
915 authority(
916 RuntimeCompletionResultClass::Completed,
917 RuntimeCompletionObservedOutcome::Completed,
918 ),
919 );
920
921 match handle.wait_authorized().await {
922 CompletionOutcome::Completed(r) => assert_eq!(r.text, "hello"),
923 other => panic!("Expected Completed, got {other:?}"),
924 }
925 }
926
927 #[tokio::test]
928 async fn register_and_fail_waiter() {
929 let mut registry = CompletionRegistry::new();
930 let input_id = InputId::new();
931 let handle = registry.register(input_id.clone());
932
933 registry.fail_inputs(
934 [input_id],
935 CompletionWaitError::AuthorityUnavailable("retired".into()),
936 );
937
938 match handle.try_wait().await {
939 Err(CompletionWaitError::AuthorityUnavailable(reason)) => assert_eq!(reason, "retired"),
940 other => panic!("Expected wait error, got {other:?}"),
941 }
942 }
943
944 #[tokio::test]
945 async fn mismatched_result_authority_fails_waiter_closed() {
946 let mut registry = CompletionRegistry::new();
947 let input_id = InputId::new();
948 let handle = registry.register(input_id.clone());
949
950 registry.resolve_completed_authorized(
951 &input_id,
952 make_run_result(),
953 authority(
954 RuntimeCompletionResultClass::Cancelled,
955 RuntimeCompletionObservedOutcome::Cancelled,
956 ),
957 );
958
959 assert!(!registry.debug_has_waiters());
960 match handle.try_wait().await {
961 Err(CompletionWaitError::AuthorityUnavailable(reason)) => {
962 assert!(reason.contains("Cancelled"));
963 assert!(reason.contains("Completed"));
964 }
965 other => panic!("Expected authority mismatch wait error, got {other:?}"),
966 }
967 }
968
969 #[tokio::test]
970 async fn resolve_all_runtime_terminated() {
971 let mut registry = CompletionRegistry::new();
972 let h1 = registry.register(InputId::new());
973 let h2 = registry.register(InputId::new());
974
975 registry.resolve_all_runtime_terminated(
976 "runtime stopped",
977 authority(
978 RuntimeCompletionResultClass::RuntimeTerminated,
979 RuntimeCompletionObservedOutcome::RuntimeTerminated,
980 ),
981 );
982
983 assert!(!registry.debug_has_waiters());
984
985 match h1.wait_authorized().await {
986 CompletionOutcome::RuntimeTerminated { reason, .. } => {
987 assert_eq!(reason, "runtime stopped");
988 }
989 other => panic!("Expected RuntimeTerminated, got {other:?}"),
990 }
991 match h2.wait_authorized().await {
992 CompletionOutcome::RuntimeTerminated { reason, .. } => {
993 assert_eq!(reason, "runtime stopped");
994 }
995 other => panic!("Expected RuntimeTerminated, got {other:?}"),
996 }
997 }
998
999 #[tokio::test]
1000 async fn mismatched_runtime_terminated_authority_fails_all_waiters_closed() {
1001 let mut registry = CompletionRegistry::new();
1002 let h1 = registry.register(InputId::new());
1003 let h2 = registry.register(InputId::new());
1004
1005 registry.resolve_all_runtime_terminated(
1006 "runtime stopped",
1007 authority(
1008 RuntimeCompletionResultClass::CompletedWithoutResult,
1009 RuntimeCompletionObservedOutcome::CompletedWithoutResult,
1010 ),
1011 );
1012
1013 assert!(!registry.debug_has_waiters());
1014 for handle in [h1, h2] {
1015 match handle.try_wait().await {
1016 Err(CompletionWaitError::AuthorityUnavailable(reason)) => {
1017 assert!(reason.contains("CompletedWithoutResult"));
1018 assert!(reason.contains("RuntimeTerminated"));
1019 }
1020 other => panic!("Expected authority mismatch wait error, got {other:?}"),
1021 }
1022 }
1023 }
1024
1025 #[tokio::test]
1026 async fn cleanup_rejects_observation_from_another_session() {
1027 let adapter = crate::meerkat_machine::MeerkatMachine::ephemeral();
1028 let source_session_id = SessionId::new();
1029 let target_session_id = SessionId::new();
1030 adapter
1031 .prepare_bindings(source_session_id.clone())
1032 .await
1033 .expect("source session should prepare runtime bindings");
1034 adapter
1035 .prepare_bindings(target_session_id.clone())
1036 .await
1037 .expect("target session should prepare runtime bindings");
1038
1039 let input = crate::Input::Prompt(crate::PromptInput::new(
1040 "source session pending completion",
1041 None,
1042 ));
1043 let (_outcome, handle) = adapter
1044 .accept_input_with_completion(&source_session_id, input)
1045 .await
1046 .expect("source input should be accepted");
1047 let handle = handle.expect("source input should have a completion waiter");
1048 adapter
1049 .stop_runtime_executor(&source_session_id, "source stopped")
1050 .await
1051 .expect("source stop should resolve waiter");
1052 let (_outcome, observation) = handle
1053 .try_wait_with_cleanup_observation()
1054 .await
1055 .expect("waiter should resolve with generated cleanup observation");
1056
1057 assert_eq!(observation.owner_session_id(), &source_session_id);
1058 let err = adapter
1059 .resolve_runtime_completion_cleanup(
1060 &target_session_id,
1061 observation,
1062 false,
1063 crate::meerkat_machine::dsl::RuntimeCompletionLiveSessionObservation::Absent,
1064 )
1065 .await
1066 .expect_err("cleanup must reject an observation minted for another session");
1067 assert!(
1068 matches!(err, crate::RuntimeDriverError::ValidationFailed { .. }),
1069 "expected generated cleanup validation failure, got {err:?}"
1070 );
1071 }
1072
1073 #[tokio::test]
1074 async fn cleanup_rejects_stale_same_session_observation_after_rebinding() {
1075 let adapter = crate::meerkat_machine::MeerkatMachine::ephemeral();
1076 let session_id = SessionId::new();
1077 adapter
1078 .prepare_bindings(session_id.clone())
1079 .await
1080 .expect("session should prepare initial runtime bindings");
1081
1082 let input = crate::Input::Prompt(crate::PromptInput::new(
1083 "same session pending completion",
1084 None,
1085 ));
1086 let (_outcome, handle) = adapter
1087 .accept_input_with_completion(&session_id, input)
1088 .await
1089 .expect("input should be accepted");
1090 let handle = handle.expect("input should have a completion waiter");
1091 adapter
1092 .stop_runtime_executor(&session_id, "first runtime stopped")
1093 .await
1094 .expect("stop should resolve waiter");
1095 let (_outcome, stale_observation) = handle
1096 .try_wait_with_cleanup_observation()
1097 .await
1098 .expect("waiter should resolve with generated cleanup observation");
1099
1100 adapter.unregister_session(&session_id).await;
1101 adapter
1102 .prepare_bindings(session_id.clone())
1103 .await
1104 .expect("session should prepare replacement runtime bindings");
1105
1106 let err = adapter
1107 .resolve_runtime_completion_cleanup(
1108 &session_id,
1109 stale_observation,
1110 false,
1111 crate::meerkat_machine::dsl::RuntimeCompletionLiveSessionObservation::Absent,
1112 )
1113 .await
1114 .expect_err("cleanup must reject an observation minted for a prior runtime binding");
1115 assert!(
1116 matches!(err, crate::RuntimeDriverError::ValidationFailed { .. }),
1117 "expected generated cleanup validation failure, got {err:?}"
1118 );
1119 }
1120
1121 #[tokio::test]
1122 async fn wait_failure_authority_releases_pre_admission_and_classifies_public_reason() {
1123 let adapter = crate::meerkat_machine::MeerkatMachine::ephemeral();
1124 let session_id = SessionId::new();
1125 adapter
1126 .prepare_bindings(session_id.clone())
1127 .await
1128 .expect("session should prepare runtime bindings");
1129
1130 let authority = adapter
1131 .resolve_runtime_completion_wait_failure(
1132 &session_id,
1133 &CompletionWaitError::AuthorityUnavailable("missing generated result".into()),
1134 )
1135 .await
1136 .expect("wait-failure authority should resolve");
1137
1138 assert!(authority.releases_pre_admission());
1139 assert_eq!(
1140 authority.public_error_class,
1141 crate::meerkat_machine::dsl::RuntimeCompletionWaitFailurePublicErrorClass::InternalError
1142 );
1143 assert_eq!(
1144 authority.public_reason,
1145 crate::meerkat_machine::dsl::RuntimeCompletionWaitFailurePublicReason::CompletionAuthorityUnavailable
1146 );
1147 assert!(!authority.resumable);
1148 }
1149
1150 #[tokio::test]
1151 async fn wait_failure_authority_rejects_missing_session_authority() {
1152 let adapter = crate::meerkat_machine::MeerkatMachine::ephemeral();
1153 let session_id = SessionId::new();
1154
1155 let err = adapter
1156 .resolve_runtime_completion_wait_failure(
1157 &session_id,
1158 &CompletionWaitError::ChannelClosed,
1159 )
1160 .await
1161 .expect_err("wait-failure authority must fail closed without a session authority");
1162
1163 assert!(
1164 matches!(err, crate::RuntimeDriverError::ValidationFailed { .. }),
1165 "expected generated wait-failure validation failure, got {err:?}"
1166 );
1167 }
1168
1169 #[tokio::test]
1170 async fn resolve_nonexistent_is_a_noop() {
1171 let mut registry = CompletionRegistry::new();
1172 registry.resolve_completed_authorized(
1173 &InputId::new(),
1174 make_run_result(),
1175 authority(
1176 RuntimeCompletionResultClass::Completed,
1177 RuntimeCompletionObservedOutcome::Completed,
1178 ),
1179 );
1180 registry.fail_inputs(
1181 [InputId::new()],
1182 CompletionWaitError::AuthorityUnavailable("gone".into()),
1183 );
1184 assert!(!registry.debug_has_waiters());
1185 }
1186
1187 #[tokio::test]
1188 async fn dropped_sender_gives_wait_error() {
1189 let mut registry = CompletionRegistry::new();
1190 let input_id = InputId::new();
1191 let handle = registry.register(input_id);
1192
1193 drop(registry);
1195
1196 assert!(matches!(
1197 handle.try_wait().await,
1198 Err(CompletionWaitError::ChannelClosed)
1199 ));
1200 }
1201
1202 #[tokio::test]
1203 async fn multi_waiter_all_receive_result() {
1204 let mut registry = CompletionRegistry::new();
1205 let input_id = InputId::new();
1206
1207 let h1 = registry.register(input_id.clone());
1208 let h2 = registry.register(input_id.clone());
1209 let h3 = registry.register(input_id.clone());
1210
1211 assert_eq!(registry.debug_waiter_count(), 3);
1212
1213 let result = make_run_result();
1214 registry.resolve_completed_authorized(
1215 &input_id,
1216 result,
1217 authority(
1218 RuntimeCompletionResultClass::Completed,
1219 RuntimeCompletionObservedOutcome::Completed,
1220 ),
1221 );
1222
1223 assert!(!registry.debug_has_waiters());
1224
1225 for handle in [h1, h2, h3] {
1226 match handle.wait_authorized().await {
1227 CompletionOutcome::Completed(r) => assert_eq!(r.text, "hello"),
1228 other => panic!("Expected Completed, got {other:?}"),
1229 }
1230 }
1231 }
1232
1233 #[tokio::test]
1234 async fn resolve_without_result_sends_variant() {
1235 let mut registry = CompletionRegistry::new();
1236 let input_id = InputId::new();
1237 let handle = registry.register(input_id.clone());
1238
1239 registry.resolve_without_result_authorized(
1240 &input_id,
1241 authority(
1242 RuntimeCompletionResultClass::CompletedWithoutResult,
1243 RuntimeCompletionObservedOutcome::CompletedWithoutResult,
1244 ),
1245 );
1246
1247 match handle.wait_authorized().await {
1248 CompletionOutcome::CompletedWithoutResult => {}
1249 other => panic!("Expected CompletedWithoutResult, got {other:?}"),
1250 }
1251 }
1252
1253 #[tokio::test]
1254 async fn resolve_without_result_multi_waiter() {
1255 let mut registry = CompletionRegistry::new();
1256 let input_id = InputId::new();
1257 let h1 = registry.register(input_id.clone());
1258 let h2 = registry.register(input_id.clone());
1259
1260 registry.resolve_without_result_authorized(
1261 &input_id,
1262 authority(
1263 RuntimeCompletionResultClass::CompletedWithoutResult,
1264 RuntimeCompletionObservedOutcome::CompletedWithoutResult,
1265 ),
1266 );
1267
1268 for handle in [h1, h2] {
1269 match handle.wait_authorized().await {
1270 CompletionOutcome::CompletedWithoutResult => {}
1271 other => panic!("Expected CompletedWithoutResult, got {other:?}"),
1272 }
1273 }
1274 }
1275
1276 #[tokio::test]
1277 async fn resolve_callback_pending_sends_variant() {
1278 let mut registry = CompletionRegistry::new();
1279 let input_id = InputId::new();
1280 let handle = registry.register(input_id.clone());
1281
1282 registry.resolve_callback_pending_authorized(
1283 &input_id,
1284 "browser".to_string(),
1285 serde_json::json!({ "url": "https://example.com" }),
1286 authority(
1287 RuntimeCompletionResultClass::CallbackPending,
1288 RuntimeCompletionObservedOutcome::CallbackPending,
1289 ),
1290 );
1291
1292 match handle.wait_authorized().await {
1293 CompletionOutcome::CallbackPending { tool_name, args } => {
1294 assert_eq!(tool_name, "browser");
1295 assert_eq!(args, serde_json::json!({ "url": "https://example.com" }));
1296 }
1297 other => panic!("Expected CallbackPending, got {other:?}"),
1298 }
1299 }
1300
1301 #[tokio::test]
1302 async fn resolve_cancelled_sends_variant() {
1303 let mut registry = CompletionRegistry::new();
1304 let input_id = InputId::new();
1305 let handle = registry.register(input_id.clone());
1306
1307 registry.resolve_cancelled_authorized(
1308 &input_id,
1309 authority(
1310 RuntimeCompletionResultClass::Cancelled,
1311 RuntimeCompletionObservedOutcome::Cancelled,
1312 ),
1313 );
1314
1315 match handle.wait_authorized().await {
1316 CompletionOutcome::Cancelled => {}
1317 other => panic!("Expected Cancelled, got {other:?}"),
1318 }
1319 }
1320
1321 #[tokio::test]
1322 async fn already_resolved_handle() {
1323 let handle = CompletionHandle::already_completed_without_result()
1324 .expect("generated completion authority should classify no-result completion");
1325 match handle.wait_authorized().await {
1326 CompletionOutcome::CompletedWithoutResult => {}
1327 other => panic!("Expected CompletedWithoutResult, got {other:?}"),
1328 }
1329 }
1330
1331 #[tokio::test]
1332 async fn outcome_cleanup_observes_and_relays_result() {
1333 use std::sync::Arc;
1334 use std::sync::atomic::{AtomicBool, Ordering};
1335
1336 let mut registry = CompletionRegistry::new();
1337 let input_id = InputId::new();
1338 let handle = registry.register(input_id.clone());
1339 let observed = Arc::new(AtomicBool::new(false));
1340 let cleanup_observed = Arc::clone(&observed);
1341 let handle = handle.with_outcome_cleanup(move |observation| async move {
1342 if observation.observed_outcome()
1343 == crate::meerkat_machine::dsl::RuntimeCompletionObservedOutcome::CompletedWithoutResult
1344 {
1345 cleanup_observed.store(true, Ordering::Release);
1346 }
1347 });
1348
1349 registry.resolve_without_result_authorized(
1350 &input_id,
1351 authority(
1352 RuntimeCompletionResultClass::CompletedWithoutResult,
1353 RuntimeCompletionObservedOutcome::CompletedWithoutResult,
1354 ),
1355 );
1356 match handle.wait_authorized().await {
1357 CompletionOutcome::CompletedWithoutResult => {}
1358 other => panic!("Expected CompletedWithoutResult, got {other:?}"),
1359 }
1360 assert!(observed.load(Ordering::Acquire));
1361 }
1362
1363 #[tokio::test]
1364 async fn multi_waiter_terminated_on_reset() {
1365 let mut registry = CompletionRegistry::new();
1366 let input_id = InputId::new();
1367 let h1 = registry.register(input_id.clone());
1368 let h2 = registry.register(input_id);
1369
1370 registry.resolve_all_runtime_terminated(
1371 "runtime reset",
1372 authority(
1373 RuntimeCompletionResultClass::RuntimeTerminated,
1374 RuntimeCompletionObservedOutcome::RuntimeTerminated,
1375 ),
1376 );
1377
1378 for handle in [h1, h2] {
1379 match handle.wait_authorized().await {
1380 CompletionOutcome::RuntimeTerminated { reason, .. } => {
1381 assert_eq!(reason, "runtime reset");
1382 }
1383 other => panic!("Expected RuntimeTerminated, got {other:?}"),
1384 }
1385 }
1386 }
1387
1388 #[tokio::test]
1389 async fn resolve_not_pending_keeps_pending_waiters() {
1390 let mut registry = CompletionRegistry::new();
1391 let keep_id = InputId::new();
1392 let drop_id = InputId::new();
1393
1394 let keep_handle = registry.register(keep_id.clone());
1395 let drop_handle = registry.register(drop_id.clone());
1396 registry.resolve_not_pending_runtime_terminated(
1397 |input_id| input_id == &keep_id,
1398 "runtime recycled",
1399 authority(
1400 RuntimeCompletionResultClass::RuntimeTerminated,
1401 RuntimeCompletionObservedOutcome::RuntimeTerminated,
1402 ),
1403 );
1404 assert_eq!(registry.debug_waiter_count(), 1);
1405
1406 match drop_handle.wait_authorized().await {
1407 CompletionOutcome::RuntimeTerminated { reason, .. } => {
1408 assert_eq!(reason, "runtime recycled");
1409 }
1410 other => panic!("Expected RuntimeTerminated, got {other:?}"),
1411 }
1412
1413 registry.resolve_without_result_authorized(
1414 &keep_id,
1415 authority(
1416 RuntimeCompletionResultClass::CompletedWithoutResult,
1417 RuntimeCompletionObservedOutcome::CompletedWithoutResult,
1418 ),
1419 );
1420 match keep_handle.wait_authorized().await {
1421 CompletionOutcome::CompletedWithoutResult => {}
1422 other => panic!("Expected CompletedWithoutResult, got {other:?}"),
1423 }
1424 }
1425
1426 #[tokio::test]
1427 async fn mismatched_not_pending_runtime_terminated_authority_fails_selected_waiters_closed() {
1428 let mut registry = CompletionRegistry::new();
1429 let keep_id = InputId::new();
1430 let drop_id = InputId::new();
1431
1432 let keep_handle = registry.register(keep_id.clone());
1433 let drop_handle = registry.register(drop_id);
1434 registry.resolve_not_pending_runtime_terminated(
1435 |input_id| input_id == &keep_id,
1436 "runtime recycled",
1437 authority(
1438 RuntimeCompletionResultClass::Completed,
1439 RuntimeCompletionObservedOutcome::Completed,
1440 ),
1441 );
1442
1443 assert_eq!(registry.debug_waiter_count(), 1);
1444 match drop_handle.try_wait().await {
1445 Err(CompletionWaitError::AuthorityUnavailable(reason)) => {
1446 assert!(reason.contains("Completed"));
1447 assert!(reason.contains("RuntimeTerminated"));
1448 }
1449 other => panic!("Expected authority mismatch wait error, got {other:?}"),
1450 }
1451
1452 registry.resolve_without_result_authorized(
1453 &keep_id,
1454 authority(
1455 RuntimeCompletionResultClass::CompletedWithoutResult,
1456 RuntimeCompletionObservedOutcome::CompletedWithoutResult,
1457 ),
1458 );
1459 match keep_handle.wait_authorized().await {
1460 CompletionOutcome::CompletedWithoutResult => {}
1461 other => panic!("Expected CompletedWithoutResult, got {other:?}"),
1462 }
1463 }
1464
1465 #[tokio::test]
1466 async fn resolve_without_result_nonexistent_is_a_noop() {
1467 let mut registry = CompletionRegistry::new();
1468 registry.resolve_without_result_authorized(
1469 &InputId::new(),
1470 authority(
1471 RuntimeCompletionResultClass::CompletedWithoutResult,
1472 RuntimeCompletionObservedOutcome::CompletedWithoutResult,
1473 ),
1474 );
1475 assert!(!registry.debug_has_waiters());
1476 }
1477
1478 #[test]
1479 fn abandoned_carries_typed_error_metadata() {
1480 let error = TurnErrorMetadata::runtime_apply_failure("apply blew up");
1481 let outcome = CompletionOutcome::Abandoned {
1482 reason: "abandoned".into(),
1483 error: error.clone(),
1484 };
1485 assert_eq!(outcome.abandoned_reason(), Some("abandoned"));
1486 assert_eq!(outcome.error_metadata(), Some(&error));
1487 }
1488
1489 #[test]
1490 fn runtime_terminated_carries_typed_error_metadata() {
1491 let outcome = CompletionOutcome::runtime_terminated("runtime stopped");
1492 match &outcome {
1493 CompletionOutcome::RuntimeTerminated { reason, .. } => {
1494 assert_eq!(reason, "runtime stopped");
1495 }
1496 other => panic!("Expected RuntimeTerminated, got {other:?}"),
1497 }
1498 let metadata = outcome
1499 .error_metadata()
1500 .expect("RuntimeTerminated must carry typed turn error metadata");
1501 assert_eq!(metadata.kind, TurnTerminalCauseKind::FatalFailure);
1502 assert_eq!(metadata.outcome, Some(TurnTerminalOutcome::Failed));
1503 assert!(metadata.terminal);
1504 assert_eq!(metadata.detail.as_deref(), Some("runtime stopped"));
1505 }
1506}