Skip to main content

meerkat_runtime/
completion.rs

1//! Input completion waiters — allows callers to await terminal outcome of an accepted input.
2//!
3//! When a surface accepts an input via the runtime, it can optionally receive a
4//! `CompletionHandle` that resolves when the input reaches a terminal state
5//! (Consumed or Abandoned). This bridges the async accept/await pattern needed
6//! for surfaces that want synchronous-feeling turn execution through the runtime.
7//!
8//! `CompletionRegistry` is waiter plumbing only. Production code must never
9//! treat waiter presence, waiter counts, or sender membership as semantic
10//! runtime truth.
11
12use std::collections::HashMap;
13use std::future::Future;
14
15use meerkat_core::lifecycle::InputId;
16#[cfg(test)]
17use meerkat_core::lifecycle::RunId;
18use meerkat_core::types::{RunResult, SessionId};
19use meerkat_core::{TurnErrorMetadata, TurnTerminalCauseKind, TurnTerminalOutcome};
20use serde_json::Value;
21
22use crate::meerkat_machine::driver::RuntimeCompletionResultAuthority;
23use crate::meerkat_machine::dsl::RuntimeCompletionResultClass;
24use crate::tokio::sync::oneshot;
25
26/// Mechanical failure while waiting for completion plumbing.
27///
28/// This is intentionally separate from [`CompletionOutcome`]: a closed waiter
29/// channel or missing generated completion authority is not a public runtime
30/// result class.
31#[derive(Debug, Clone, PartialEq, Eq, thiserror::Error)]
32pub enum CompletionWaitError {
33    #[error("completion channel closed without an authorized result")]
34    ChannelClosed,
35    #[error("{0}")]
36    AuthorityUnavailable(String),
37}
38
39impl CompletionWaitError {
40    pub fn wait_failure_observation(
41        &self,
42    ) -> crate::meerkat_machine::dsl::RuntimeCompletionWaitFailureObservation {
43        match self {
44            Self::ChannelClosed => {
45                crate::meerkat_machine::dsl::RuntimeCompletionWaitFailureObservation::ChannelClosed
46            }
47            Self::AuthorityUnavailable(_) => {
48                crate::meerkat_machine::dsl::RuntimeCompletionWaitFailureObservation::AuthorityUnavailable
49            }
50        }
51    }
52}
53
54/// Outcome delivered to a completion waiter.
55#[derive(Debug)]
56pub enum CompletionOutcome {
57    /// The input was successfully consumed and produced a result.
58    Completed(Box<RunResult>),
59    /// The input was consumed but produced no RunResult (e.g. context-append ops).
60    CompletedWithoutResult,
61    /// The input reached a callback boundary and requires external tool
62    /// fulfillment before the turn can continue.
63    CallbackPending { tool_name: String, args: Value },
64    /// The input reached the canonical cancellation terminal.
65    Cancelled,
66    /// The input was abandoned before completing, carrying typed failure
67    /// metadata so every surface sees the same structured turn error the
68    /// sibling [`AbandonedWithError`](Self::AbandonedWithError) carries.
69    Abandoned {
70        reason: String,
71        error: TurnErrorMetadata,
72    },
73    /// The input was abandoned before completing, with typed failure metadata.
74    AbandonedWithError {
75        reason: String,
76        error: TurnErrorMetadata,
77    },
78    /// The turn produced output, but a later runtime finalization step (the
79    /// durable commit) failed, so the run is NOT durably terminal. The produced
80    /// result is deliberately NOT carried on this outcome: a finalization
81    /// failure must be treated as failure by every surface, never surfaced as a
82    /// usable success result (that would be a false belief of success).
83    CompletedWithFinalizationFailure { error: TurnErrorMetadata },
84    /// The runtime was stopped or destroyed while the input was pending,
85    /// carrying typed failure metadata describing the termination cause.
86    RuntimeTerminated {
87        reason: String,
88        error: TurnErrorMetadata,
89    },
90}
91
92/// Runtime-minted observation for post-completion cleanup.
93///
94/// Cleanup code can inspect this generated-facing observation, but it cannot
95/// rewrite the public [`CompletionOutcome`] that the waiter received.
96#[derive(Debug, Clone, PartialEq, Eq)]
97pub struct CompletionCleanupObservation {
98    owner_session_id: SessionId,
99    owner_agent_runtime_id: Option<crate::meerkat_machine::dsl::AgentRuntimeId>,
100    owner_fence_token: Option<crate::meerkat_machine::dsl::FenceToken>,
101    owner_runtime_generation: Option<crate::meerkat_machine::dsl::Generation>,
102    owner_runtime_epoch_id: Option<crate::meerkat_machine::dsl::RuntimeEpochId>,
103    observed_outcome: crate::meerkat_machine::dsl::RuntimeCompletionObservedOutcome,
104}
105
106impl CompletionCleanupObservation {
107    fn from_authority(authority: RuntimeCompletionResultAuthority) -> Self {
108        Self {
109            owner_session_id: authority.session_id().clone(),
110            owner_agent_runtime_id: authority.agent_runtime_id().cloned(),
111            owner_fence_token: authority.fence_token(),
112            owner_runtime_generation: authority.runtime_generation(),
113            owner_runtime_epoch_id: authority.runtime_epoch_id().cloned(),
114            observed_outcome: authority.cleanup_observation(),
115        }
116    }
117
118    pub(crate) fn owner_session_id(&self) -> &SessionId {
119        &self.owner_session_id
120    }
121
122    pub(crate) fn owner_agent_runtime_id(
123        &self,
124    ) -> Option<&crate::meerkat_machine::dsl::AgentRuntimeId> {
125        self.owner_agent_runtime_id.as_ref()
126    }
127
128    pub(crate) fn owner_fence_token(&self) -> Option<crate::meerkat_machine::dsl::FenceToken> {
129        self.owner_fence_token
130    }
131
132    pub(crate) fn owner_runtime_generation(
133        &self,
134    ) -> Option<crate::meerkat_machine::dsl::Generation> {
135        self.owner_runtime_generation
136    }
137
138    pub(crate) fn owner_runtime_epoch_id(
139        &self,
140    ) -> Option<&crate::meerkat_machine::dsl::RuntimeEpochId> {
141        self.owner_runtime_epoch_id.as_ref()
142    }
143
144    pub(crate) fn observed_outcome(
145        &self,
146    ) -> crate::meerkat_machine::dsl::RuntimeCompletionObservedOutcome {
147        self.observed_outcome
148    }
149}
150
151/// Result carried on completion waiter plumbing after generated authority has
152/// selected both the public result class and cleanup observation.
153#[derive(Debug)]
154struct CompletionDelivery {
155    outcome: CompletionOutcome,
156    cleanup_observation: CompletionCleanupObservation,
157}
158
159/// Snapshot of one input's registered completion waiters.
160///
161/// This is a diagnostic/supporting-carrier view only. Waiter counts are never
162/// semantic runtime truth.
163#[derive(Debug, Clone, PartialEq, Eq)]
164pub struct CompletionWaiterEntrySnapshot {
165    pub input_id: InputId,
166    pub waiter_count: usize,
167}
168
169/// Diagnostic snapshot of the completion waiter registry.
170///
171/// This makes the carrier explicit for MeerkatMachine mapping work without
172/// promoting waiter plumbing into canonical runtime semantics.
173#[derive(Debug, Clone, PartialEq, Eq, Default)]
174pub struct CompletionRegistrySnapshot {
175    pub input_count: usize,
176    pub waiter_count: usize,
177    pub waiting_inputs: Vec<CompletionWaiterEntrySnapshot>,
178}
179
180/// Handle for awaiting the completion of an accepted input.
181#[derive(Debug)]
182pub struct CompletionHandle {
183    rx: oneshot::Receiver<Result<CompletionDelivery, CompletionWaitError>>,
184}
185
186impl CompletionHandle {
187    async fn try_wait_delivery(self) -> Result<CompletionDelivery, CompletionWaitError> {
188        self.rx
189            .await
190            .unwrap_or(Err(CompletionWaitError::ChannelClosed))
191    }
192
193    /// Wait for the input to reach a terminal state or report mechanical waiter failure.
194    pub async fn try_wait(self) -> Result<CompletionOutcome, CompletionWaitError> {
195        self.try_wait_delivery()
196            .await
197            .map(|delivery| delivery.outcome)
198    }
199
200    /// Wait for completion and return the generated cleanup observation carried
201    /// with the authorized public outcome.
202    pub async fn try_wait_with_cleanup_observation(
203        self,
204    ) -> Result<(CompletionOutcome, CompletionCleanupObservation), CompletionWaitError> {
205        let delivery = self.try_wait_delivery().await?;
206        Ok((delivery.outcome, delivery.cleanup_observation))
207    }
208
209    /// Wait for the input to reach a terminal state or report mechanical waiter failure.
210    pub async fn wait(self) -> Result<CompletionOutcome, CompletionWaitError> {
211        self.try_wait().await
212    }
213
214    /// Wait for a test handle that is expected to resolve through generated authority.
215    #[cfg(test)]
216    pub(crate) async fn wait_authorized(self) -> CompletionOutcome {
217        self.wait()
218            .await
219            .expect("completion waiter closed without an authorized result")
220    }
221
222    /// Relay completion through a cleanup future before resolving the returned
223    /// handle. This lets surfaces transfer cleanup ownership immediately after
224    /// accepting runtime work while still returning a completion handle.
225    pub fn with_cleanup<F, Fut>(self, cleanup: F) -> Self
226    where
227        F: FnOnce() -> Fut + Send + 'static,
228        Fut: Future<Output = ()> + Send + 'static,
229    {
230        let (tx, rx) = oneshot::channel();
231        crate::tokio::spawn(async move {
232            let outcome = self.try_wait_delivery().await;
233            cleanup().await;
234            let _ = tx.send(outcome);
235        });
236        Self { rx }
237    }
238
239    /// Relay completion through a cleanup future that can inspect the outcome.
240    ///
241    /// The cleanup future receives a runtime-minted cleanup observation and
242    /// cannot replace the completion result.
243    pub fn with_outcome_cleanup<F, Fut>(self, cleanup: F) -> Self
244    where
245        F: FnOnce(CompletionCleanupObservation) -> Fut + Send + 'static,
246        Fut: Future<Output = ()> + Send + 'static,
247    {
248        let (tx, rx) = oneshot::channel();
249        crate::tokio::spawn(async move {
250            let outcome = match self.try_wait_delivery().await {
251                Ok(delivery) => {
252                    cleanup(delivery.cleanup_observation.clone()).await;
253                    Ok(delivery)
254                }
255                Err(error) => Err(error),
256            };
257            let _ = tx.send(outcome);
258        });
259        Self { rx }
260    }
261
262    /// Relay completion through cleanup that can observe either generated
263    /// completion-cleanup evidence or a typed waiter failure.
264    pub fn with_completion_cleanup<F, Fut>(self, cleanup: F) -> Self
265    where
266        F: FnOnce(Result<CompletionCleanupObservation, CompletionWaitError>) -> Fut
267            + Send
268            + 'static,
269        Fut: Future<Output = ()> + Send + 'static,
270    {
271        let (tx, rx) = oneshot::channel();
272        crate::tokio::spawn(async move {
273            let outcome = self.try_wait_delivery().await;
274            match &outcome {
275                Ok(delivery) => cleanup(Ok(delivery.cleanup_observation.clone())).await,
276                Err(error) => cleanup(Err(error.clone())).await,
277            }
278            let _ = tx.send(outcome);
279        });
280        Self { rx }
281    }
282
283    #[cfg(test)]
284    fn already_resolved_internal(
285        outcome: CompletionOutcome,
286        authority: RuntimeCompletionResultAuthority,
287    ) -> Self {
288        let (tx, rx) = oneshot::channel();
289        let _ = tx.send(Ok(CompletionDelivery {
290            outcome,
291            cleanup_observation: CompletionCleanupObservation::from_authority(authority),
292        }));
293        Self { rx }
294    }
295
296    #[cfg(test)]
297    pub(crate) fn already_resolved_with_generated_class(
298        outcome: CompletionOutcome,
299        expected_class: crate::meerkat_machine::dsl::RuntimeCompletionResultClass,
300        terminal: crate::meerkat_machine::dsl::RuntimeCompletionTerminalObservation,
301        finalization: crate::meerkat_machine::dsl::RuntimeCompletionFinalizationObservation,
302    ) -> Result<Self, crate::RuntimeDriverError> {
303        let run_id = if terminal
304            == crate::meerkat_machine::dsl::RuntimeCompletionTerminalObservation::RuntimeTerminated
305        {
306            None
307        } else {
308            Some(RunId::new())
309        };
310        let authority =
311            crate::meerkat_machine::driver::machine_resolve_pre_resolved_runtime_completion_result(
312                run_id.as_ref(),
313                terminal,
314                finalization,
315            )?;
316        if !authority.allows(expected_class) {
317            return Err(crate::RuntimeDriverError::Internal(format!(
318                "generated runtime completion authority returned {:?}, expected {expected_class:?}",
319                authority.class()
320            )));
321        }
322        Ok(Self::already_resolved_internal(outcome, authority))
323    }
324
325    #[cfg(test)]
326    pub(crate) fn already_completed_without_result() -> Result<Self, crate::RuntimeDriverError> {
327        Self::already_resolved_with_generated_class(
328            CompletionOutcome::CompletedWithoutResult,
329            crate::meerkat_machine::dsl::RuntimeCompletionResultClass::CompletedWithoutResult,
330            crate::meerkat_machine::dsl::RuntimeCompletionTerminalObservation::NoResult,
331            crate::meerkat_machine::dsl::RuntimeCompletionFinalizationObservation::Succeeded,
332        )
333    }
334
335    #[cfg(test)]
336    pub(crate) fn already_runtime_apply_failed(
337        reason: String,
338        error: TurnErrorMetadata,
339    ) -> Result<Self, crate::RuntimeDriverError> {
340        Self::already_resolved_with_generated_class(
341            CompletionOutcome::AbandonedWithError { reason, error },
342            crate::meerkat_machine::dsl::RuntimeCompletionResultClass::AbandonedWithError,
343            crate::meerkat_machine::dsl::RuntimeCompletionTerminalObservation::NoResult,
344            crate::meerkat_machine::dsl::RuntimeCompletionFinalizationObservation::Failed,
345        )
346    }
347
348    #[cfg(test)]
349    pub(crate) fn already_runtime_terminated(
350        reason: String,
351    ) -> Result<Self, crate::RuntimeDriverError> {
352        Self::already_resolved_with_generated_class(
353            CompletionOutcome::runtime_terminated(&reason),
354            crate::meerkat_machine::dsl::RuntimeCompletionResultClass::RuntimeTerminated,
355            crate::meerkat_machine::dsl::RuntimeCompletionTerminalObservation::RuntimeTerminated,
356            crate::meerkat_machine::dsl::RuntimeCompletionFinalizationObservation::Succeeded,
357        )
358    }
359
360    #[cfg(test)]
361    pub(crate) fn already_callback_pending(
362        tool_name: String,
363        args: Value,
364    ) -> Result<Self, crate::RuntimeDriverError> {
365        Self::already_resolved_with_generated_class(
366            CompletionOutcome::CallbackPending { tool_name, args },
367            crate::meerkat_machine::dsl::RuntimeCompletionResultClass::CallbackPending,
368            crate::meerkat_machine::dsl::RuntimeCompletionTerminalObservation::CallbackPending,
369            crate::meerkat_machine::dsl::RuntimeCompletionFinalizationObservation::Succeeded,
370        )
371    }
372}
373
374impl CompletionOutcome {
375    /// Mint a [`RuntimeTerminated`](Self::RuntimeTerminated) outcome from a
376    /// termination reason, attaching the typed terminal failure metadata every
377    /// surface keys off (runtime stop/destroy is a fatal terminal boundary).
378    fn runtime_terminated(reason: &str) -> Self {
379        Self::RuntimeTerminated {
380            reason: reason.to_string(),
381            error: TurnErrorMetadata::terminal(
382                TurnTerminalCauseKind::FatalFailure,
383                TurnTerminalOutcome::Failed,
384                reason,
385            ),
386        }
387    }
388
389    pub fn abandoned_reason(&self) -> Option<&str> {
390        match self {
391            Self::Abandoned { reason, .. } | Self::AbandonedWithError { reason, .. } => {
392                Some(reason)
393            }
394            _ => None,
395        }
396    }
397
398    pub fn error_metadata(&self) -> Option<&TurnErrorMetadata> {
399        match self {
400            Self::Abandoned { error, .. }
401            | Self::AbandonedWithError { error, .. }
402            | Self::CompletedWithFinalizationFailure { error, .. }
403            | Self::RuntimeTerminated { error, .. } => Some(error),
404            _ => None,
405        }
406    }
407}
408
409/// Registry of pending completion waiters, keyed by InputId.
410///
411/// Uses `Vec<Sender>` per InputId to support multiple waiters for the same input
412/// (e.g. dedup of in-flight input registers a second waiter for the same InputId).
413#[derive(Default)]
414pub(crate) struct CompletionRegistry {
415    waiters:
416        HashMap<InputId, Vec<oneshot::Sender<Result<CompletionDelivery, CompletionWaitError>>>>,
417}
418
419impl CompletionRegistry {
420    pub(crate) fn new() -> Self {
421        Self::default()
422    }
423
424    fn take_waiters(
425        &mut self,
426        input_id: &InputId,
427    ) -> Option<Vec<oneshot::Sender<Result<CompletionDelivery, CompletionWaitError>>>> {
428        self.waiters.remove(input_id)
429    }
430
431    fn send_outcome(
432        senders: Vec<oneshot::Sender<Result<CompletionDelivery, CompletionWaitError>>>,
433        outcome: CompletionOutcome,
434        cleanup_observation: CompletionCleanupObservation,
435    ) {
436        for tx in senders {
437            let outcome = match &outcome {
438                CompletionOutcome::Completed(result) => {
439                    CompletionOutcome::Completed(Box::new(result.as_ref().clone()))
440                }
441                CompletionOutcome::CompletedWithoutResult => {
442                    CompletionOutcome::CompletedWithoutResult
443                }
444                CompletionOutcome::CallbackPending { tool_name, args } => {
445                    CompletionOutcome::CallbackPending {
446                        tool_name: tool_name.clone(),
447                        args: args.clone(),
448                    }
449                }
450                CompletionOutcome::Cancelled => CompletionOutcome::Cancelled,
451                CompletionOutcome::Abandoned { reason, error } => CompletionOutcome::Abandoned {
452                    reason: reason.clone(),
453                    error: error.clone(),
454                },
455                CompletionOutcome::AbandonedWithError { reason, error } => {
456                    CompletionOutcome::AbandonedWithError {
457                        reason: reason.clone(),
458                        error: error.clone(),
459                    }
460                }
461                CompletionOutcome::CompletedWithFinalizationFailure { error } => {
462                    CompletionOutcome::CompletedWithFinalizationFailure {
463                        error: error.clone(),
464                    }
465                }
466                CompletionOutcome::RuntimeTerminated { reason, error } => {
467                    CompletionOutcome::RuntimeTerminated {
468                        reason: reason.clone(),
469                        error: error.clone(),
470                    }
471                }
472            };
473            let _ = tx.send(Ok(CompletionDelivery {
474                outcome,
475                cleanup_observation: cleanup_observation.clone(),
476            }));
477        }
478    }
479
480    fn send_error(
481        senders: Vec<oneshot::Sender<Result<CompletionDelivery, CompletionWaitError>>>,
482        error: CompletionWaitError,
483    ) {
484        for tx in senders {
485            let _ = tx.send(Err(error.clone()));
486        }
487    }
488
489    fn authority_mismatch_error(
490        authority: &RuntimeCompletionResultAuthority,
491        expected: RuntimeCompletionResultClass,
492    ) -> CompletionWaitError {
493        CompletionWaitError::AuthorityUnavailable(format!(
494            "generated runtime completion authority returned {:?}, expected {expected:?}",
495            authority.class()
496        ))
497    }
498
499    fn fail_input_authority_mismatch(
500        &mut self,
501        input_id: &InputId,
502        authority: &RuntimeCompletionResultAuthority,
503        expected: RuntimeCompletionResultClass,
504    ) {
505        if let Some(senders) = self.take_waiters(input_id) {
506            Self::send_error(senders, Self::authority_mismatch_error(authority, expected));
507        }
508    }
509
510    /// Register a waiter for an input. Returns the handle the caller will await.
511    ///
512    /// Multiple waiters can be registered for the same InputId — all will be
513    /// resolved when the input reaches a terminal state.
514    pub(crate) fn register(&mut self, input_id: InputId) -> CompletionHandle {
515        let (tx, rx) = oneshot::channel();
516        self.waiters.entry(input_id).or_default().push(tx);
517        CompletionHandle { rx }
518    }
519
520    /// Resolve all waiters for a completed input.
521    fn resolve_completed(
522        &mut self,
523        input_id: &InputId,
524        result: RunResult,
525        cleanup_observation: CompletionCleanupObservation,
526    ) {
527        if let Some(senders) = self.take_waiters(input_id) {
528            Self::send_outcome(
529                senders,
530                CompletionOutcome::Completed(Box::new(result)),
531                cleanup_observation,
532            );
533        }
534    }
535
536    pub(crate) fn resolve_completed_authorized(
537        &mut self,
538        input_id: &InputId,
539        result: RunResult,
540        authority: RuntimeCompletionResultAuthority,
541    ) {
542        let expected = RuntimeCompletionResultClass::Completed;
543        if !authority.allows(expected) {
544            self.fail_input_authority_mismatch(input_id, &authority, expected);
545            return;
546        }
547        self.resolve_completed(
548            input_id,
549            result,
550            CompletionCleanupObservation::from_authority(authority),
551        );
552    }
553
554    /// Resolve all waiters for an input that completed without producing a RunResult.
555    fn resolve_without_result(
556        &mut self,
557        input_id: &InputId,
558        cleanup_observation: CompletionCleanupObservation,
559    ) {
560        if let Some(senders) = self.take_waiters(input_id) {
561            Self::send_outcome(
562                senders,
563                CompletionOutcome::CompletedWithoutResult,
564                cleanup_observation,
565            );
566        }
567    }
568
569    pub(crate) fn resolve_without_result_authorized(
570        &mut self,
571        input_id: &InputId,
572        authority: RuntimeCompletionResultAuthority,
573    ) {
574        let expected = RuntimeCompletionResultClass::CompletedWithoutResult;
575        if !authority.allows(expected) {
576            self.fail_input_authority_mismatch(input_id, &authority, expected);
577            return;
578        }
579        self.resolve_without_result(
580            input_id,
581            CompletionCleanupObservation::from_authority(authority),
582        );
583    }
584
585    /// Resolve all waiters for an input that reached a callback boundary.
586    fn resolve_callback_pending(
587        &mut self,
588        input_id: &InputId,
589        tool_name: String,
590        args: Value,
591        cleanup_observation: CompletionCleanupObservation,
592    ) {
593        if let Some(senders) = self.take_waiters(input_id) {
594            Self::send_outcome(
595                senders,
596                CompletionOutcome::CallbackPending { tool_name, args },
597                cleanup_observation,
598            );
599        }
600    }
601
602    pub(crate) fn resolve_callback_pending_authorized(
603        &mut self,
604        input_id: &InputId,
605        tool_name: String,
606        args: Value,
607        authority: RuntimeCompletionResultAuthority,
608    ) {
609        let expected = RuntimeCompletionResultClass::CallbackPending;
610        if !authority.allows(expected) {
611            self.fail_input_authority_mismatch(input_id, &authority, expected);
612            return;
613        }
614        self.resolve_callback_pending(
615            input_id,
616            tool_name,
617            args,
618            CompletionCleanupObservation::from_authority(authority),
619        );
620    }
621
622    /// Resolve all waiters for an input that reached the cancellation terminal.
623    fn resolve_cancelled(
624        &mut self,
625        input_id: &InputId,
626        cleanup_observation: CompletionCleanupObservation,
627    ) {
628        if let Some(senders) = self.take_waiters(input_id) {
629            Self::send_outcome(senders, CompletionOutcome::Cancelled, cleanup_observation);
630        }
631    }
632
633    pub(crate) fn resolve_cancelled_authorized(
634        &mut self,
635        input_id: &InputId,
636        authority: RuntimeCompletionResultAuthority,
637    ) {
638        let expected = RuntimeCompletionResultClass::Cancelled;
639        if !authority.allows(expected) {
640            self.fail_input_authority_mismatch(input_id, &authority, expected);
641            return;
642        }
643        self.resolve_cancelled(
644            input_id,
645            CompletionCleanupObservation::from_authority(authority),
646        );
647    }
648
649    /// Resolve all waiters for an abandoned input with typed failure metadata.
650    fn resolve_abandoned_with_error(
651        &mut self,
652        input_id: &InputId,
653        reason: String,
654        error: TurnErrorMetadata,
655        cleanup_observation: CompletionCleanupObservation,
656    ) {
657        if let Some(senders) = self.take_waiters(input_id) {
658            Self::send_outcome(
659                senders,
660                CompletionOutcome::AbandonedWithError { reason, error },
661                cleanup_observation,
662            );
663        }
664    }
665
666    pub(crate) fn resolve_abandoned_with_error_authorized(
667        &mut self,
668        input_id: &InputId,
669        reason: String,
670        error: TurnErrorMetadata,
671        authority: RuntimeCompletionResultAuthority,
672    ) {
673        let expected = RuntimeCompletionResultClass::AbandonedWithError;
674        if !authority.allows(expected) {
675            self.fail_input_authority_mismatch(input_id, &authority, expected);
676            return;
677        }
678        self.resolve_abandoned_with_error(
679            input_id,
680            reason,
681            error,
682            CompletionCleanupObservation::from_authority(authority),
683        );
684    }
685
686    /// Resolve all waiters for a turn whose output exists but finalization
687    /// failed after output production.
688    fn resolve_completed_with_finalization_failure(
689        &mut self,
690        input_id: &InputId,
691        error: TurnErrorMetadata,
692        cleanup_observation: CompletionCleanupObservation,
693    ) {
694        if let Some(senders) = self.take_waiters(input_id) {
695            Self::send_outcome(
696                senders,
697                CompletionOutcome::CompletedWithFinalizationFailure { error },
698                cleanup_observation,
699            );
700        }
701    }
702
703    pub(crate) fn resolve_completed_with_finalization_failure_authorized(
704        &mut self,
705        input_id: &InputId,
706        error: TurnErrorMetadata,
707        authority: RuntimeCompletionResultAuthority,
708    ) {
709        let expected = RuntimeCompletionResultClass::CompletedWithFinalizationFailure;
710        if !authority.allows(expected) {
711            self.fail_input_authority_mismatch(input_id, &authority, expected);
712            return;
713        }
714        self.resolve_completed_with_finalization_failure(
715            input_id,
716            error,
717            CompletionCleanupObservation::from_authority(authority),
718        );
719    }
720
721    /// Resolve all pending waiters with a termination error.
722    ///
723    /// The public termination result class is supplied by generated
724    /// MeerkatMachine authority; this registry method only fans the authorized
725    /// class out to waiter channels.
726    pub(crate) fn resolve_all_runtime_terminated(
727        &mut self,
728        reason: &str,
729        authority: RuntimeCompletionResultAuthority,
730    ) {
731        let expected = RuntimeCompletionResultClass::RuntimeTerminated;
732        if !authority.allows(expected) {
733            let error = Self::authority_mismatch_error(&authority, expected);
734            self.fail_all_waiters(error);
735            return;
736        }
737        let cleanup_observation = CompletionCleanupObservation::from_authority(authority);
738        for (_, senders) in self.waiters.drain() {
739            Self::send_outcome(
740                senders,
741                CompletionOutcome::runtime_terminated(reason),
742                cleanup_observation.clone(),
743            );
744        }
745    }
746
747    pub(crate) fn resolve_inputs_runtime_terminated<I>(
748        &mut self,
749        input_ids: I,
750        reason: &str,
751        authority: RuntimeCompletionResultAuthority,
752    ) where
753        I: IntoIterator<Item = InputId>,
754    {
755        let input_ids: Vec<InputId> = input_ids.into_iter().collect();
756        let expected = RuntimeCompletionResultClass::RuntimeTerminated;
757        if !authority.allows(expected) {
758            let error = Self::authority_mismatch_error(&authority, expected);
759            self.fail_inputs(input_ids, error);
760            return;
761        }
762        let cleanup_observation = CompletionCleanupObservation::from_authority(authority);
763        for input_id in input_ids {
764            if let Some(senders) = self.take_waiters(&input_id) {
765                Self::send_outcome(
766                    senders,
767                    CompletionOutcome::runtime_terminated(reason),
768                    cleanup_observation.clone(),
769                );
770            }
771        }
772    }
773
774    pub(crate) fn fail_all_waiters(&mut self, error: CompletionWaitError) {
775        for (_, senders) in self.waiters.drain() {
776            Self::send_error(senders, error.clone());
777        }
778    }
779
780    pub(crate) fn fail_inputs<I>(&mut self, input_ids: I, error: CompletionWaitError)
781    where
782        I: IntoIterator<Item = InputId>,
783    {
784        for input_id in input_ids {
785            if let Some(senders) = self.take_waiters(&input_id) {
786                Self::send_error(senders, error.clone());
787            }
788        }
789    }
790
791    /// Resolve waiters whose input IDs are no longer pending after a
792    /// lifecycle reconciliation (for example runtime recycle/recovery).
793    pub(crate) fn resolve_not_pending_runtime_terminated<F>(
794        &mut self,
795        mut is_still_pending: F,
796        reason: &str,
797        authority: RuntimeCompletionResultAuthority,
798    ) where
799        F: FnMut(&InputId) -> bool,
800    {
801        let expected = RuntimeCompletionResultClass::RuntimeTerminated;
802        if !authority.allows(expected) {
803            let error = Self::authority_mismatch_error(&authority, expected);
804            self.waiters.retain(|input_id, senders| {
805                if is_still_pending(input_id) {
806                    return true;
807                }
808
809                Self::send_error(std::mem::take(senders), error.clone());
810                false
811            });
812            return;
813        }
814        let cleanup_observation = CompletionCleanupObservation::from_authority(authority);
815        self.waiters.retain(|input_id, senders| {
816            if is_still_pending(input_id) {
817                return true;
818            }
819
820            Self::send_outcome(
821                std::mem::take(senders),
822                CompletionOutcome::runtime_terminated(reason),
823                cleanup_observation.clone(),
824            );
825            false
826        });
827    }
828
829    /// Snapshot the current waiter carrier without mutating it.
830    pub(crate) fn diagnostic_snapshot(&self) -> CompletionRegistrySnapshot {
831        let mut waiting_inputs: Vec<_> = self
832            .waiters
833            .iter()
834            .map(|(input_id, senders)| CompletionWaiterEntrySnapshot {
835                input_id: input_id.clone(),
836                waiter_count: senders.len(),
837            })
838            .collect();
839        waiting_inputs
840            .sort_by(|left, right| left.input_id.to_string().cmp(&right.input_id.to_string()));
841
842        CompletionRegistrySnapshot {
843            input_count: waiting_inputs.len(),
844            waiter_count: waiting_inputs.iter().map(|entry| entry.waiter_count).sum(),
845            waiting_inputs,
846        }
847    }
848
849    /// Check if there are any pending waiters.
850    ///
851    /// Test-only introspection. Production code must treat the registry as
852    /// waiter plumbing rather than semantic runtime truth.
853    #[cfg(test)]
854    pub fn debug_has_waiters(&self) -> bool {
855        !self.waiters.is_empty()
856    }
857
858    /// Number of pending waiters (total across all InputIds).
859    ///
860    /// Test-only introspection. Production code must treat the registry as
861    /// waiter plumbing rather than semantic runtime truth.
862    #[cfg(test)]
863    pub fn debug_waiter_count(&self) -> usize {
864        self.waiters.values().map(Vec::len).sum()
865    }
866}
867
868#[cfg(test)]
869#[allow(clippy::unwrap_used, clippy::panic)]
870mod tests {
871    use super::*;
872    use crate::meerkat_machine::dsl::{
873        RuntimeCompletionObservedOutcome, RuntimeCompletionResultClass,
874    };
875    use meerkat_core::types::{SessionId, Usage};
876
877    fn make_run_result() -> RunResult {
878        RunResult {
879            text: "hello".into(),
880            session_id: SessionId::new(),
881            usage: Usage::default(),
882            turns: 1,
883            tool_calls: 0,
884            terminal_cause_kind: None,
885            structured_output: None,
886            extraction_error: None,
887            schema_warnings: None,
888            skill_diagnostics: None,
889        }
890    }
891
892    fn authority(
893        result_class: RuntimeCompletionResultClass,
894        cleanup_observation: RuntimeCompletionObservedOutcome,
895    ) -> RuntimeCompletionResultAuthority {
896        crate::meerkat_machine::driver::test_runtime_completion_authority(
897            result_class,
898            cleanup_observation,
899        )
900    }
901
902    #[tokio::test]
903    async fn register_and_complete() {
904        let mut registry = CompletionRegistry::new();
905        let input_id = InputId::new();
906        let handle = registry.register(input_id.clone());
907
908        assert!(registry.debug_has_waiters());
909        assert_eq!(registry.debug_waiter_count(), 1);
910
911        let result = make_run_result();
912        registry.resolve_completed_authorized(
913            &input_id,
914            result,
915            authority(
916                RuntimeCompletionResultClass::Completed,
917                RuntimeCompletionObservedOutcome::Completed,
918            ),
919        );
920
921        match handle.wait_authorized().await {
922            CompletionOutcome::Completed(r) => assert_eq!(r.text, "hello"),
923            other => panic!("Expected Completed, got {other:?}"),
924        }
925    }
926
927    #[tokio::test]
928    async fn register_and_fail_waiter() {
929        let mut registry = CompletionRegistry::new();
930        let input_id = InputId::new();
931        let handle = registry.register(input_id.clone());
932
933        registry.fail_inputs(
934            [input_id],
935            CompletionWaitError::AuthorityUnavailable("retired".into()),
936        );
937
938        match handle.try_wait().await {
939            Err(CompletionWaitError::AuthorityUnavailable(reason)) => assert_eq!(reason, "retired"),
940            other => panic!("Expected wait error, got {other:?}"),
941        }
942    }
943
944    #[tokio::test]
945    async fn mismatched_result_authority_fails_waiter_closed() {
946        let mut registry = CompletionRegistry::new();
947        let input_id = InputId::new();
948        let handle = registry.register(input_id.clone());
949
950        registry.resolve_completed_authorized(
951            &input_id,
952            make_run_result(),
953            authority(
954                RuntimeCompletionResultClass::Cancelled,
955                RuntimeCompletionObservedOutcome::Cancelled,
956            ),
957        );
958
959        assert!(!registry.debug_has_waiters());
960        match handle.try_wait().await {
961            Err(CompletionWaitError::AuthorityUnavailable(reason)) => {
962                assert!(reason.contains("Cancelled"));
963                assert!(reason.contains("Completed"));
964            }
965            other => panic!("Expected authority mismatch wait error, got {other:?}"),
966        }
967    }
968
969    #[tokio::test]
970    async fn resolve_all_runtime_terminated() {
971        let mut registry = CompletionRegistry::new();
972        let h1 = registry.register(InputId::new());
973        let h2 = registry.register(InputId::new());
974
975        registry.resolve_all_runtime_terminated(
976            "runtime stopped",
977            authority(
978                RuntimeCompletionResultClass::RuntimeTerminated,
979                RuntimeCompletionObservedOutcome::RuntimeTerminated,
980            ),
981        );
982
983        assert!(!registry.debug_has_waiters());
984
985        match h1.wait_authorized().await {
986            CompletionOutcome::RuntimeTerminated { reason, .. } => {
987                assert_eq!(reason, "runtime stopped");
988            }
989            other => panic!("Expected RuntimeTerminated, got {other:?}"),
990        }
991        match h2.wait_authorized().await {
992            CompletionOutcome::RuntimeTerminated { reason, .. } => {
993                assert_eq!(reason, "runtime stopped");
994            }
995            other => panic!("Expected RuntimeTerminated, got {other:?}"),
996        }
997    }
998
999    #[tokio::test]
1000    async fn mismatched_runtime_terminated_authority_fails_all_waiters_closed() {
1001        let mut registry = CompletionRegistry::new();
1002        let h1 = registry.register(InputId::new());
1003        let h2 = registry.register(InputId::new());
1004
1005        registry.resolve_all_runtime_terminated(
1006            "runtime stopped",
1007            authority(
1008                RuntimeCompletionResultClass::CompletedWithoutResult,
1009                RuntimeCompletionObservedOutcome::CompletedWithoutResult,
1010            ),
1011        );
1012
1013        assert!(!registry.debug_has_waiters());
1014        for handle in [h1, h2] {
1015            match handle.try_wait().await {
1016                Err(CompletionWaitError::AuthorityUnavailable(reason)) => {
1017                    assert!(reason.contains("CompletedWithoutResult"));
1018                    assert!(reason.contains("RuntimeTerminated"));
1019                }
1020                other => panic!("Expected authority mismatch wait error, got {other:?}"),
1021            }
1022        }
1023    }
1024
1025    #[tokio::test]
1026    async fn cleanup_rejects_observation_from_another_session() {
1027        let adapter = crate::meerkat_machine::MeerkatMachine::ephemeral();
1028        let source_session_id = SessionId::new();
1029        let target_session_id = SessionId::new();
1030        adapter
1031            .prepare_bindings(source_session_id.clone())
1032            .await
1033            .expect("source session should prepare runtime bindings");
1034        adapter
1035            .prepare_bindings(target_session_id.clone())
1036            .await
1037            .expect("target session should prepare runtime bindings");
1038
1039        let input = crate::Input::Prompt(crate::PromptInput::new(
1040            "source session pending completion",
1041            None,
1042        ));
1043        let (_outcome, handle) = adapter
1044            .accept_input_with_completion(&source_session_id, input)
1045            .await
1046            .expect("source input should be accepted");
1047        let handle = handle.expect("source input should have a completion waiter");
1048        adapter
1049            .stop_runtime_executor(&source_session_id, "source stopped")
1050            .await
1051            .expect("source stop should resolve waiter");
1052        let (_outcome, observation) = handle
1053            .try_wait_with_cleanup_observation()
1054            .await
1055            .expect("waiter should resolve with generated cleanup observation");
1056
1057        assert_eq!(observation.owner_session_id(), &source_session_id);
1058        let err = adapter
1059            .resolve_runtime_completion_cleanup(
1060                &target_session_id,
1061                observation,
1062                false,
1063                crate::meerkat_machine::dsl::RuntimeCompletionLiveSessionObservation::Absent,
1064            )
1065            .await
1066            .expect_err("cleanup must reject an observation minted for another session");
1067        assert!(
1068            matches!(err, crate::RuntimeDriverError::ValidationFailed { .. }),
1069            "expected generated cleanup validation failure, got {err:?}"
1070        );
1071    }
1072
1073    #[tokio::test]
1074    async fn cleanup_rejects_stale_same_session_observation_after_rebinding() {
1075        let adapter = crate::meerkat_machine::MeerkatMachine::ephemeral();
1076        let session_id = SessionId::new();
1077        adapter
1078            .prepare_bindings(session_id.clone())
1079            .await
1080            .expect("session should prepare initial runtime bindings");
1081
1082        let input = crate::Input::Prompt(crate::PromptInput::new(
1083            "same session pending completion",
1084            None,
1085        ));
1086        let (_outcome, handle) = adapter
1087            .accept_input_with_completion(&session_id, input)
1088            .await
1089            .expect("input should be accepted");
1090        let handle = handle.expect("input should have a completion waiter");
1091        adapter
1092            .stop_runtime_executor(&session_id, "first runtime stopped")
1093            .await
1094            .expect("stop should resolve waiter");
1095        let (_outcome, stale_observation) = handle
1096            .try_wait_with_cleanup_observation()
1097            .await
1098            .expect("waiter should resolve with generated cleanup observation");
1099
1100        adapter.unregister_session(&session_id).await;
1101        adapter
1102            .prepare_bindings(session_id.clone())
1103            .await
1104            .expect("session should prepare replacement runtime bindings");
1105
1106        let err = adapter
1107            .resolve_runtime_completion_cleanup(
1108                &session_id,
1109                stale_observation,
1110                false,
1111                crate::meerkat_machine::dsl::RuntimeCompletionLiveSessionObservation::Absent,
1112            )
1113            .await
1114            .expect_err("cleanup must reject an observation minted for a prior runtime binding");
1115        assert!(
1116            matches!(err, crate::RuntimeDriverError::ValidationFailed { .. }),
1117            "expected generated cleanup validation failure, got {err:?}"
1118        );
1119    }
1120
1121    #[tokio::test]
1122    async fn wait_failure_authority_releases_pre_admission_and_classifies_public_reason() {
1123        let adapter = crate::meerkat_machine::MeerkatMachine::ephemeral();
1124        let session_id = SessionId::new();
1125        adapter
1126            .prepare_bindings(session_id.clone())
1127            .await
1128            .expect("session should prepare runtime bindings");
1129
1130        let authority = adapter
1131            .resolve_runtime_completion_wait_failure(
1132                &session_id,
1133                &CompletionWaitError::AuthorityUnavailable("missing generated result".into()),
1134            )
1135            .await
1136            .expect("wait-failure authority should resolve");
1137
1138        assert!(authority.releases_pre_admission());
1139        assert_eq!(
1140            authority.public_error_class,
1141            crate::meerkat_machine::dsl::RuntimeCompletionWaitFailurePublicErrorClass::InternalError
1142        );
1143        assert_eq!(
1144            authority.public_reason,
1145            crate::meerkat_machine::dsl::RuntimeCompletionWaitFailurePublicReason::CompletionAuthorityUnavailable
1146        );
1147        assert!(!authority.resumable);
1148    }
1149
1150    #[tokio::test]
1151    async fn wait_failure_authority_rejects_missing_session_authority() {
1152        let adapter = crate::meerkat_machine::MeerkatMachine::ephemeral();
1153        let session_id = SessionId::new();
1154
1155        let err = adapter
1156            .resolve_runtime_completion_wait_failure(
1157                &session_id,
1158                &CompletionWaitError::ChannelClosed,
1159            )
1160            .await
1161            .expect_err("wait-failure authority must fail closed without a session authority");
1162
1163        assert!(
1164            matches!(err, crate::RuntimeDriverError::ValidationFailed { .. }),
1165            "expected generated wait-failure validation failure, got {err:?}"
1166        );
1167    }
1168
1169    #[tokio::test]
1170    async fn resolve_nonexistent_is_a_noop() {
1171        let mut registry = CompletionRegistry::new();
1172        registry.resolve_completed_authorized(
1173            &InputId::new(),
1174            make_run_result(),
1175            authority(
1176                RuntimeCompletionResultClass::Completed,
1177                RuntimeCompletionObservedOutcome::Completed,
1178            ),
1179        );
1180        registry.fail_inputs(
1181            [InputId::new()],
1182            CompletionWaitError::AuthorityUnavailable("gone".into()),
1183        );
1184        assert!(!registry.debug_has_waiters());
1185    }
1186
1187    #[tokio::test]
1188    async fn dropped_sender_gives_wait_error() {
1189        let mut registry = CompletionRegistry::new();
1190        let input_id = InputId::new();
1191        let handle = registry.register(input_id);
1192
1193        // Drop the registry (and thus the sender)
1194        drop(registry);
1195
1196        assert!(matches!(
1197            handle.try_wait().await,
1198            Err(CompletionWaitError::ChannelClosed)
1199        ));
1200    }
1201
1202    #[tokio::test]
1203    async fn multi_waiter_all_receive_result() {
1204        let mut registry = CompletionRegistry::new();
1205        let input_id = InputId::new();
1206
1207        let h1 = registry.register(input_id.clone());
1208        let h2 = registry.register(input_id.clone());
1209        let h3 = registry.register(input_id.clone());
1210
1211        assert_eq!(registry.debug_waiter_count(), 3);
1212
1213        let result = make_run_result();
1214        registry.resolve_completed_authorized(
1215            &input_id,
1216            result,
1217            authority(
1218                RuntimeCompletionResultClass::Completed,
1219                RuntimeCompletionObservedOutcome::Completed,
1220            ),
1221        );
1222
1223        assert!(!registry.debug_has_waiters());
1224
1225        for handle in [h1, h2, h3] {
1226            match handle.wait_authorized().await {
1227                CompletionOutcome::Completed(r) => assert_eq!(r.text, "hello"),
1228                other => panic!("Expected Completed, got {other:?}"),
1229            }
1230        }
1231    }
1232
1233    #[tokio::test]
1234    async fn resolve_without_result_sends_variant() {
1235        let mut registry = CompletionRegistry::new();
1236        let input_id = InputId::new();
1237        let handle = registry.register(input_id.clone());
1238
1239        registry.resolve_without_result_authorized(
1240            &input_id,
1241            authority(
1242                RuntimeCompletionResultClass::CompletedWithoutResult,
1243                RuntimeCompletionObservedOutcome::CompletedWithoutResult,
1244            ),
1245        );
1246
1247        match handle.wait_authorized().await {
1248            CompletionOutcome::CompletedWithoutResult => {}
1249            other => panic!("Expected CompletedWithoutResult, got {other:?}"),
1250        }
1251    }
1252
1253    #[tokio::test]
1254    async fn resolve_without_result_multi_waiter() {
1255        let mut registry = CompletionRegistry::new();
1256        let input_id = InputId::new();
1257        let h1 = registry.register(input_id.clone());
1258        let h2 = registry.register(input_id.clone());
1259
1260        registry.resolve_without_result_authorized(
1261            &input_id,
1262            authority(
1263                RuntimeCompletionResultClass::CompletedWithoutResult,
1264                RuntimeCompletionObservedOutcome::CompletedWithoutResult,
1265            ),
1266        );
1267
1268        for handle in [h1, h2] {
1269            match handle.wait_authorized().await {
1270                CompletionOutcome::CompletedWithoutResult => {}
1271                other => panic!("Expected CompletedWithoutResult, got {other:?}"),
1272            }
1273        }
1274    }
1275
1276    #[tokio::test]
1277    async fn resolve_callback_pending_sends_variant() {
1278        let mut registry = CompletionRegistry::new();
1279        let input_id = InputId::new();
1280        let handle = registry.register(input_id.clone());
1281
1282        registry.resolve_callback_pending_authorized(
1283            &input_id,
1284            "browser".to_string(),
1285            serde_json::json!({ "url": "https://example.com" }),
1286            authority(
1287                RuntimeCompletionResultClass::CallbackPending,
1288                RuntimeCompletionObservedOutcome::CallbackPending,
1289            ),
1290        );
1291
1292        match handle.wait_authorized().await {
1293            CompletionOutcome::CallbackPending { tool_name, args } => {
1294                assert_eq!(tool_name, "browser");
1295                assert_eq!(args, serde_json::json!({ "url": "https://example.com" }));
1296            }
1297            other => panic!("Expected CallbackPending, got {other:?}"),
1298        }
1299    }
1300
1301    #[tokio::test]
1302    async fn resolve_cancelled_sends_variant() {
1303        let mut registry = CompletionRegistry::new();
1304        let input_id = InputId::new();
1305        let handle = registry.register(input_id.clone());
1306
1307        registry.resolve_cancelled_authorized(
1308            &input_id,
1309            authority(
1310                RuntimeCompletionResultClass::Cancelled,
1311                RuntimeCompletionObservedOutcome::Cancelled,
1312            ),
1313        );
1314
1315        match handle.wait_authorized().await {
1316            CompletionOutcome::Cancelled => {}
1317            other => panic!("Expected Cancelled, got {other:?}"),
1318        }
1319    }
1320
1321    #[tokio::test]
1322    async fn already_resolved_handle() {
1323        let handle = CompletionHandle::already_completed_without_result()
1324            .expect("generated completion authority should classify no-result completion");
1325        match handle.wait_authorized().await {
1326            CompletionOutcome::CompletedWithoutResult => {}
1327            other => panic!("Expected CompletedWithoutResult, got {other:?}"),
1328        }
1329    }
1330
1331    #[tokio::test]
1332    async fn outcome_cleanup_observes_and_relays_result() {
1333        use std::sync::Arc;
1334        use std::sync::atomic::{AtomicBool, Ordering};
1335
1336        let mut registry = CompletionRegistry::new();
1337        let input_id = InputId::new();
1338        let handle = registry.register(input_id.clone());
1339        let observed = Arc::new(AtomicBool::new(false));
1340        let cleanup_observed = Arc::clone(&observed);
1341        let handle = handle.with_outcome_cleanup(move |observation| async move {
1342            if observation.observed_outcome()
1343                == crate::meerkat_machine::dsl::RuntimeCompletionObservedOutcome::CompletedWithoutResult
1344            {
1345                cleanup_observed.store(true, Ordering::Release);
1346            }
1347        });
1348
1349        registry.resolve_without_result_authorized(
1350            &input_id,
1351            authority(
1352                RuntimeCompletionResultClass::CompletedWithoutResult,
1353                RuntimeCompletionObservedOutcome::CompletedWithoutResult,
1354            ),
1355        );
1356        match handle.wait_authorized().await {
1357            CompletionOutcome::CompletedWithoutResult => {}
1358            other => panic!("Expected CompletedWithoutResult, got {other:?}"),
1359        }
1360        assert!(observed.load(Ordering::Acquire));
1361    }
1362
1363    #[tokio::test]
1364    async fn multi_waiter_terminated_on_reset() {
1365        let mut registry = CompletionRegistry::new();
1366        let input_id = InputId::new();
1367        let h1 = registry.register(input_id.clone());
1368        let h2 = registry.register(input_id);
1369
1370        registry.resolve_all_runtime_terminated(
1371            "runtime reset",
1372            authority(
1373                RuntimeCompletionResultClass::RuntimeTerminated,
1374                RuntimeCompletionObservedOutcome::RuntimeTerminated,
1375            ),
1376        );
1377
1378        for handle in [h1, h2] {
1379            match handle.wait_authorized().await {
1380                CompletionOutcome::RuntimeTerminated { reason, .. } => {
1381                    assert_eq!(reason, "runtime reset");
1382                }
1383                other => panic!("Expected RuntimeTerminated, got {other:?}"),
1384            }
1385        }
1386    }
1387
1388    #[tokio::test]
1389    async fn resolve_not_pending_keeps_pending_waiters() {
1390        let mut registry = CompletionRegistry::new();
1391        let keep_id = InputId::new();
1392        let drop_id = InputId::new();
1393
1394        let keep_handle = registry.register(keep_id.clone());
1395        let drop_handle = registry.register(drop_id.clone());
1396        registry.resolve_not_pending_runtime_terminated(
1397            |input_id| input_id == &keep_id,
1398            "runtime recycled",
1399            authority(
1400                RuntimeCompletionResultClass::RuntimeTerminated,
1401                RuntimeCompletionObservedOutcome::RuntimeTerminated,
1402            ),
1403        );
1404        assert_eq!(registry.debug_waiter_count(), 1);
1405
1406        match drop_handle.wait_authorized().await {
1407            CompletionOutcome::RuntimeTerminated { reason, .. } => {
1408                assert_eq!(reason, "runtime recycled");
1409            }
1410            other => panic!("Expected RuntimeTerminated, got {other:?}"),
1411        }
1412
1413        registry.resolve_without_result_authorized(
1414            &keep_id,
1415            authority(
1416                RuntimeCompletionResultClass::CompletedWithoutResult,
1417                RuntimeCompletionObservedOutcome::CompletedWithoutResult,
1418            ),
1419        );
1420        match keep_handle.wait_authorized().await {
1421            CompletionOutcome::CompletedWithoutResult => {}
1422            other => panic!("Expected CompletedWithoutResult, got {other:?}"),
1423        }
1424    }
1425
1426    #[tokio::test]
1427    async fn mismatched_not_pending_runtime_terminated_authority_fails_selected_waiters_closed() {
1428        let mut registry = CompletionRegistry::new();
1429        let keep_id = InputId::new();
1430        let drop_id = InputId::new();
1431
1432        let keep_handle = registry.register(keep_id.clone());
1433        let drop_handle = registry.register(drop_id);
1434        registry.resolve_not_pending_runtime_terminated(
1435            |input_id| input_id == &keep_id,
1436            "runtime recycled",
1437            authority(
1438                RuntimeCompletionResultClass::Completed,
1439                RuntimeCompletionObservedOutcome::Completed,
1440            ),
1441        );
1442
1443        assert_eq!(registry.debug_waiter_count(), 1);
1444        match drop_handle.try_wait().await {
1445            Err(CompletionWaitError::AuthorityUnavailable(reason)) => {
1446                assert!(reason.contains("Completed"));
1447                assert!(reason.contains("RuntimeTerminated"));
1448            }
1449            other => panic!("Expected authority mismatch wait error, got {other:?}"),
1450        }
1451
1452        registry.resolve_without_result_authorized(
1453            &keep_id,
1454            authority(
1455                RuntimeCompletionResultClass::CompletedWithoutResult,
1456                RuntimeCompletionObservedOutcome::CompletedWithoutResult,
1457            ),
1458        );
1459        match keep_handle.wait_authorized().await {
1460            CompletionOutcome::CompletedWithoutResult => {}
1461            other => panic!("Expected CompletedWithoutResult, got {other:?}"),
1462        }
1463    }
1464
1465    #[tokio::test]
1466    async fn resolve_without_result_nonexistent_is_a_noop() {
1467        let mut registry = CompletionRegistry::new();
1468        registry.resolve_without_result_authorized(
1469            &InputId::new(),
1470            authority(
1471                RuntimeCompletionResultClass::CompletedWithoutResult,
1472                RuntimeCompletionObservedOutcome::CompletedWithoutResult,
1473            ),
1474        );
1475        assert!(!registry.debug_has_waiters());
1476    }
1477
1478    #[test]
1479    fn abandoned_carries_typed_error_metadata() {
1480        let error = TurnErrorMetadata::runtime_apply_failure("apply blew up");
1481        let outcome = CompletionOutcome::Abandoned {
1482            reason: "abandoned".into(),
1483            error: error.clone(),
1484        };
1485        assert_eq!(outcome.abandoned_reason(), Some("abandoned"));
1486        assert_eq!(outcome.error_metadata(), Some(&error));
1487    }
1488
1489    #[test]
1490    fn runtime_terminated_carries_typed_error_metadata() {
1491        let outcome = CompletionOutcome::runtime_terminated("runtime stopped");
1492        match &outcome {
1493            CompletionOutcome::RuntimeTerminated { reason, .. } => {
1494                assert_eq!(reason, "runtime stopped");
1495            }
1496            other => panic!("Expected RuntimeTerminated, got {other:?}"),
1497        }
1498        let metadata = outcome
1499            .error_metadata()
1500            .expect("RuntimeTerminated must carry typed turn error metadata");
1501        assert_eq!(metadata.kind, TurnTerminalCauseKind::FatalFailure);
1502        assert_eq!(metadata.outcome, Some(TurnTerminalOutcome::Failed));
1503        assert!(metadata.terminal);
1504        assert_eq!(metadata.detail.as_deref(), Some("runtime stopped"));
1505    }
1506}