Skip to main content

meerkat_runtime/meerkat_machine/
traits.rs

1use super::*;
2use crate::input_state::StoredInputState;
3
4#[cfg_attr(not(target_arch = "wasm32"), async_trait::async_trait)]
5#[cfg_attr(target_arch = "wasm32", async_trait::async_trait(?Send))]
6impl SessionServiceRuntimeExt for MeerkatMachine {
7    async fn accept_input(
8        &self,
9        session_id: &SessionId,
10        input: Input,
11    ) -> Result<AcceptOutcome, RuntimeDriverError> {
12        match self
13            .execute_meerkat_machine_command(
14                None,
15                MeerkatMachineCommand::AcceptWithCompletion {
16                    session_id: session_id.clone(),
17                    input,
18                    register_completion: false,
19                },
20            )
21            .await
22            .map_err(MeerkatMachine::driver_error_from_command_error)?
23        {
24            MeerkatMachineCommandResult::AcceptWithCompletion {
25                outcome,
26                handle: _,
27                admission_signal: _,
28            } => Ok(outcome),
29            other => Err(RuntimeDriverError::Internal(format!(
30                "unexpected MeerkatMachineCommandResult for SessionServiceRuntimeExt::accept_input: {other:?}"
31            ))),
32        }
33    }
34
35    async fn accept_input_with_completion(
36        &self,
37        session_id: &SessionId,
38        input: Input,
39    ) -> Result<(AcceptOutcome, Option<crate::completion::CompletionHandle>), RuntimeDriverError>
40    {
41        tracing::debug!(
42            session_id = %session_id,
43            input_id = %input.id(),
44            "SessionServiceRuntimeExt::accept_input_with_completion entered"
45        );
46        self.accept_input_with_completion_boxed(session_id, input)
47            .await
48    }
49
50    async fn runtime_state(
51        &self,
52        session_id: &SessionId,
53    ) -> Result<RuntimeState, RuntimeDriverError> {
54        let runtime_id = MeerkatMachine::logical_runtime_id(session_id);
55        match self
56            .execute_meerkat_machine_command(
57                None,
58                MeerkatMachineCommand::RuntimeState { runtime_id },
59            )
60            .await
61            .map_err(MeerkatMachine::driver_error_from_command_error)?
62        {
63            MeerkatMachineCommandResult::RuntimeState(state) => Ok(state),
64            other => Err(RuntimeDriverError::Internal(format!(
65                "unexpected MeerkatMachineCommandResult for SessionServiceRuntimeExt::runtime_state: {other:?}"
66            ))),
67        }
68    }
69
70    async fn retire_runtime(
71        &self,
72        session_id: &SessionId,
73    ) -> Result<RetireReport, RuntimeDriverError> {
74        let runtime_id = MeerkatMachine::logical_runtime_id(session_id);
75        match self
76            .execute_meerkat_machine_command(None, MeerkatMachineCommand::Retire { runtime_id })
77            .await
78            .map_err(MeerkatMachine::driver_error_from_command_error)?
79        {
80            MeerkatMachineCommandResult::RetireReport(report) => Ok(report),
81            other => Err(RuntimeDriverError::Internal(format!(
82                "unexpected MeerkatMachineCommandResult for SessionServiceRuntimeExt::retire_runtime: {other:?}"
83            ))),
84        }
85    }
86
87    async fn reset_runtime(
88        &self,
89        session_id: &SessionId,
90    ) -> Result<ResetReport, RuntimeDriverError> {
91        let runtime_id = MeerkatMachine::logical_runtime_id(session_id);
92        match self
93            .execute_meerkat_machine_command(None, MeerkatMachineCommand::Reset { runtime_id })
94            .await
95            .map_err(MeerkatMachine::driver_error_from_command_error)?
96        {
97            MeerkatMachineCommandResult::ResetReport(report) => Ok(report),
98            other => Err(RuntimeDriverError::Internal(format!(
99                "unexpected MeerkatMachineCommandResult for SessionServiceRuntimeExt::reset_runtime: {other:?}"
100            ))),
101        }
102    }
103
104    async fn input_state(
105        &self,
106        session_id: &SessionId,
107        input_id: &InputId,
108    ) -> Result<Option<StoredInputState>, RuntimeDriverError> {
109        match self
110            .execute_meerkat_machine_command(
111                None,
112                MeerkatMachineCommand::InputState {
113                    session_id: session_id.clone(),
114                    input_id: input_id.clone(),
115                },
116            )
117            .await
118            .map_err(MeerkatMachine::driver_error_from_command_error)?
119        {
120            MeerkatMachineCommandResult::InputState(state) => Ok(state),
121            other => Err(RuntimeDriverError::Internal(format!(
122                "unexpected MeerkatMachineCommandResult for SessionServiceRuntimeExt::input_state: {other:?}"
123            ))),
124        }
125    }
126
127    async fn list_active_inputs(
128        &self,
129        session_id: &SessionId,
130    ) -> Result<Vec<InputId>, RuntimeDriverError> {
131        match self
132            .execute_meerkat_machine_command(
133                None,
134                MeerkatMachineCommand::ListActiveInputs {
135                    session_id: session_id.clone(),
136                },
137            )
138            .await
139            .map_err(MeerkatMachine::driver_error_from_command_error)?
140        {
141            MeerkatMachineCommandResult::ActiveInputs(inputs) => Ok(inputs),
142            other => Err(RuntimeDriverError::Internal(format!(
143                "unexpected MeerkatMachineCommandResult for SessionServiceRuntimeExt::list_active_inputs: {other:?}"
144            ))),
145        }
146    }
147
148    async fn reconfigure_session_llm_identity(
149        &self,
150        session_id: &SessionId,
151        request: SessionLlmReconfigureRequest,
152    ) -> Result<SessionLlmReconfigureReport, RuntimeDriverError> {
153        let command = self
154            .prepare_reconfigure_session_llm_command(session_id, request)
155            .await?;
156        match self
157            .execute_meerkat_machine_command(None, command)
158            .await
159            .map_err(MeerkatMachine::driver_error_from_command_error)?
160        {
161            MeerkatMachineCommandResult::LlmReconfigured(report) => Ok(report),
162            other => Err(RuntimeDriverError::Internal(format!(
163                "unexpected MeerkatMachineCommandResult for SessionServiceRuntimeExt::reconfigure_session_llm_identity: {other:?}"
164            ))),
165        }
166    }
167
168    async fn resolved_session_llm_capabilities(
169        &self,
170        session_id: &SessionId,
171    ) -> Result<Option<SessionLlmCapabilitySurface>, RuntimeDriverError> {
172        match self
173            .execute_meerkat_machine_command(
174                None,
175                MeerkatMachineCommand::ResolvedSessionLlmCapabilities {
176                    session_id: session_id.clone(),
177                },
178            )
179            .await
180            .map_err(MeerkatMachine::driver_error_from_command_error)?
181        {
182            MeerkatMachineCommandResult::ResolvedSessionLlmCapabilities(capabilities) => {
183                Ok(capabilities)
184            }
185            other => Err(RuntimeDriverError::Internal(format!(
186                "unexpected MeerkatMachineCommandResult for SessionServiceRuntimeExt::resolved_session_llm_capabilities: {other:?}"
187            ))),
188        }
189    }
190
191    async fn configure_model_routing_baseline(
192        &self,
193        session_id: &SessionId,
194        baseline_model: meerkat_core::lifecycle::run_primitive::ModelId,
195        realtime_capable: bool,
196    ) -> Result<(), RuntimeDriverError> {
197        match self
198            .execute_meerkat_machine_command(
199                None,
200                MeerkatMachineCommand::ConfigureModelRoutingBaseline {
201                    session_id: session_id.clone(),
202                    baseline_model,
203                    realtime_capable,
204                },
205            )
206            .await
207            .map_err(MeerkatMachine::driver_error_from_command_error)?
208        {
209            MeerkatMachineCommandResult::Unit => Ok(()),
210            other => Err(RuntimeDriverError::Internal(format!(
211                "unexpected MeerkatMachineCommandResult for SessionServiceRuntimeExt::configure_model_routing_baseline: {other:?}"
212            ))),
213        }
214    }
215
216    async fn session_model_routing_status(
217        &self,
218        session_id: &SessionId,
219    ) -> Result<meerkat_core::image_generation::SessionModelRoutingStatus, RuntimeDriverError> {
220        match self
221            .execute_meerkat_machine_command(
222                None,
223                MeerkatMachineCommand::SessionModelRoutingStatus {
224                    session_id: session_id.clone(),
225                },
226            )
227            .await
228            .map_err(MeerkatMachine::driver_error_from_command_error)?
229        {
230            MeerkatMachineCommandResult::SessionModelRoutingStatus(status) => Ok(status),
231            other => Err(RuntimeDriverError::Internal(format!(
232                "unexpected MeerkatMachineCommandResult for SessionServiceRuntimeExt::session_model_routing_status: {other:?}"
233            ))),
234        }
235    }
236
237    async fn request_switch_turn(
238        &self,
239        session_id: &SessionId,
240        request: crate::meerkat_machine_types::SwitchTurnRequest,
241    ) -> Result<meerkat_core::image_generation::SwitchTurnControlResult, RuntimeDriverError> {
242        match self
243            .execute_meerkat_machine_command(
244                None,
245                MeerkatMachineCommand::RequestSwitchTurn {
246                    session_id: session_id.clone(),
247                    request: Box::new(request),
248                },
249            )
250            .await
251            .map_err(MeerkatMachine::driver_error_from_command_error)?
252        {
253            MeerkatMachineCommandResult::SwitchTurnControlResult(result) => Ok(result),
254            other => Err(RuntimeDriverError::Internal(format!(
255                "unexpected MeerkatMachineCommandResult for SessionServiceRuntimeExt::request_switch_turn: {other:?}"
256            ))),
257        }
258    }
259
260    async fn admit_model_routing_assistant_turn(
261        &self,
262        session_id: &SessionId,
263    ) -> Result<(), RuntimeDriverError> {
264        match self
265            .execute_meerkat_machine_command(
266                None,
267                MeerkatMachineCommand::AdmitModelRoutingAssistantTurn {
268                    session_id: session_id.clone(),
269                },
270            )
271            .await
272            .map_err(MeerkatMachine::driver_error_from_command_error)?
273        {
274            MeerkatMachineCommandResult::Unit => Ok(()),
275            other => Err(RuntimeDriverError::Internal(format!(
276                "unexpected MeerkatMachineCommandResult for SessionServiceRuntimeExt::admit_model_routing_assistant_turn: {other:?}"
277            ))),
278        }
279    }
280
281    async fn begin_image_operation(
282        &self,
283        session_id: &SessionId,
284        request: crate::meerkat_machine_types::ImageOperationRoutingRequest,
285    ) -> Result<crate::meerkat_machine_types::ImageOperationRoutingResult, RuntimeDriverError> {
286        match self
287            .execute_meerkat_machine_command(
288                None,
289                MeerkatMachineCommand::BeginImageOperation {
290                    session_id: session_id.clone(),
291                    request: Box::new(request),
292                },
293            )
294            .await
295            .map_err(MeerkatMachine::driver_error_from_command_error)?
296        {
297            MeerkatMachineCommandResult::ImageOperationRoutingResult(result) => Ok(result),
298            other => Err(RuntimeDriverError::Internal(format!(
299                "unexpected MeerkatMachineCommandResult for SessionServiceRuntimeExt::begin_image_operation: {other:?}"
300            ))),
301        }
302    }
303
304    async fn deny_image_operation_plan(
305        &self,
306        session_id: &SessionId,
307        operation_id: meerkat_core::image_generation::ImageOperationId,
308        reason: meerkat_core::image_generation::ImageOperationDenialReason,
309    ) -> Result<meerkat_core::image_generation::ImageOperationPhase, RuntimeDriverError> {
310        match self
311            .execute_meerkat_machine_command(
312                None,
313                MeerkatMachineCommand::DenyImageOperationPlan {
314                    session_id: session_id.clone(),
315                    operation_id,
316                    reason,
317                },
318            )
319            .await
320            .map_err(MeerkatMachine::driver_error_from_command_error)?
321        {
322            MeerkatMachineCommandResult::ImageOperationPhase(phase) => Ok(phase),
323            other => Err(RuntimeDriverError::Internal(format!(
324                "unexpected MeerkatMachineCommandResult for SessionServiceRuntimeExt::deny_image_operation_plan: {other:?}"
325            ))),
326        }
327    }
328
329    async fn activate_image_operation_override(
330        &self,
331        session_id: &SessionId,
332        operation_id: meerkat_core::image_generation::ImageOperationId,
333    ) -> Result<meerkat_core::image_generation::ImageOperationPhase, RuntimeDriverError> {
334        match self
335            .execute_meerkat_machine_command(
336                None,
337                MeerkatMachineCommand::ActivateImageOperationOverride {
338                    session_id: session_id.clone(),
339                    operation_id,
340                },
341            )
342            .await
343            .map_err(MeerkatMachine::driver_error_from_command_error)?
344        {
345            MeerkatMachineCommandResult::ImageOperationPhase(phase) => Ok(phase),
346            other => Err(RuntimeDriverError::Internal(format!(
347                "unexpected MeerkatMachineCommandResult for SessionServiceRuntimeExt::activate_image_operation_override: {other:?}"
348            ))),
349        }
350    }
351
352    async fn complete_image_operation(
353        &self,
354        session_id: &SessionId,
355        operation_id: meerkat_core::image_generation::ImageOperationId,
356        terminal: meerkat_core::image_generation::ImageOperationTerminalClass,
357    ) -> Result<meerkat_core::image_generation::ImageOperationPhase, RuntimeDriverError> {
358        match self
359            .execute_meerkat_machine_command(
360                None,
361                MeerkatMachineCommand::CompleteImageOperation {
362                    session_id: session_id.clone(),
363                    operation_id,
364                    terminal,
365                },
366            )
367            .await
368            .map_err(MeerkatMachine::driver_error_from_command_error)?
369        {
370            MeerkatMachineCommandResult::ImageOperationPhase(phase) => Ok(phase),
371            other => Err(RuntimeDriverError::Internal(format!(
372                "unexpected MeerkatMachineCommandResult for SessionServiceRuntimeExt::complete_image_operation: {other:?}"
373            ))),
374        }
375    }
376
377    async fn classify_image_operation_terminal(
378        &self,
379        session_id: &SessionId,
380        operation_id: meerkat_core::image_generation::ImageOperationId,
381        observation: meerkat_core::image_generation::ImageProviderTerminalObservation,
382        provider_text: meerkat_core::image_generation::ProviderTextDisposition,
383    ) -> Result<meerkat_core::image_generation::ImageOperationTerminalClass, RuntimeDriverError>
384    {
385        match self
386            .execute_meerkat_machine_command(
387                None,
388                MeerkatMachineCommand::ClassifyImageOperationTerminal {
389                    session_id: session_id.clone(),
390                    operation_id,
391                    observation,
392                    provider_text,
393                },
394            )
395            .await
396            .map_err(MeerkatMachine::driver_error_from_command_error)?
397        {
398            MeerkatMachineCommandResult::ImageOperationTerminalClass(terminal) => Ok(terminal),
399            other => Err(RuntimeDriverError::Internal(format!(
400                "unexpected MeerkatMachineCommandResult for SessionServiceRuntimeExt::classify_image_operation_terminal: {other:?}"
401            ))),
402        }
403    }
404
405    async fn restore_image_operation_override(
406        &self,
407        session_id: &SessionId,
408        operation_id: meerkat_core::image_generation::ImageOperationId,
409    ) -> Result<meerkat_core::image_generation::ImageOperationPhase, RuntimeDriverError> {
410        match self
411            .execute_meerkat_machine_command(
412                None,
413                MeerkatMachineCommand::RestoreImageOperationOverride {
414                    session_id: session_id.clone(),
415                    operation_id,
416                },
417            )
418            .await
419            .map_err(MeerkatMachine::driver_error_from_command_error)?
420        {
421            MeerkatMachineCommandResult::ImageOperationPhase(phase) => Ok(phase),
422            other => Err(RuntimeDriverError::Internal(format!(
423                "unexpected MeerkatMachineCommandResult for SessionServiceRuntimeExt::restore_image_operation_override: {other:?}"
424            ))),
425        }
426    }
427}
428
429// ---------------------------------------------------------------------------
430// RuntimeControlPlane implementation
431// ---------------------------------------------------------------------------
432
433impl MeerkatMachine {
434    pub(crate) fn logical_runtime_id(session_id: &SessionId) -> LogicalRuntimeId {
435        LogicalRuntimeId::for_session(session_id)
436    }
437
438    pub(super) fn post_admission_signal_from_effects(
439        effects: &[crate::meerkat_machine::dsl::MeerkatMachineEffect],
440    ) -> crate::driver::ephemeral::PostAdmissionSignal {
441        effects
442            .iter()
443            .find_map(|effect| match effect {
444                crate::meerkat_machine::dsl::MeerkatMachineEffect::PostAdmissionSignal {
445                    signal,
446                } => Some(match signal {
447                    crate::meerkat_machine::dsl::PostAdmissionSignalKind::WakeLoop => {
448                        crate::driver::ephemeral::PostAdmissionSignal::WakeLoop
449                    }
450                    crate::meerkat_machine::dsl::PostAdmissionSignalKind::InterruptYielding => {
451                        crate::driver::ephemeral::PostAdmissionSignal::InterruptYielding
452                    }
453                    crate::meerkat_machine::dsl::PostAdmissionSignalKind::RequestImmediateProcessing => {
454                        crate::driver::ephemeral::PostAdmissionSignal::RequestImmediateProcessing
455                    }
456                }),
457                _ => None,
458            })
459            .unwrap_or(crate::driver::ephemeral::PostAdmissionSignal::None)
460    }
461
462    pub(super) fn driver_error_from_command_error(
463        err: MeerkatMachineCommandError,
464    ) -> RuntimeDriverError {
465        match err {
466            MeerkatMachineCommandError::Driver(err) => err,
467            MeerkatMachineCommandError::Control(err) => {
468                Self::driver_error_from_control_plane_error(err)
469            }
470        }
471    }
472
473    pub(super) fn control_plane_error_from_command_error(
474        err: MeerkatMachineCommandError,
475    ) -> RuntimeControlPlaneError {
476        match err {
477            MeerkatMachineCommandError::Control(err) => err,
478            MeerkatMachineCommandError::Driver(err) => {
479                RuntimeControlPlaneError::Internal(err.to_string())
480            }
481        }
482    }
483
484    pub(super) fn driver_error_from_control_plane_error(
485        err: RuntimeControlPlaneError,
486    ) -> RuntimeDriverError {
487        match err {
488            RuntimeControlPlaneError::NotFound(runtime_id) => {
489                RuntimeDriverError::NotFound { runtime_id }
490            }
491            RuntimeControlPlaneError::InvalidState { state } => {
492                RuntimeDriverError::NotReady { state }
493            }
494            RuntimeControlPlaneError::StoreError(message)
495            | RuntimeControlPlaneError::Internal(message) => RuntimeDriverError::Internal(message),
496        }
497    }
498
499    /// Resolve a LogicalRuntimeId to a registered SessionId for internal lookup.
500    pub(super) async fn resolve_session_id(
501        &self,
502        runtime_id: &LogicalRuntimeId,
503    ) -> Result<SessionId, RuntimeControlPlaneError> {
504        let sessions = self.sessions.read().await;
505        sessions
506            .iter()
507            .find_map(|(session_id, entry)| {
508                (&entry.runtime_id == runtime_id).then(|| session_id.clone())
509            })
510            .ok_or_else(|| RuntimeControlPlaneError::NotFound(runtime_id.clone()))
511    }
512
513    pub(super) async fn existing_session_runtime_state(
514        &self,
515        session_id: &SessionId,
516    ) -> Option<RuntimeState> {
517        let sessions = self.sessions.read().await;
518        let entry = sessions.get(session_id)?;
519        // DSL remains the transition authority for live, non-terminal states.
520        // Persistent drivers use the published control projection as the
521        // visibility barrier when DSL has crossed a run-return or terminal
522        // lifecycle boundary before the durable commit has published it.
523        let control = entry.control_snapshot();
524        let authority = entry
525            .dsl_authority
526            .lock()
527            .unwrap_or_else(std::sync::PoisonError::into_inner);
528        let dsl_phase = dsl_authority::runtime_phase_from_authority(&authority);
529        let dsl_pre_run_phase = dsl_authority::pre_run_phase_from_authority(&authority);
530        // The visible-phase arbitration verdict is machine-owned: mirror the
531        // generated `selected_raw_phase` (the chosen phase without the
532        // visibility rewrite). The classifier is total over the pure
533        // observations, so a failure is structurally unreachable; if it ever
534        // arises we fail closed to the most-terminal phase rather than re-derive
535        // a disposition in the shell.
536        match crate::meerkat_machine::resolve_visible_runtime_phase(
537            dsl_phase,
538            dsl_pre_run_phase,
539            control.phase,
540            control.pre_run_phase,
541            self.has_runtime_persistence(),
542        ) {
543            Ok(plan) => Some(plan.selected_raw_phase),
544            Err(reason) => {
545                tracing::error!(%session_id, %reason, "MeerkatMachine visible runtime phase resolution failed; failing closed to Destroyed");
546                Some(RuntimeState::Destroyed)
547            }
548        }
549    }
550
551    pub(super) async fn existing_session_visible_runtime_state(
552        &self,
553        session_id: &SessionId,
554    ) -> Option<RuntimeState> {
555        let sessions = self.sessions.read().await;
556        let entry = sessions.get(session_id)?;
557        let control = entry.control_snapshot();
558        let authority = entry
559            .dsl_authority
560            .lock()
561            .unwrap_or_else(std::sync::PoisonError::into_inner);
562        let dsl_phase = dsl_authority::runtime_phase_from_authority(&authority);
563        let dsl_pre_run_phase = dsl_authority::pre_run_phase_from_authority(&authority);
564        // Mirror the machine-owned `visible_phase` verdict (the externally-
565        // visible phase after the Running+pre_run(Retired)->Retired rewrite).
566        // The classifier is total; a failure is structurally unreachable and
567        // fails closed to the most-terminal phase rather than re-deriving in the
568        // shell.
569        match crate::meerkat_machine::resolve_visible_runtime_phase(
570            dsl_phase,
571            dsl_pre_run_phase,
572            control.phase,
573            control.pre_run_phase,
574            self.has_runtime_persistence(),
575        ) {
576            Ok(plan) => Some(plan.visible_phase),
577            Err(reason) => {
578                tracing::error!(%session_id, %reason, "MeerkatMachine visible runtime phase resolution failed; failing closed to Destroyed");
579                Some(RuntimeState::Destroyed)
580            }
581        }
582    }
583
584    /// Look up the session entry for a runtime ID, returning a control-plane error
585    /// if not found.
586    pub(super) async fn lookup_entry(
587        &self,
588        runtime_id: &LogicalRuntimeId,
589    ) -> Result<
590        (
591            SessionId,
592            SharedDriver,
593            SharedCompletionRegistry,
594            Option<mpsc::Sender<()>>,
595        ),
596        RuntimeControlPlaneError,
597    > {
598        let sessions = self.sessions.read().await;
599        let (session_id, entry) = sessions
600            .iter()
601            .find(|(_, entry)| &entry.runtime_id == runtime_id)
602            .ok_or_else(|| RuntimeControlPlaneError::NotFound(runtime_id.clone()))?;
603        Ok((
604            session_id.clone(),
605            entry.driver.clone(),
606            entry.completions.clone(),
607            entry.wake_sender(),
608        ))
609    }
610
611    pub async fn retire_runtime_control_plane(
612        &self,
613        runtime_id: &LogicalRuntimeId,
614    ) -> Result<RetireReport, RuntimeControlPlaneError> {
615        tracing::info!(
616            runtime_id = %runtime_id,
617            "MeerkatMachine::retire_runtime_control_plane start"
618        );
619        let (session_id, driver, completions, wake_tx) = self.lookup_entry(runtime_id).await?;
620        let gate = self.session_mutation_gate(&session_id).await;
621        let _gate_guard = match gate {
622            Some(ref gate) => Some(gate.lock().await),
623            None => None,
624        };
625
626        let staged_dsl = self
627            .stage_session_dsl_transition(
628                &session_id,
629                crate::meerkat_machine::dsl::MeerkatMachineInput::Retire {
630                    session_id: crate::meerkat_machine::dsl::SessionId::from_domain(&session_id),
631                },
632                "Retire",
633            )
634            .await
635            .map_err(RuntimeControlPlaneError::Internal)?;
636
637        let mut drv = driver.lock().await;
638        let mut report = match Box::pin(machine_retire(&mut drv)).await {
639            Ok(report) => report,
640            Err(err) => {
641                drv.sync_control_projection_from_dsl_authority();
642                return Err(RuntimeControlPlaneError::Internal(err.to_string()));
643            }
644        };
645        drop(drv);
646
647        let mut commit_error = None;
648        if let Err(reason) = self
649            .commit_session_dsl_transition_preserving_committed_state(
650                &session_id,
651                staged_dsl,
652                "Retire",
653            )
654            .await
655        {
656            driver
657                .lock()
658                .await
659                .sync_control_projection_from_dsl_authority();
660            commit_error = Some(reason);
661        }
662
663        if report.inputs_pending_drain > 0 {
664            if let Some(ref tx) = wake_tx
665                && tx.send(()).await.is_ok()
666            {
667                if let Some(reason) = commit_error {
668                    return Err(RuntimeControlPlaneError::Internal(reason));
669                }
670                return Ok(report);
671            }
672
673            let mut drv = driver.lock().await;
674            let abandoned = drv
675                .abandon_pending_inputs(crate::input_state::InputAbandonReason::Retired)
676                .await
677                .map_err(|err| RuntimeControlPlaneError::Internal(err.to_string()))?;
678            drop(drv);
679            let result_class =
680                crate::meerkat_machine::driver::machine_resolve_runtime_terminated_completion_result(
681                    &driver,
682                )
683                .await
684                .map_err(|err| RuntimeControlPlaneError::Internal(err.to_string()))?;
685            let mut comp = completions.lock().await;
686            comp.resolve_all_runtime_terminated("retired without runtime loop", result_class);
687            report.inputs_abandoned += abandoned;
688            report.inputs_pending_drain = 0;
689        }
690        if let Some(reason) = commit_error {
691            return Err(RuntimeControlPlaneError::Internal(reason));
692        }
693        Ok(report)
694    }
695}
696
697#[cfg_attr(not(target_arch = "wasm32"), async_trait::async_trait)]
698#[cfg_attr(target_arch = "wasm32", async_trait::async_trait(?Send))]
699impl crate::traits::RuntimeControlPlane for MeerkatMachine {
700    async fn ingest(
701        &self,
702        runtime_id: &LogicalRuntimeId,
703        input: Input,
704    ) -> Result<AcceptOutcome, RuntimeControlPlaneError> {
705        match self
706            .execute_meerkat_machine_command(
707                None,
708                MeerkatMachineCommand::Ingest {
709                    runtime_id: runtime_id.clone(),
710                    input,
711                },
712            )
713            .await
714            .map_err(MeerkatMachine::control_plane_error_from_command_error)?
715        {
716            MeerkatMachineCommandResult::AcceptOutcome(outcome) => Ok(outcome),
717            other => Err(RuntimeControlPlaneError::Internal(format!(
718                "unexpected MeerkatMachineCommandResult for ingest: {other:?}"
719            ))),
720        }
721    }
722
723    async fn publish_event(
724        &self,
725        event: crate::runtime_event::RuntimeEventEnvelope,
726    ) -> Result<(), RuntimeControlPlaneError> {
727        match self
728            .execute_meerkat_machine_command(None, MeerkatMachineCommand::PublishEvent { event })
729            .await
730            .map_err(MeerkatMachine::control_plane_error_from_command_error)?
731        {
732            MeerkatMachineCommandResult::Unit => Ok(()),
733            other => Err(RuntimeControlPlaneError::Internal(format!(
734                "unexpected MeerkatMachineCommandResult for publish_event: {other:?}"
735            ))),
736        }
737    }
738
739    async fn retire(
740        &self,
741        runtime_id: &LogicalRuntimeId,
742    ) -> Result<RetireReport, RuntimeControlPlaneError> {
743        self.retire_runtime_control_plane(runtime_id).await
744    }
745
746    async fn recycle(
747        &self,
748        runtime_id: &LogicalRuntimeId,
749    ) -> Result<RecycleReport, RuntimeControlPlaneError> {
750        match self
751            .execute_meerkat_machine_command(
752                None,
753                MeerkatMachineCommand::Recycle {
754                    runtime_id: runtime_id.clone(),
755                },
756            )
757            .await
758            .map_err(MeerkatMachine::control_plane_error_from_command_error)?
759        {
760            MeerkatMachineCommandResult::RecycleReport(report) => Ok(report),
761            other => Err(RuntimeControlPlaneError::Internal(format!(
762                "unexpected MeerkatMachineCommandResult for recycle: {other:?}"
763            ))),
764        }
765    }
766
767    async fn reset(
768        &self,
769        runtime_id: &LogicalRuntimeId,
770    ) -> Result<crate::traits::ResetReport, RuntimeControlPlaneError> {
771        match self
772            .execute_meerkat_machine_command(
773                None,
774                MeerkatMachineCommand::Reset {
775                    runtime_id: runtime_id.clone(),
776                },
777            )
778            .await
779            .map_err(MeerkatMachine::control_plane_error_from_command_error)?
780        {
781            MeerkatMachineCommandResult::ResetReport(report) => Ok(report),
782            other => Err(RuntimeControlPlaneError::Internal(format!(
783                "unexpected MeerkatMachineCommandResult for reset: {other:?}"
784            ))),
785        }
786    }
787
788    async fn recover(
789        &self,
790        runtime_id: &LogicalRuntimeId,
791    ) -> Result<RecoveryReport, RuntimeControlPlaneError> {
792        match self
793            .execute_meerkat_machine_command(
794                None,
795                MeerkatMachineCommand::Recover {
796                    runtime_id: runtime_id.clone(),
797                },
798            )
799            .await
800            .map_err(MeerkatMachine::control_plane_error_from_command_error)?
801        {
802            MeerkatMachineCommandResult::RecoveryReport(report) => Ok(report),
803            other => Err(RuntimeControlPlaneError::Internal(format!(
804                "unexpected MeerkatMachineCommandResult for recover: {other:?}"
805            ))),
806        }
807    }
808
809    async fn destroy(
810        &self,
811        runtime_id: &LogicalRuntimeId,
812    ) -> Result<DestroyReport, RuntimeControlPlaneError> {
813        match self
814            .execute_meerkat_machine_command(
815                None,
816                MeerkatMachineCommand::Destroy {
817                    runtime_id: runtime_id.clone(),
818                },
819            )
820            .await
821            .map_err(MeerkatMachine::control_plane_error_from_command_error)?
822        {
823            MeerkatMachineCommandResult::DestroyReport(report) => Ok(report),
824            other => Err(RuntimeControlPlaneError::Internal(format!(
825                "unexpected MeerkatMachineCommandResult for destroy: {other:?}"
826            ))),
827        }
828    }
829
830    async fn runtime_state(
831        &self,
832        runtime_id: &LogicalRuntimeId,
833    ) -> Result<RuntimeState, RuntimeControlPlaneError> {
834        match self
835            .execute_meerkat_machine_command(
836                None,
837                MeerkatMachineCommand::RuntimeState {
838                    runtime_id: runtime_id.clone(),
839                },
840            )
841            .await
842            .map_err(MeerkatMachine::control_plane_error_from_command_error)?
843        {
844            MeerkatMachineCommandResult::RuntimeState(state) => Ok(state),
845            other => Err(RuntimeControlPlaneError::Internal(format!(
846                "unexpected MeerkatMachineCommandResult for runtime_state: {other:?}"
847            ))),
848        }
849    }
850
851    async fn load_boundary_receipt(
852        &self,
853        runtime_id: &LogicalRuntimeId,
854        run_id: &RunId,
855        sequence: u64,
856    ) -> Result<Option<meerkat_core::lifecycle::RunBoundaryReceipt>, RuntimeControlPlaneError> {
857        match self
858            .execute_meerkat_machine_command(
859                None,
860                MeerkatMachineCommand::LoadBoundaryReceipt {
861                    runtime_id: runtime_id.clone(),
862                    run_id: run_id.clone(),
863                    sequence,
864                },
865            )
866            .await
867            .map_err(MeerkatMachine::control_plane_error_from_command_error)?
868        {
869            MeerkatMachineCommandResult::BoundaryReceipt(receipt) => Ok(receipt),
870            other => Err(RuntimeControlPlaneError::Internal(format!(
871                "unexpected MeerkatMachineCommandResult for load_boundary_receipt: {other:?}"
872            ))),
873        }
874    }
875}
876
877#[cfg(test)]
878#[allow(clippy::expect_used, clippy::panic, clippy::unwrap_used)]
879mod tests {
880    use super::*;
881
882    /// Row #45 gate: control-plane not-found must map to the dedicated
883    /// `RuntimeDriverError::NotFound` carrying the runtime id, NOT to
884    /// `NotReady { state: Destroyed }` (which conflates never-existed/absent
885    /// with a torn-down lifecycle).
886    #[test]
887    fn control_plane_not_found_maps_to_driver_not_found() {
888        let runtime_id = LogicalRuntimeId("missing-runtime".to_string());
889        let mapped = MeerkatMachine::driver_error_from_control_plane_error(
890            RuntimeControlPlaneError::NotFound(runtime_id.clone()),
891        );
892
893        match mapped {
894            RuntimeDriverError::NotFound {
895                runtime_id: mapped_id,
896            } => assert_eq!(mapped_id, runtime_id),
897            other => panic!(
898                "expected RuntimeDriverError::NotFound, got {other:?} (must not collapse absence into NotReady/Destroyed)"
899            ),
900        }
901    }
902
903    /// Guard the negative half explicitly: the not-found mapping must never
904    /// surface as `NotReady { state: Destroyed }`.
905    #[test]
906    fn control_plane_not_found_is_not_destroyed_not_ready() {
907        let mapped = MeerkatMachine::driver_error_from_control_plane_error(
908            RuntimeControlPlaneError::NotFound(LogicalRuntimeId("missing-runtime".to_string())),
909        );
910
911        assert!(
912            !matches!(
913                mapped,
914                RuntimeDriverError::NotReady {
915                    state: RuntimeState::Destroyed
916                }
917            ),
918            "not-found must not be laundered into NotReady{{Destroyed}}"
919        );
920    }
921}