1use super::*;
2use crate::input_state::StoredInputState;
3
4#[cfg_attr(not(target_arch = "wasm32"), async_trait::async_trait)]
5#[cfg_attr(target_arch = "wasm32", async_trait::async_trait(?Send))]
6impl SessionServiceRuntimeExt for MeerkatMachine {
7 async fn accept_input(
8 &self,
9 session_id: &SessionId,
10 input: Input,
11 ) -> Result<AcceptOutcome, RuntimeDriverError> {
12 match self
13 .execute_meerkat_machine_command(
14 None,
15 MeerkatMachineCommand::AcceptWithCompletion {
16 session_id: session_id.clone(),
17 input,
18 register_completion: false,
19 },
20 )
21 .await
22 .map_err(MeerkatMachine::driver_error_from_command_error)?
23 {
24 MeerkatMachineCommandResult::AcceptWithCompletion {
25 outcome,
26 handle: _,
27 admission_signal: _,
28 } => Ok(outcome),
29 other => Err(RuntimeDriverError::Internal(format!(
30 "unexpected MeerkatMachineCommandResult for SessionServiceRuntimeExt::accept_input: {other:?}"
31 ))),
32 }
33 }
34
35 async fn accept_input_with_completion(
36 &self,
37 session_id: &SessionId,
38 input: Input,
39 ) -> Result<(AcceptOutcome, Option<crate::completion::CompletionHandle>), RuntimeDriverError>
40 {
41 tracing::debug!(
42 session_id = %session_id,
43 input_id = %input.id(),
44 "SessionServiceRuntimeExt::accept_input_with_completion entered"
45 );
46 self.accept_input_with_completion_boxed(session_id, input)
47 .await
48 }
49
50 async fn runtime_state(
51 &self,
52 session_id: &SessionId,
53 ) -> Result<RuntimeState, RuntimeDriverError> {
54 let runtime_id = MeerkatMachine::logical_runtime_id(session_id);
55 match self
56 .execute_meerkat_machine_command(
57 None,
58 MeerkatMachineCommand::RuntimeState { runtime_id },
59 )
60 .await
61 .map_err(MeerkatMachine::driver_error_from_command_error)?
62 {
63 MeerkatMachineCommandResult::RuntimeState(state) => Ok(state),
64 other => Err(RuntimeDriverError::Internal(format!(
65 "unexpected MeerkatMachineCommandResult for SessionServiceRuntimeExt::runtime_state: {other:?}"
66 ))),
67 }
68 }
69
70 async fn retire_runtime(
71 &self,
72 session_id: &SessionId,
73 ) -> Result<RetireReport, RuntimeDriverError> {
74 let runtime_id = MeerkatMachine::logical_runtime_id(session_id);
75 match self
76 .execute_meerkat_machine_command(None, MeerkatMachineCommand::Retire { runtime_id })
77 .await
78 .map_err(MeerkatMachine::driver_error_from_command_error)?
79 {
80 MeerkatMachineCommandResult::RetireReport(report) => Ok(report),
81 other => Err(RuntimeDriverError::Internal(format!(
82 "unexpected MeerkatMachineCommandResult for SessionServiceRuntimeExt::retire_runtime: {other:?}"
83 ))),
84 }
85 }
86
87 async fn reset_runtime(
88 &self,
89 session_id: &SessionId,
90 ) -> Result<ResetReport, RuntimeDriverError> {
91 let runtime_id = MeerkatMachine::logical_runtime_id(session_id);
92 match self
93 .execute_meerkat_machine_command(None, MeerkatMachineCommand::Reset { runtime_id })
94 .await
95 .map_err(MeerkatMachine::driver_error_from_command_error)?
96 {
97 MeerkatMachineCommandResult::ResetReport(report) => Ok(report),
98 other => Err(RuntimeDriverError::Internal(format!(
99 "unexpected MeerkatMachineCommandResult for SessionServiceRuntimeExt::reset_runtime: {other:?}"
100 ))),
101 }
102 }
103
104 async fn input_state(
105 &self,
106 session_id: &SessionId,
107 input_id: &InputId,
108 ) -> Result<Option<StoredInputState>, RuntimeDriverError> {
109 match self
110 .execute_meerkat_machine_command(
111 None,
112 MeerkatMachineCommand::InputState {
113 session_id: session_id.clone(),
114 input_id: input_id.clone(),
115 },
116 )
117 .await
118 .map_err(MeerkatMachine::driver_error_from_command_error)?
119 {
120 MeerkatMachineCommandResult::InputState(state) => Ok(state),
121 other => Err(RuntimeDriverError::Internal(format!(
122 "unexpected MeerkatMachineCommandResult for SessionServiceRuntimeExt::input_state: {other:?}"
123 ))),
124 }
125 }
126
127 async fn list_active_inputs(
128 &self,
129 session_id: &SessionId,
130 ) -> Result<Vec<InputId>, RuntimeDriverError> {
131 match self
132 .execute_meerkat_machine_command(
133 None,
134 MeerkatMachineCommand::ListActiveInputs {
135 session_id: session_id.clone(),
136 },
137 )
138 .await
139 .map_err(MeerkatMachine::driver_error_from_command_error)?
140 {
141 MeerkatMachineCommandResult::ActiveInputs(inputs) => Ok(inputs),
142 other => Err(RuntimeDriverError::Internal(format!(
143 "unexpected MeerkatMachineCommandResult for SessionServiceRuntimeExt::list_active_inputs: {other:?}"
144 ))),
145 }
146 }
147
148 async fn reconfigure_session_llm_identity(
149 &self,
150 session_id: &SessionId,
151 request: SessionLlmReconfigureRequest,
152 ) -> Result<SessionLlmReconfigureReport, RuntimeDriverError> {
153 let command = self
154 .prepare_reconfigure_session_llm_command(session_id, request)
155 .await?;
156 match self
157 .execute_meerkat_machine_command(None, command)
158 .await
159 .map_err(MeerkatMachine::driver_error_from_command_error)?
160 {
161 MeerkatMachineCommandResult::LlmReconfigured(report) => Ok(report),
162 other => Err(RuntimeDriverError::Internal(format!(
163 "unexpected MeerkatMachineCommandResult for SessionServiceRuntimeExt::reconfigure_session_llm_identity: {other:?}"
164 ))),
165 }
166 }
167
168 async fn resolved_session_llm_capabilities(
169 &self,
170 session_id: &SessionId,
171 ) -> Result<Option<SessionLlmCapabilitySurface>, RuntimeDriverError> {
172 match self
173 .execute_meerkat_machine_command(
174 None,
175 MeerkatMachineCommand::ResolvedSessionLlmCapabilities {
176 session_id: session_id.clone(),
177 },
178 )
179 .await
180 .map_err(MeerkatMachine::driver_error_from_command_error)?
181 {
182 MeerkatMachineCommandResult::ResolvedSessionLlmCapabilities(capabilities) => {
183 Ok(capabilities)
184 }
185 other => Err(RuntimeDriverError::Internal(format!(
186 "unexpected MeerkatMachineCommandResult for SessionServiceRuntimeExt::resolved_session_llm_capabilities: {other:?}"
187 ))),
188 }
189 }
190
191 async fn configure_model_routing_baseline(
192 &self,
193 session_id: &SessionId,
194 baseline_model: meerkat_core::lifecycle::run_primitive::ModelId,
195 realtime_capable: bool,
196 ) -> Result<(), RuntimeDriverError> {
197 match self
198 .execute_meerkat_machine_command(
199 None,
200 MeerkatMachineCommand::ConfigureModelRoutingBaseline {
201 session_id: session_id.clone(),
202 baseline_model,
203 realtime_capable,
204 },
205 )
206 .await
207 .map_err(MeerkatMachine::driver_error_from_command_error)?
208 {
209 MeerkatMachineCommandResult::Unit => Ok(()),
210 other => Err(RuntimeDriverError::Internal(format!(
211 "unexpected MeerkatMachineCommandResult for SessionServiceRuntimeExt::configure_model_routing_baseline: {other:?}"
212 ))),
213 }
214 }
215
216 async fn session_model_routing_status(
217 &self,
218 session_id: &SessionId,
219 ) -> Result<meerkat_core::image_generation::SessionModelRoutingStatus, RuntimeDriverError> {
220 match self
221 .execute_meerkat_machine_command(
222 None,
223 MeerkatMachineCommand::SessionModelRoutingStatus {
224 session_id: session_id.clone(),
225 },
226 )
227 .await
228 .map_err(MeerkatMachine::driver_error_from_command_error)?
229 {
230 MeerkatMachineCommandResult::SessionModelRoutingStatus(status) => Ok(status),
231 other => Err(RuntimeDriverError::Internal(format!(
232 "unexpected MeerkatMachineCommandResult for SessionServiceRuntimeExt::session_model_routing_status: {other:?}"
233 ))),
234 }
235 }
236
237 async fn request_switch_turn(
238 &self,
239 session_id: &SessionId,
240 request: crate::meerkat_machine_types::SwitchTurnRequest,
241 ) -> Result<meerkat_core::image_generation::SwitchTurnControlResult, RuntimeDriverError> {
242 match self
243 .execute_meerkat_machine_command(
244 None,
245 MeerkatMachineCommand::RequestSwitchTurn {
246 session_id: session_id.clone(),
247 request: Box::new(request),
248 },
249 )
250 .await
251 .map_err(MeerkatMachine::driver_error_from_command_error)?
252 {
253 MeerkatMachineCommandResult::SwitchTurnControlResult(result) => Ok(result),
254 other => Err(RuntimeDriverError::Internal(format!(
255 "unexpected MeerkatMachineCommandResult for SessionServiceRuntimeExt::request_switch_turn: {other:?}"
256 ))),
257 }
258 }
259
260 async fn admit_model_routing_assistant_turn(
261 &self,
262 session_id: &SessionId,
263 ) -> Result<(), RuntimeDriverError> {
264 match self
265 .execute_meerkat_machine_command(
266 None,
267 MeerkatMachineCommand::AdmitModelRoutingAssistantTurn {
268 session_id: session_id.clone(),
269 },
270 )
271 .await
272 .map_err(MeerkatMachine::driver_error_from_command_error)?
273 {
274 MeerkatMachineCommandResult::Unit => Ok(()),
275 other => Err(RuntimeDriverError::Internal(format!(
276 "unexpected MeerkatMachineCommandResult for SessionServiceRuntimeExt::admit_model_routing_assistant_turn: {other:?}"
277 ))),
278 }
279 }
280
281 async fn begin_image_operation(
282 &self,
283 session_id: &SessionId,
284 request: crate::meerkat_machine_types::ImageOperationRoutingRequest,
285 ) -> Result<crate::meerkat_machine_types::ImageOperationRoutingResult, RuntimeDriverError> {
286 match self
287 .execute_meerkat_machine_command(
288 None,
289 MeerkatMachineCommand::BeginImageOperation {
290 session_id: session_id.clone(),
291 request: Box::new(request),
292 },
293 )
294 .await
295 .map_err(MeerkatMachine::driver_error_from_command_error)?
296 {
297 MeerkatMachineCommandResult::ImageOperationRoutingResult(result) => Ok(result),
298 other => Err(RuntimeDriverError::Internal(format!(
299 "unexpected MeerkatMachineCommandResult for SessionServiceRuntimeExt::begin_image_operation: {other:?}"
300 ))),
301 }
302 }
303
304 async fn deny_image_operation_plan(
305 &self,
306 session_id: &SessionId,
307 operation_id: meerkat_core::image_generation::ImageOperationId,
308 reason: meerkat_core::image_generation::ImageOperationDenialReason,
309 ) -> Result<meerkat_core::image_generation::ImageOperationPhase, RuntimeDriverError> {
310 match self
311 .execute_meerkat_machine_command(
312 None,
313 MeerkatMachineCommand::DenyImageOperationPlan {
314 session_id: session_id.clone(),
315 operation_id,
316 reason,
317 },
318 )
319 .await
320 .map_err(MeerkatMachine::driver_error_from_command_error)?
321 {
322 MeerkatMachineCommandResult::ImageOperationPhase(phase) => Ok(phase),
323 other => Err(RuntimeDriverError::Internal(format!(
324 "unexpected MeerkatMachineCommandResult for SessionServiceRuntimeExt::deny_image_operation_plan: {other:?}"
325 ))),
326 }
327 }
328
329 async fn activate_image_operation_override(
330 &self,
331 session_id: &SessionId,
332 operation_id: meerkat_core::image_generation::ImageOperationId,
333 ) -> Result<meerkat_core::image_generation::ImageOperationPhase, RuntimeDriverError> {
334 match self
335 .execute_meerkat_machine_command(
336 None,
337 MeerkatMachineCommand::ActivateImageOperationOverride {
338 session_id: session_id.clone(),
339 operation_id,
340 },
341 )
342 .await
343 .map_err(MeerkatMachine::driver_error_from_command_error)?
344 {
345 MeerkatMachineCommandResult::ImageOperationPhase(phase) => Ok(phase),
346 other => Err(RuntimeDriverError::Internal(format!(
347 "unexpected MeerkatMachineCommandResult for SessionServiceRuntimeExt::activate_image_operation_override: {other:?}"
348 ))),
349 }
350 }
351
352 async fn complete_image_operation(
353 &self,
354 session_id: &SessionId,
355 operation_id: meerkat_core::image_generation::ImageOperationId,
356 terminal: meerkat_core::image_generation::ImageOperationTerminalClass,
357 ) -> Result<meerkat_core::image_generation::ImageOperationPhase, RuntimeDriverError> {
358 match self
359 .execute_meerkat_machine_command(
360 None,
361 MeerkatMachineCommand::CompleteImageOperation {
362 session_id: session_id.clone(),
363 operation_id,
364 terminal,
365 },
366 )
367 .await
368 .map_err(MeerkatMachine::driver_error_from_command_error)?
369 {
370 MeerkatMachineCommandResult::ImageOperationPhase(phase) => Ok(phase),
371 other => Err(RuntimeDriverError::Internal(format!(
372 "unexpected MeerkatMachineCommandResult for SessionServiceRuntimeExt::complete_image_operation: {other:?}"
373 ))),
374 }
375 }
376
377 async fn classify_image_operation_terminal(
378 &self,
379 session_id: &SessionId,
380 operation_id: meerkat_core::image_generation::ImageOperationId,
381 observation: meerkat_core::image_generation::ImageProviderTerminalObservation,
382 provider_text: meerkat_core::image_generation::ProviderTextDisposition,
383 ) -> Result<meerkat_core::image_generation::ImageOperationTerminalClass, RuntimeDriverError>
384 {
385 match self
386 .execute_meerkat_machine_command(
387 None,
388 MeerkatMachineCommand::ClassifyImageOperationTerminal {
389 session_id: session_id.clone(),
390 operation_id,
391 observation,
392 provider_text,
393 },
394 )
395 .await
396 .map_err(MeerkatMachine::driver_error_from_command_error)?
397 {
398 MeerkatMachineCommandResult::ImageOperationTerminalClass(terminal) => Ok(terminal),
399 other => Err(RuntimeDriverError::Internal(format!(
400 "unexpected MeerkatMachineCommandResult for SessionServiceRuntimeExt::classify_image_operation_terminal: {other:?}"
401 ))),
402 }
403 }
404
405 async fn restore_image_operation_override(
406 &self,
407 session_id: &SessionId,
408 operation_id: meerkat_core::image_generation::ImageOperationId,
409 ) -> Result<meerkat_core::image_generation::ImageOperationPhase, RuntimeDriverError> {
410 match self
411 .execute_meerkat_machine_command(
412 None,
413 MeerkatMachineCommand::RestoreImageOperationOverride {
414 session_id: session_id.clone(),
415 operation_id,
416 },
417 )
418 .await
419 .map_err(MeerkatMachine::driver_error_from_command_error)?
420 {
421 MeerkatMachineCommandResult::ImageOperationPhase(phase) => Ok(phase),
422 other => Err(RuntimeDriverError::Internal(format!(
423 "unexpected MeerkatMachineCommandResult for SessionServiceRuntimeExt::restore_image_operation_override: {other:?}"
424 ))),
425 }
426 }
427}
428
429impl MeerkatMachine {
434 pub(crate) fn logical_runtime_id(session_id: &SessionId) -> LogicalRuntimeId {
435 LogicalRuntimeId::for_session(session_id)
436 }
437
438 pub(super) fn post_admission_signal_from_effects(
439 effects: &[crate::meerkat_machine::dsl::MeerkatMachineEffect],
440 ) -> crate::driver::ephemeral::PostAdmissionSignal {
441 effects
442 .iter()
443 .find_map(|effect| match effect {
444 crate::meerkat_machine::dsl::MeerkatMachineEffect::PostAdmissionSignal {
445 signal,
446 } => Some(match signal {
447 crate::meerkat_machine::dsl::PostAdmissionSignalKind::WakeLoop => {
448 crate::driver::ephemeral::PostAdmissionSignal::WakeLoop
449 }
450 crate::meerkat_machine::dsl::PostAdmissionSignalKind::InterruptYielding => {
451 crate::driver::ephemeral::PostAdmissionSignal::InterruptYielding
452 }
453 crate::meerkat_machine::dsl::PostAdmissionSignalKind::RequestImmediateProcessing => {
454 crate::driver::ephemeral::PostAdmissionSignal::RequestImmediateProcessing
455 }
456 }),
457 _ => None,
458 })
459 .unwrap_or(crate::driver::ephemeral::PostAdmissionSignal::None)
460 }
461
462 pub(super) fn driver_error_from_command_error(
463 err: MeerkatMachineCommandError,
464 ) -> RuntimeDriverError {
465 match err {
466 MeerkatMachineCommandError::Driver(err) => err,
467 MeerkatMachineCommandError::Control(err) => {
468 Self::driver_error_from_control_plane_error(err)
469 }
470 }
471 }
472
473 pub(super) fn control_plane_error_from_command_error(
474 err: MeerkatMachineCommandError,
475 ) -> RuntimeControlPlaneError {
476 match err {
477 MeerkatMachineCommandError::Control(err) => err,
478 MeerkatMachineCommandError::Driver(err) => {
479 RuntimeControlPlaneError::Internal(err.to_string())
480 }
481 }
482 }
483
484 pub(super) fn driver_error_from_control_plane_error(
485 err: RuntimeControlPlaneError,
486 ) -> RuntimeDriverError {
487 match err {
488 RuntimeControlPlaneError::NotFound(runtime_id) => {
489 RuntimeDriverError::NotFound { runtime_id }
490 }
491 RuntimeControlPlaneError::InvalidState { state } => {
492 RuntimeDriverError::NotReady { state }
493 }
494 RuntimeControlPlaneError::StoreError(message)
495 | RuntimeControlPlaneError::Internal(message) => RuntimeDriverError::Internal(message),
496 }
497 }
498
499 pub(super) async fn resolve_session_id(
501 &self,
502 runtime_id: &LogicalRuntimeId,
503 ) -> Result<SessionId, RuntimeControlPlaneError> {
504 let sessions = self.sessions.read().await;
505 sessions
506 .iter()
507 .find_map(|(session_id, entry)| {
508 (&entry.runtime_id == runtime_id).then(|| session_id.clone())
509 })
510 .ok_or_else(|| RuntimeControlPlaneError::NotFound(runtime_id.clone()))
511 }
512
513 pub(super) async fn existing_session_runtime_state(
514 &self,
515 session_id: &SessionId,
516 ) -> Option<RuntimeState> {
517 let sessions = self.sessions.read().await;
518 let entry = sessions.get(session_id)?;
519 let control = entry.control_snapshot();
524 let authority = entry
525 .dsl_authority
526 .lock()
527 .unwrap_or_else(std::sync::PoisonError::into_inner);
528 let dsl_phase = dsl_authority::runtime_phase_from_authority(&authority);
529 let dsl_pre_run_phase = dsl_authority::pre_run_phase_from_authority(&authority);
530 match crate::meerkat_machine::resolve_visible_runtime_phase(
537 dsl_phase,
538 dsl_pre_run_phase,
539 control.phase,
540 control.pre_run_phase,
541 self.has_runtime_persistence(),
542 ) {
543 Ok(plan) => Some(plan.selected_raw_phase),
544 Err(reason) => {
545 tracing::error!(%session_id, %reason, "MeerkatMachine visible runtime phase resolution failed; failing closed to Destroyed");
546 Some(RuntimeState::Destroyed)
547 }
548 }
549 }
550
551 pub(super) async fn existing_session_visible_runtime_state(
552 &self,
553 session_id: &SessionId,
554 ) -> Option<RuntimeState> {
555 let sessions = self.sessions.read().await;
556 let entry = sessions.get(session_id)?;
557 let control = entry.control_snapshot();
558 let authority = entry
559 .dsl_authority
560 .lock()
561 .unwrap_or_else(std::sync::PoisonError::into_inner);
562 let dsl_phase = dsl_authority::runtime_phase_from_authority(&authority);
563 let dsl_pre_run_phase = dsl_authority::pre_run_phase_from_authority(&authority);
564 match crate::meerkat_machine::resolve_visible_runtime_phase(
570 dsl_phase,
571 dsl_pre_run_phase,
572 control.phase,
573 control.pre_run_phase,
574 self.has_runtime_persistence(),
575 ) {
576 Ok(plan) => Some(plan.visible_phase),
577 Err(reason) => {
578 tracing::error!(%session_id, %reason, "MeerkatMachine visible runtime phase resolution failed; failing closed to Destroyed");
579 Some(RuntimeState::Destroyed)
580 }
581 }
582 }
583
584 pub(super) async fn lookup_entry(
587 &self,
588 runtime_id: &LogicalRuntimeId,
589 ) -> Result<
590 (
591 SessionId,
592 SharedDriver,
593 SharedCompletionRegistry,
594 Option<mpsc::Sender<()>>,
595 ),
596 RuntimeControlPlaneError,
597 > {
598 let sessions = self.sessions.read().await;
599 let (session_id, entry) = sessions
600 .iter()
601 .find(|(_, entry)| &entry.runtime_id == runtime_id)
602 .ok_or_else(|| RuntimeControlPlaneError::NotFound(runtime_id.clone()))?;
603 Ok((
604 session_id.clone(),
605 entry.driver.clone(),
606 entry.completions.clone(),
607 entry.wake_sender(),
608 ))
609 }
610
611 pub async fn retire_runtime_control_plane(
612 &self,
613 runtime_id: &LogicalRuntimeId,
614 ) -> Result<RetireReport, RuntimeControlPlaneError> {
615 tracing::info!(
616 runtime_id = %runtime_id,
617 "MeerkatMachine::retire_runtime_control_plane start"
618 );
619 let (session_id, driver, completions, wake_tx) = self.lookup_entry(runtime_id).await?;
620 let gate = self.session_mutation_gate(&session_id).await;
621 let _gate_guard = match gate {
622 Some(ref gate) => Some(gate.lock().await),
623 None => None,
624 };
625
626 let staged_dsl = self
627 .stage_session_dsl_transition(
628 &session_id,
629 crate::meerkat_machine::dsl::MeerkatMachineInput::Retire {
630 session_id: crate::meerkat_machine::dsl::SessionId::from_domain(&session_id),
631 },
632 "Retire",
633 )
634 .await
635 .map_err(RuntimeControlPlaneError::Internal)?;
636
637 let mut drv = driver.lock().await;
638 let mut report = match Box::pin(machine_retire(&mut drv)).await {
639 Ok(report) => report,
640 Err(err) => {
641 drv.sync_control_projection_from_dsl_authority();
642 return Err(RuntimeControlPlaneError::Internal(err.to_string()));
643 }
644 };
645 drop(drv);
646
647 let mut commit_error = None;
648 if let Err(reason) = self
649 .commit_session_dsl_transition_preserving_committed_state(
650 &session_id,
651 staged_dsl,
652 "Retire",
653 )
654 .await
655 {
656 driver
657 .lock()
658 .await
659 .sync_control_projection_from_dsl_authority();
660 commit_error = Some(reason);
661 }
662
663 if report.inputs_pending_drain > 0 {
664 if let Some(ref tx) = wake_tx
665 && tx.send(()).await.is_ok()
666 {
667 if let Some(reason) = commit_error {
668 return Err(RuntimeControlPlaneError::Internal(reason));
669 }
670 return Ok(report);
671 }
672
673 let mut drv = driver.lock().await;
674 let abandoned = drv
675 .abandon_pending_inputs(crate::input_state::InputAbandonReason::Retired)
676 .await
677 .map_err(|err| RuntimeControlPlaneError::Internal(err.to_string()))?;
678 drop(drv);
679 let result_class =
680 crate::meerkat_machine::driver::machine_resolve_runtime_terminated_completion_result(
681 &driver,
682 )
683 .await
684 .map_err(|err| RuntimeControlPlaneError::Internal(err.to_string()))?;
685 let mut comp = completions.lock().await;
686 comp.resolve_all_runtime_terminated("retired without runtime loop", result_class);
687 report.inputs_abandoned += abandoned;
688 report.inputs_pending_drain = 0;
689 }
690 if let Some(reason) = commit_error {
691 return Err(RuntimeControlPlaneError::Internal(reason));
692 }
693 Ok(report)
694 }
695}
696
697#[cfg_attr(not(target_arch = "wasm32"), async_trait::async_trait)]
698#[cfg_attr(target_arch = "wasm32", async_trait::async_trait(?Send))]
699impl crate::traits::RuntimeControlPlane for MeerkatMachine {
700 async fn ingest(
701 &self,
702 runtime_id: &LogicalRuntimeId,
703 input: Input,
704 ) -> Result<AcceptOutcome, RuntimeControlPlaneError> {
705 match self
706 .execute_meerkat_machine_command(
707 None,
708 MeerkatMachineCommand::Ingest {
709 runtime_id: runtime_id.clone(),
710 input,
711 },
712 )
713 .await
714 .map_err(MeerkatMachine::control_plane_error_from_command_error)?
715 {
716 MeerkatMachineCommandResult::AcceptOutcome(outcome) => Ok(outcome),
717 other => Err(RuntimeControlPlaneError::Internal(format!(
718 "unexpected MeerkatMachineCommandResult for ingest: {other:?}"
719 ))),
720 }
721 }
722
723 async fn publish_event(
724 &self,
725 event: crate::runtime_event::RuntimeEventEnvelope,
726 ) -> Result<(), RuntimeControlPlaneError> {
727 match self
728 .execute_meerkat_machine_command(None, MeerkatMachineCommand::PublishEvent { event })
729 .await
730 .map_err(MeerkatMachine::control_plane_error_from_command_error)?
731 {
732 MeerkatMachineCommandResult::Unit => Ok(()),
733 other => Err(RuntimeControlPlaneError::Internal(format!(
734 "unexpected MeerkatMachineCommandResult for publish_event: {other:?}"
735 ))),
736 }
737 }
738
739 async fn retire(
740 &self,
741 runtime_id: &LogicalRuntimeId,
742 ) -> Result<RetireReport, RuntimeControlPlaneError> {
743 self.retire_runtime_control_plane(runtime_id).await
744 }
745
746 async fn recycle(
747 &self,
748 runtime_id: &LogicalRuntimeId,
749 ) -> Result<RecycleReport, RuntimeControlPlaneError> {
750 match self
751 .execute_meerkat_machine_command(
752 None,
753 MeerkatMachineCommand::Recycle {
754 runtime_id: runtime_id.clone(),
755 },
756 )
757 .await
758 .map_err(MeerkatMachine::control_plane_error_from_command_error)?
759 {
760 MeerkatMachineCommandResult::RecycleReport(report) => Ok(report),
761 other => Err(RuntimeControlPlaneError::Internal(format!(
762 "unexpected MeerkatMachineCommandResult for recycle: {other:?}"
763 ))),
764 }
765 }
766
767 async fn reset(
768 &self,
769 runtime_id: &LogicalRuntimeId,
770 ) -> Result<crate::traits::ResetReport, RuntimeControlPlaneError> {
771 match self
772 .execute_meerkat_machine_command(
773 None,
774 MeerkatMachineCommand::Reset {
775 runtime_id: runtime_id.clone(),
776 },
777 )
778 .await
779 .map_err(MeerkatMachine::control_plane_error_from_command_error)?
780 {
781 MeerkatMachineCommandResult::ResetReport(report) => Ok(report),
782 other => Err(RuntimeControlPlaneError::Internal(format!(
783 "unexpected MeerkatMachineCommandResult for reset: {other:?}"
784 ))),
785 }
786 }
787
788 async fn recover(
789 &self,
790 runtime_id: &LogicalRuntimeId,
791 ) -> Result<RecoveryReport, RuntimeControlPlaneError> {
792 match self
793 .execute_meerkat_machine_command(
794 None,
795 MeerkatMachineCommand::Recover {
796 runtime_id: runtime_id.clone(),
797 },
798 )
799 .await
800 .map_err(MeerkatMachine::control_plane_error_from_command_error)?
801 {
802 MeerkatMachineCommandResult::RecoveryReport(report) => Ok(report),
803 other => Err(RuntimeControlPlaneError::Internal(format!(
804 "unexpected MeerkatMachineCommandResult for recover: {other:?}"
805 ))),
806 }
807 }
808
809 async fn destroy(
810 &self,
811 runtime_id: &LogicalRuntimeId,
812 ) -> Result<DestroyReport, RuntimeControlPlaneError> {
813 match self
814 .execute_meerkat_machine_command(
815 None,
816 MeerkatMachineCommand::Destroy {
817 runtime_id: runtime_id.clone(),
818 },
819 )
820 .await
821 .map_err(MeerkatMachine::control_plane_error_from_command_error)?
822 {
823 MeerkatMachineCommandResult::DestroyReport(report) => Ok(report),
824 other => Err(RuntimeControlPlaneError::Internal(format!(
825 "unexpected MeerkatMachineCommandResult for destroy: {other:?}"
826 ))),
827 }
828 }
829
830 async fn runtime_state(
831 &self,
832 runtime_id: &LogicalRuntimeId,
833 ) -> Result<RuntimeState, RuntimeControlPlaneError> {
834 match self
835 .execute_meerkat_machine_command(
836 None,
837 MeerkatMachineCommand::RuntimeState {
838 runtime_id: runtime_id.clone(),
839 },
840 )
841 .await
842 .map_err(MeerkatMachine::control_plane_error_from_command_error)?
843 {
844 MeerkatMachineCommandResult::RuntimeState(state) => Ok(state),
845 other => Err(RuntimeControlPlaneError::Internal(format!(
846 "unexpected MeerkatMachineCommandResult for runtime_state: {other:?}"
847 ))),
848 }
849 }
850
851 async fn load_boundary_receipt(
852 &self,
853 runtime_id: &LogicalRuntimeId,
854 run_id: &RunId,
855 sequence: u64,
856 ) -> Result<Option<meerkat_core::lifecycle::RunBoundaryReceipt>, RuntimeControlPlaneError> {
857 match self
858 .execute_meerkat_machine_command(
859 None,
860 MeerkatMachineCommand::LoadBoundaryReceipt {
861 runtime_id: runtime_id.clone(),
862 run_id: run_id.clone(),
863 sequence,
864 },
865 )
866 .await
867 .map_err(MeerkatMachine::control_plane_error_from_command_error)?
868 {
869 MeerkatMachineCommandResult::BoundaryReceipt(receipt) => Ok(receipt),
870 other => Err(RuntimeControlPlaneError::Internal(format!(
871 "unexpected MeerkatMachineCommandResult for load_boundary_receipt: {other:?}"
872 ))),
873 }
874 }
875}
876
// Tests may panic and unwrap freely; the lints are relaxed for this module only.
#[cfg(test)]
#[allow(clippy::expect_used, clippy::panic, clippy::unwrap_used)]
mod tests {
    use super::*;

    /// A control-plane `NotFound` must come out as a driver `NotFound` that
    /// carries the same runtime id.
    #[test]
    fn control_plane_not_found_maps_to_driver_not_found() {
        let expected_id = LogicalRuntimeId("missing-runtime".to_string());
        let control_error = RuntimeControlPlaneError::NotFound(expected_id.clone());

        let mapped = MeerkatMachine::driver_error_from_control_plane_error(control_error);

        match mapped {
            RuntimeDriverError::NotFound { runtime_id } => assert_eq!(runtime_id, expected_id),
            other => panic!(
                "expected RuntimeDriverError::NotFound, got {other:?} (must not collapse absence into NotReady/Destroyed)"
            ),
        }
    }

    /// A control-plane `NotFound` must never be reported as
    /// `NotReady { state: Destroyed }`.
    #[test]
    fn control_plane_not_found_is_not_destroyed_not_ready() {
        let missing = LogicalRuntimeId("missing-runtime".to_string());
        let mapped = MeerkatMachine::driver_error_from_control_plane_error(
            RuntimeControlPlaneError::NotFound(missing),
        );

        let laundered = matches!(
            mapped,
            RuntimeDriverError::NotReady {
                state: RuntimeState::Destroyed
            }
        );
        assert!(
            !laundered,
            "not-found must not be laundered into NotReady{{Destroyed}}"
        );
    }
}