1use std::collections::{BTreeSet, HashMap, HashSet};
20use std::future::Future;
21use std::pin::Pin;
22use std::sync::Arc;
23use std::sync::RwLock as StdRwLock;
24#[cfg(not(target_arch = "wasm32"))]
25use std::sync::{Mutex as StdMutex, OnceLock, Weak};
26
27use meerkat_core::lifecycle::{InputId, RunId};
28use meerkat_core::tool_scope::ToolScopeTurnOverlay;
29use meerkat_core::types::SessionId;
30use meerkat_core::{BlobId, BlobPayload, BlobRef, BlobStore, BlobStoreError};
31use meerkat_core::{
32 DeferredToolLoadAuthority, SessionToolVisibilityState, ToolFilter, ToolScopeApplyError,
33 ToolScopeRevision, ToolScopeStageError, ToolVisibilityOwner, ToolVisibilityWitness,
34};
35
36use crate::accept::AcceptOutcome;
37use crate::driver::ephemeral::EphemeralRuntimeDriver;
38use crate::driver::persistent::PersistentRuntimeDriver;
39use crate::identifiers::LogicalRuntimeId;
40use crate::input::Input;
41use crate::input_state::{
42 InputAbandonReason, InputLifecycleState, InputStateSeed, InputTerminalOutcome,
43};
44use crate::meerkat_machine_types::{
45 HydratedSessionLlmState, MeerkatAdmittedInputSnapshot, MeerkatArchiveSnapshot,
46 MeerkatBindingSnapshot, MeerkatCompletionWaiterSnapshot, MeerkatCompletionWaitersSnapshot,
47 MeerkatControlSnapshot, MeerkatCursorSnapshot, MeerkatDrainSnapshot, MeerkatDriverKind,
48 MeerkatFormalStateProjection, MeerkatInputsSnapshot, MeerkatLedgerSnapshot,
49 MeerkatMachineCommand, MeerkatMachineCommandError, MeerkatMachineCommandResult,
50 MeerkatMachineRunFailure, MeerkatMachineSpineSnapshot, MeerkatOpsSnapshot,
51 SessionLlmCapabilityDelta, SessionLlmCapabilitySurface, SessionLlmReconfigureHost,
52 SessionLlmReconfigureReport, SessionLlmReconfigureRequest, SessionToolVisibilityDelta,
53};
54use crate::runtime_state::RuntimeState;
55use crate::service_ext::SessionServiceRuntimeExt;
56use crate::store::RuntimeStore;
57use crate::tokio;
58use crate::tokio::sync::{Mutex, RwLock, mpsc};
59#[cfg(test)]
60use crate::traits::RuntimeDriver;
61use crate::traits::{
62 DestroyReport, RecoveryReport, RecycleReport, ResetReport, RetireReport,
63 RuntimeControlPlaneError, RuntimeDriverError,
64};
65
66#[allow(clippy::expect_used)]
67pub(crate) fn recover_projected_authority(
68 state: dsl::MeerkatMachineState,
69 context: &'static str,
70) -> dsl::MeerkatMachineAuthority {
71 dsl::MeerkatMachineAuthority::recover_from_state(state).expect(context)
72}
73
/// Private unit type used as the runtime's bridge token.
///
/// `tool_visibility_owner_generated_authority_bridge_token_is_valid` recognizes a token
/// by checking, via `Any::is`, that it has exactly this type.
struct ToolVisibilityOwnerGeneratedAuthorityBridgeToken;

/// The static token instance handed out by
/// `tool_visibility_owner_generated_authority_bridge_token`.
static TOOL_VISIBILITY_OWNER_GENERATED_AUTHORITY_BRIDGE_TOKEN:
    ToolVisibilityOwnerGeneratedAuthorityBridgeToken =
    ToolVisibilityOwnerGeneratedAuthorityBridgeToken;

/// Returns the static bridge token as a type-erased `&'static dyn Any` reference.
fn tool_visibility_owner_generated_authority_bridge_token()
-> &'static (dyn std::any::Any + Send + Sync) {
    &TOOL_VISIBILITY_OWNER_GENERATED_AUTHORITY_BRIDGE_TOKEN
}
84
/// Reports whether `token` is an instance of the runtime's private
/// `ToolVisibilityOwnerGeneratedAuthorityBridgeToken` type.
///
/// The function is exported under an explicit symbol name: a fixed prefix followed by
/// the build-time value of `MEERKAT_GENERATED_AUTHORITY_BRIDGE_SYMBOL_SUFFIX`.
// NOTE(review): presumably resolved by symbol name from meerkat-core as the validation
// half of the bridge used by `generated_tool_visibility_owner` — confirm.
#[doc(hidden)]
#[allow(improper_ctypes_definitions, unsafe_code)]
#[unsafe(export_name = concat!(
    "__meerkat_runtime_generated_authority_bridge_token_is_valid_v1_tool_visibility_owner_",
    env!("MEERKAT_GENERATED_AUTHORITY_BRIDGE_SYMBOL_SUFFIX")
))]
pub extern "Rust" fn tool_visibility_owner_generated_authority_bridge_token_is_valid(
    token: &(dyn std::any::Any + Send + Sync),
) -> bool {
    // Exact type check: only the private token type passes.
    token.is::<ToolVisibilityOwnerGeneratedAuthorityBridgeToken>()
}
96
/// Wraps `owner` in a `meerkat_core::GeneratedToolVisibilityOwner` by calling a builder
/// that is declared here as an `extern "Rust"` import and resolved by symbol name.
///
/// The runtime's static bridge token is passed to the builder together with `owner`.
///
/// # Errors
///
/// Returns whatever `Err(String)` the external builder produces.
fn generated_tool_visibility_owner(
    owner: Arc<dyn ToolVisibilityOwner>,
) -> Result<meerkat_core::GeneratedToolVisibilityOwner, String> {
    // The link name is a fixed prefix plus the build-time
    // `MEERKAT_GENERATED_AUTHORITY_BRIDGE_SYMBOL_SUFFIX` value.
    #[allow(improper_ctypes_definitions, unsafe_code)]
    unsafe extern "Rust" {
        #[link_name = concat!(
            "__meerkat_core_runtime_generated_tool_visibility_owner_build_v1_",
            env!("MEERKAT_GENERATED_AUTHORITY_BRIDGE_SYMBOL_SUFFIX")
        )]
        fn core_runtime_generated_tool_visibility_owner_build(
            token: &'static (dyn std::any::Any + Send + Sync),
            owner: Arc<dyn ToolVisibilityOwner>,
        ) -> Result<meerkat_core::GeneratedToolVisibilityOwner, String>;
    }
    // SAFETY: NOTE(review) — sound only if the linked symbol is defined with exactly the
    // signature declared above; confirm against the exporting definition in meerkat-core.
    #[allow(unsafe_code)]
    unsafe {
        core_runtime_generated_tool_visibility_owner_build(
            tool_visibility_owner_generated_authority_bridge_token(),
            owner,
        )
    }
}
119
120pub fn standalone_tool_visibility_owner(
127 session_id: &SessionId,
128 current_identity: &meerkat_core::SessionLlmIdentity,
129 model_profile: Option<&meerkat_core::model_profile::ModelProfile>,
130 capability_base_filter: &ToolFilter,
131) -> Result<meerkat_core::GeneratedToolVisibilityOwner, String> {
132 let mut authority = dsl_authority::recover_authority_from_runtime_observation(
133 session_id,
134 RuntimeState::Idle,
135 None,
136 None,
137 None,
138 BTreeSet::new(),
139 None,
140 None,
141 None,
142 )
143 .map_err(|err| dsl_authority::map_error(err, "standalone visibility authority"))?;
144 let (current_capability_surface, current_capability_surface_status) = match model_profile {
145 Some(profile) => (
146 Some(dsl::SessionLlmCapabilitySurface {
147 supports_temperature: profile.supports_temperature,
148 supports_thinking: profile.supports_thinking,
149 supports_reasoning: profile.supports_reasoning,
150 inline_video: profile.inline_video,
151 vision: profile.vision,
152 image_input: profile.image_input,
153 image_tool_results: profile.image_tool_results,
154 supports_web_search: profile.supports_web_search,
155 image_generation: profile.image_generation,
156 realtime: profile.realtime,
157 call_timeout_secs: profile.call_timeout_secs,
158 }),
159 dsl::SessionLlmCapabilitySurfaceStatus::Resolved,
160 ),
161 None => (None, dsl::SessionLlmCapabilitySurfaceStatus::Unresolved),
162 };
163 dsl::MeerkatMachineMutator::apply(
164 &mut authority,
165 dsl::MeerkatMachineInput::HydrateSessionLlmState {
166 current_identity: dsl::SessionLlmIdentity::from_domain(current_identity),
167 current_capability_surface,
168 current_capability_surface_status,
169 current_capability_base_filter: dsl::ToolFilter::from_domain(capability_base_filter),
170 },
171 )
172 .map_err(|err| dsl_authority::map_error(err, "standalone visibility hydration"))?;
173 let authority = Arc::new(std::sync::Mutex::new(authority));
174 let owner = Arc::new(MachineToolVisibilityOwner::new());
175 owner.bind_dsl_authority(authority);
176 generated_tool_visibility_owner(owner as Arc<dyn ToolVisibilityOwner>)
177}
178
/// Errors reported when runtime bindings for a session cannot be produced.
#[derive(Debug, thiserror::Error)]
pub enum RuntimeBindingsError {
    /// The session was not found in the runtime adapter after registration.
    #[error("session {0} not found in runtime adapter after registration")]
    SessionNotFound(SessionId),
    /// Preparing the bindings failed; carries the session id and a failure description.
    #[error("failed to prepare runtime bindings for session {0}: {1}")]
    PrepareFailed(SessionId, String),
}
189
/// Public view of an input's state, as produced by
/// `resolve_input_public_state_projection`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct InputPublicStateProjection {
    /// Public lifecycle state resolved by the machine for the input's phase.
    pub lifecycle_state: dsl::InputPublicLifecycleState,
    /// Public terminal outcome resolved by the machine, if it reported one.
    pub terminal_outcome: Option<dsl::InputPublicTerminalOutcome>,
}
196
/// Facts the machine reports about a runtime lifecycle state.
///
/// Populated by `classify_runtime_lifecycle_state` from the fields of the
/// `RuntimeLifecycleStateClassified` effect.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct RuntimeLifecycleFacts {
    /// Terminality classification; see `is_terminal`.
    pub terminality: dsl::RuntimeLifecycleTerminality,
    /// Input admission classification; see `can_accept_input`.
    pub input_admission: dsl::RuntimeInputAdmission,
    /// Queue admission classification; see `can_process_queue`.
    pub queue_admission: dsl::RuntimeQueueAdmission,
    /// Prepare admission classification; see `can_prepare_run`.
    pub prepare_admission: dsl::RuntimePrepareAdmission,
    /// Ingress admission classification as reported by the machine.
    pub ingress_admission: dsl::RuntimeIngressAdmission,
}
207
208impl RuntimeLifecycleFacts {
209 #[must_use]
210 pub fn can_accept_input(self) -> bool {
211 self.input_admission == dsl::RuntimeInputAdmission::AcceptsInput
212 }
213
214 #[must_use]
215 pub fn can_process_queue(self) -> bool {
216 self.queue_admission == dsl::RuntimeQueueAdmission::ProcessesQueue
217 }
218
219 #[must_use]
220 pub fn can_prepare_run(self) -> bool {
221 self.prepare_admission == dsl::RuntimePrepareAdmission::Ready
222 }
223
224 #[must_use]
225 pub fn is_terminal(self) -> bool {
226 self.terminality == dsl::RuntimeLifecycleTerminality::Terminal
227 }
228}
229
/// Queue-admission decision for the runtime loop.
///
/// Populated by `classify_runtime_loop_queue_admission` from the
/// `RuntimeLoopQueueAdmissionClassified` effect.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct RuntimeLoopQueueAdmissionPlan {
    /// Queue admission classification; see `can_process_queue`.
    pub queue_admission: dsl::RuntimeQueueAdmission,
    /// Run-binding classification; see `uses_prebound_run`.
    pub run_binding: dsl::RuntimeLoopRunBinding,
}
237
238impl RuntimeLoopQueueAdmissionPlan {
239 #[must_use]
240 pub fn can_process_queue(self) -> bool {
241 self.queue_admission == dsl::RuntimeQueueAdmission::ProcessesQueue
242 }
243
244 #[must_use]
245 pub fn uses_prebound_run(self) -> bool {
246 self.run_binding == dsl::RuntimeLoopRunBinding::UsePrebound
247 }
248}
249
250pub fn classify_runtime_lifecycle_state(
254 state: RuntimeState,
255) -> Result<RuntimeLifecycleFacts, String> {
256 let observed_state = dsl_authority::observed_runtime_lifecycle_state(state);
257 let mut authority = projection_authority();
258 let transition = dsl::MeerkatMachineMutator::apply(
259 &mut authority,
260 dsl::MeerkatMachineInput::ClassifyRuntimeLifecycleState {
261 state: observed_state,
262 },
263 )
264 .map_err(|err| {
265 format!("MeerkatMachine rejected runtime lifecycle classification for {state}: {err}")
266 })?;
267
268 transition
269 .into_effects()
270 .into_iter()
271 .find_map(|effect| match effect {
272 dsl::MeerkatMachineEffect::RuntimeLifecycleStateClassified {
273 state,
274 terminality,
275 input_admission,
276 queue_admission,
277 prepare_admission,
278 ingress_admission,
279 } if state == observed_state => Some(RuntimeLifecycleFacts {
280 terminality,
281 input_admission,
282 queue_admission,
283 prepare_admission,
284 ingress_admission,
285 }),
286 _ => None,
287 })
288 .ok_or_else(|| {
289 format!("MeerkatMachine emitted no runtime lifecycle classification for {state}")
290 })
291}
292
293pub fn classify_runtime_lifecycle_durable_state(
297 state: RuntimeState,
298) -> Result<RuntimeState, String> {
299 let observed_state = dsl_authority::observed_runtime_lifecycle_state(state);
300 let mut authority = projection_authority();
301 let transition = dsl::MeerkatMachineMutator::apply(
302 &mut authority,
303 dsl::MeerkatMachineInput::ClassifyRuntimeLifecycleDurability {
304 state: observed_state,
305 },
306 )
307 .map_err(|err| {
308 format!(
309 "MeerkatMachine rejected runtime lifecycle durability classification for {state}: {err}"
310 )
311 })?;
312
313 transition
314 .into_effects()
315 .into_iter()
316 .find_map(|effect| match effect {
317 dsl::MeerkatMachineEffect::RuntimeLifecycleDurabilityClassified {
318 state,
319 durable_state,
320 } if state == observed_state => Some(
321 dsl_authority::runtime_state_from_observed_lifecycle_state(durable_state),
322 ),
323 _ => None,
324 })
325 .ok_or_else(|| {
326 format!(
327 "MeerkatMachine emitted no runtime lifecycle durability classification for {state}"
328 )
329 })
330}
331
332pub fn classify_runtime_loop_queue_admission(
337 state: RuntimeState,
338 current_run_bound: bool,
339) -> Result<RuntimeLoopQueueAdmissionPlan, String> {
340 let observed_state = dsl_authority::observed_runtime_lifecycle_state(state);
341 let mut authority = projection_authority();
342 let transition = dsl::MeerkatMachineMutator::apply(
343 &mut authority,
344 dsl::MeerkatMachineInput::ClassifyRuntimeLoopQueueAdmission {
345 state: observed_state,
346 current_run_bound,
347 },
348 )
349 .map_err(|err| {
350 format!(
351 "MeerkatMachine rejected runtime-loop queue admission for {state} with current_run_bound={current_run_bound}: {err}"
352 )
353 })?;
354
355 transition
356 .into_effects()
357 .into_iter()
358 .find_map(|effect| match effect {
359 dsl::MeerkatMachineEffect::RuntimeLoopQueueAdmissionClassified {
360 state,
361 current_run_bound: observed_current_run_bound,
362 queue_admission,
363 run_binding,
364 } if state == observed_state && observed_current_run_bound == current_run_bound => {
365 Some(RuntimeLoopQueueAdmissionPlan {
366 queue_admission,
367 run_binding,
368 })
369 }
370 _ => None,
371 })
372 .ok_or_else(|| {
373 format!(
374 "MeerkatMachine emitted no runtime-loop queue admission for {state} with current_run_bound={current_run_bound}"
375 )
376 })
377}
378
/// Outcome of `resolve_visible_runtime_phase`, copied from the machine's
/// `VisibleRuntimePhaseResolved` effect.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct VisibleRuntimePhasePlan {
    /// The effect's `publish_control` flag.
    // NOTE(review): presumably signals that the control projection should be published — confirm.
    pub publish_control: bool,
    /// The effect's `selected_raw_phase`, converted to a [`RuntimeState`].
    pub selected_raw_phase: RuntimeState,
    /// The effect's `visible_phase`, converted to a [`RuntimeState`].
    pub visible_phase: RuntimeState,
}
392
393pub fn resolve_visible_runtime_phase(
401 dsl_phase: RuntimeState,
402 dsl_pre_run_phase: Option<RuntimeState>,
403 control_phase: RuntimeState,
404 control_pre_run_phase: Option<RuntimeState>,
405 has_runtime_persistence: bool,
406) -> Result<VisibleRuntimePhasePlan, String> {
407 let observed_dsl = dsl_authority::observed_runtime_lifecycle_state(dsl_phase);
408 let observed_control = dsl_authority::observed_runtime_lifecycle_state(control_phase);
409 let observed_dsl_pre_run =
410 dsl_pre_run_phase.map(dsl_authority::observed_runtime_lifecycle_state);
411 let observed_control_pre_run =
412 control_pre_run_phase.map(dsl_authority::observed_runtime_lifecycle_state);
413 let mut authority = projection_authority();
414 let transition = dsl::MeerkatMachineMutator::apply(
415 &mut authority,
416 dsl::MeerkatMachineInput::ResolveVisibleRuntimePhase {
417 dsl_phase: observed_dsl,
418 dsl_pre_run_phase: observed_dsl_pre_run,
419 control_phase: observed_control,
420 control_pre_run_phase: observed_control_pre_run,
421 has_runtime_persistence,
422 },
423 )
424 .map_err(|err| {
425 format!(
426 "MeerkatMachine rejected visible runtime phase resolution \
427 (dsl={dsl_phase}, control={control_phase}, persistence={has_runtime_persistence}): {err}"
428 )
429 })?;
430
431 transition
432 .into_effects()
433 .into_iter()
434 .find_map(|effect| match effect {
435 dsl::MeerkatMachineEffect::VisibleRuntimePhaseResolved {
436 publish_control,
437 selected_raw_phase,
438 visible_phase,
439 } => Some(VisibleRuntimePhasePlan {
440 publish_control,
441 selected_raw_phase: dsl_authority::runtime_state_from_observed_lifecycle_state(
442 selected_raw_phase,
443 ),
444 visible_phase: dsl_authority::runtime_state_from_observed_lifecycle_state(
445 visible_phase,
446 ),
447 }),
448 _ => None,
449 })
450 .ok_or_else(|| {
451 format!(
452 "MeerkatMachine emitted no visible runtime phase resolution \
453 (dsl={dsl_phase}, control={control_phase}, persistence={has_runtime_persistence})"
454 )
455 })
456}
457
458pub fn resolve_input_public_lifecycle_projection(
461 input_id: &InputId,
462 phase: InputLifecycleState,
463) -> Result<dsl::InputPublicLifecycleState, String> {
464 let input_key = input_id.to_string();
465 let mut authority = projection_authority();
466 let transition = dsl::MeerkatMachineMutator::apply(
467 &mut authority,
468 dsl::MeerkatMachineInput::ResolveInputPublicLifecycle {
469 input_id: input_key.clone(),
470 phase: observed_input_phase(phase),
471 },
472 )
473 .map_err(|err| {
474 format!("MeerkatMachine rejected public lifecycle projection for '{input_id}': {err}")
475 })?;
476
477 transition
478 .into_effects()
479 .into_iter()
480 .find_map(|effect| match effect {
481 dsl::MeerkatMachineEffect::InputPublicLifecycleResolved { input_id, phase }
482 if input_id == input_key =>
483 {
484 Some(phase)
485 }
486 _ => None,
487 })
488 .ok_or_else(|| {
489 format!("MeerkatMachine emitted no public lifecycle projection for '{input_id}'")
490 })
491}
492
493pub fn resolve_input_public_state_projection(
496 input_id: &InputId,
497 seed: &InputStateSeed,
498) -> Result<InputPublicStateProjection, String> {
499 let lifecycle_state = resolve_input_public_lifecycle_projection(input_id, seed.phase)?;
500 let terminal_outcome = resolve_input_public_terminal_projection(input_id, seed)?;
501 Ok(InputPublicStateProjection {
502 lifecycle_state,
503 terminal_outcome,
504 })
505}
506
507pub(crate) fn input_seed_behavioral_terminality_via_authority(
508 input_id: &InputId,
509 seed: &InputStateSeed,
510) -> Result<bool, String> {
511 classify_input_behavioral_terminality(input_id, seed.phase, seed.terminal_outcome.as_ref())
512}
513
514pub(crate) fn input_phase_behavioral_terminality_via_authority(
515 input_id: &InputId,
516 phase: InputLifecycleState,
517 terminal_outcome: Option<InputTerminalOutcome>,
518) -> Result<bool, String> {
519 classify_input_behavioral_terminality(input_id, phase, terminal_outcome.as_ref())
520}
521
522pub(crate) fn authorize_stored_input_state_seed(
525 input_id: &InputId,
526 seed: &InputStateSeed,
527) -> Result<(), String> {
528 let input_key = input_id.to_string();
529 let (terminal_kind, superseded_by, aggregate_id, abandon_reason, abandon_attempt_count) =
530 input_seed_terminal_parts(seed)?;
531 let mut authority = projection_authority();
532 let transition = dsl::MeerkatMachineMutator::apply(
533 &mut authority,
534 dsl::MeerkatMachineInput::AuthorizeStoredInputStateSeed {
535 input_id: input_key.clone(),
536 phase: observed_input_phase(seed.phase),
537 terminal_kind,
538 superseded_by,
539 aggregate_id,
540 abandon_reason,
541 abandon_attempt_count,
542 attempt_count: u64::from(seed.attempt_count),
543 run_id: seed
544 .last_run_id
545 .as_ref()
546 .map(std::string::ToString::to_string),
547 boundary_sequence: seed.last_boundary_sequence,
548 admission_sequence: seed.admission_sequence,
549 recovery_lane: seed.recovery_lane.map(dsl::InputLane::from),
550 },
551 )
552 .map_err(|err| {
553 format!("MeerkatMachine rejected stored input-state seed for '{input_id}': {err}")
554 })?;
555
556 transition
557 .into_effects()
558 .into_iter()
559 .find_map(|effect| match effect {
560 dsl::MeerkatMachineEffect::StoredInputStateSeedAuthorized { input_id }
561 if input_id == input_key =>
562 {
563 Some(())
564 }
565 _ => None,
566 })
567 .ok_or_else(|| {
568 format!("MeerkatMachine emitted no stored input-state seed authority for '{input_id}'")
569 })
570}
571
572fn classify_input_behavioral_terminality(
573 input_id: &InputId,
574 phase: InputLifecycleState,
575 terminal_outcome: Option<&InputTerminalOutcome>,
576) -> Result<bool, String> {
577 let input_key = input_id.to_string();
578 let (terminal_kind, abandon_reason) = input_terminality_parts(terminal_outcome);
579 let mut authority = projection_authority();
580 let transition = dsl::MeerkatMachineMutator::apply(
581 &mut authority,
582 dsl::MeerkatMachineInput::ClassifyInputTerminality {
583 input_id: input_key.clone(),
584 phase: observed_input_phase(phase),
585 terminal_kind,
586 abandon_reason,
587 },
588 )
589 .map_err(|err| {
590 format!("MeerkatMachine rejected behavioral input terminality for '{input_id}': {err}")
591 })?;
592
593 let mut terminality = None;
594 for effect in transition.into_effects() {
595 match effect {
596 dsl::MeerkatMachineEffect::InputBehavioralTerminalityResolved {
597 input_id,
598 terminal,
599 } if input_id == input_key => terminality = Some(terminal),
600 other => {
601 return Err(format!(
602 "MeerkatMachine emitted unexpected behavioral input terminality effect for '{input_id}': {other:?}"
603 ));
604 }
605 }
606 }
607 terminality.ok_or_else(|| {
608 format!("MeerkatMachine emitted no behavioral input terminality for '{input_id}'")
609 })
610}
611
612fn resolve_input_public_terminal_projection(
613 input_id: &InputId,
614 seed: &InputStateSeed,
615) -> Result<Option<dsl::InputPublicTerminalOutcome>, String> {
616 let input_key = input_id.to_string();
617 let (terminal_kind, abandon_reason) = input_terminality_parts(seed.terminal_outcome.as_ref());
618 let mut authority = projection_authority();
619 let transition = dsl::MeerkatMachineMutator::apply(
620 &mut authority,
621 dsl::MeerkatMachineInput::ResolveInputPublicTerminalOutcome {
622 input_id: input_key.clone(),
623 phase: observed_input_phase(seed.phase),
624 terminal_kind,
625 abandon_reason,
626 },
627 )
628 .map_err(|err| {
629 format!("MeerkatMachine rejected public terminal projection for '{input_id}': {err}")
630 })?;
631
632 transition
633 .into_effects()
634 .into_iter()
635 .find_map(|effect| match effect {
636 dsl::MeerkatMachineEffect::InputPublicTerminalOutcomeResolved {
637 input_id,
638 terminal_outcome,
639 } if input_id == input_key => Some(terminal_outcome),
640 _ => None,
641 })
642 .ok_or_else(|| {
643 format!("MeerkatMachine emitted no public terminal projection for '{input_id}'")
644 })
645}
646
647fn projection_authority() -> dsl::MeerkatMachineAuthority {
648 dsl_authority::new_initialized_authority("projection authority must initialize")
649}
650
/// Creates a new initialized machine authority wrapped in `Arc<std::sync::Mutex<_>>`.
///
/// Only compiled with the `live` feature.
#[cfg(feature = "live")]
fn live_unbound_rejection_authority() -> crate::driver::ephemeral::SharedIngressDslAuthority {
    let authority = dsl_authority::new_initialized_authority(
        "live unbound rejection authority must initialize",
    );
    Arc::new(std::sync::Mutex::new(authority))
}
659
660fn observed_input_phase(phase: InputLifecycleState) -> dsl::RecoveredInputObservedPhase {
661 match phase {
662 InputLifecycleState::Accepted => dsl::RecoveredInputObservedPhase::Accepted,
663 InputLifecycleState::Queued => dsl::RecoveredInputObservedPhase::Queued,
664 InputLifecycleState::Staged => dsl::RecoveredInputObservedPhase::Staged,
665 InputLifecycleState::Applied => dsl::RecoveredInputObservedPhase::Applied,
666 InputLifecycleState::AppliedPendingConsumption => {
667 dsl::RecoveredInputObservedPhase::AppliedPendingConsumption
668 }
669 InputLifecycleState::Consumed => dsl::RecoveredInputObservedPhase::Consumed,
670 InputLifecycleState::Superseded => dsl::RecoveredInputObservedPhase::Superseded,
671 InputLifecycleState::Coalesced => dsl::RecoveredInputObservedPhase::Coalesced,
672 InputLifecycleState::Abandoned => dsl::RecoveredInputObservedPhase::Abandoned,
673 }
674}
675
/// Terminal-outcome components of an input seed, as returned by
/// `input_seed_terminal_parts`.
///
/// In order: terminal kind, superseding input id, aggregate id, abandon reason, and
/// abandon attempt count.
type InputSeedTerminalParts = (
    Option<dsl::InputTerminalKind>,
    Option<String>,
    Option<String>,
    Option<dsl::InputAbandonReason>,
    u64,
);
683
684fn input_seed_terminal_parts(seed: &InputStateSeed) -> Result<InputSeedTerminalParts, String> {
685 match seed.terminal_outcome.as_ref() {
686 None => Ok((None, None, None, None, 0)),
687 Some(InputTerminalOutcome::Consumed) => {
688 Ok((Some(dsl::InputTerminalKind::Consumed), None, None, None, 0))
689 }
690 Some(InputTerminalOutcome::Superseded { superseded_by }) => Ok((
691 Some(dsl::InputTerminalKind::Superseded),
692 Some(superseded_by.to_string()),
693 None,
694 None,
695 0,
696 )),
697 Some(InputTerminalOutcome::Coalesced { aggregate_id }) => Ok((
698 Some(dsl::InputTerminalKind::Coalesced),
699 None,
700 Some(aggregate_id.to_string()),
701 None,
702 0,
703 )),
704 Some(InputTerminalOutcome::Abandoned { reason }) => {
705 let abandon_attempt_count = match reason {
706 InputAbandonReason::MaxAttemptsExhausted { attempts } => u64::from(*attempts),
707 _ => u64::from(seed.attempt_count),
708 };
709 Ok((
710 Some(dsl::InputTerminalKind::Abandoned),
711 None,
712 None,
713 input_terminality_parts(seed.terminal_outcome.as_ref()).1,
714 abandon_attempt_count,
715 ))
716 }
717 }
718}
719
720fn input_terminality_parts(
721 outcome: Option<&InputTerminalOutcome>,
722) -> (
723 Option<dsl::InputTerminalKind>,
724 Option<dsl::InputAbandonReason>,
725) {
726 match outcome {
727 None => (None, None),
728 Some(InputTerminalOutcome::Consumed) => (Some(dsl::InputTerminalKind::Consumed), None),
729 Some(InputTerminalOutcome::Superseded { .. }) => {
730 (Some(dsl::InputTerminalKind::Superseded), None)
731 }
732 Some(InputTerminalOutcome::Coalesced { .. }) => {
733 (Some(dsl::InputTerminalKind::Coalesced), None)
734 }
735 Some(InputTerminalOutcome::Abandoned { reason }) => (
736 Some(dsl::InputTerminalKind::Abandoned),
737 Some(match reason {
738 InputAbandonReason::Retired => dsl::InputAbandonReason::Retired,
739 InputAbandonReason::Reset => dsl::InputAbandonReason::Reset,
740 InputAbandonReason::Stopped => dsl::InputAbandonReason::Stopped,
741 InputAbandonReason::Destroyed => dsl::InputAbandonReason::Destroyed,
742 InputAbandonReason::Cancelled => dsl::InputAbandonReason::Cancelled,
743 InputAbandonReason::MaxAttemptsExhausted { .. } => {
744 dsl::InputAbandonReason::MaxAttemptsExhausted
745 }
746 }),
747 ),
748 }
749}
750
751#[derive(Debug, Default)]
752struct UnavailableBlobStore;
753
754impl UnavailableBlobStore {
755 fn error() -> BlobStoreError {
756 BlobStoreError::Unsupported(
757 "persistent runtime constructed without blob store; blob-backed inputs require a BlobStore"
758 .to_string(),
759 )
760 }
761}
762
/// Auth handles shared by every runtime that resolves to the same authority key.
///
/// Created and registered by `persistent_auth_authorities`.
#[cfg(not(target_arch = "wasm32"))]
struct PersistentAuthAuthorityBundle {
    /// Weak reference to the store most recently passed to
    /// `persistent_auth_authorities` for this bundle.
    store: StdMutex<Weak<dyn RuntimeStore>>,
    /// Auth-lease handle created together with `oauth_flows`.
    auth_lease: Arc<crate::handles::RuntimeAuthLeaseHandle>,
    /// OAuth-flow handle; rebound to the caller's store whenever the bundle is reused.
    oauth_flows: Arc<crate::handles::RuntimeOAuthFlowHandle>,
}
769
/// Key under which a `PersistentAuthAuthorityBundle` is registered; computed by
/// `runtime_store_identity`.
#[cfg(not(target_arch = "wasm32"))]
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
enum PersistentAuthAuthorityKey {
    /// Key string supplied by the store's `auth_authority_key()`.
    Durable(String),
    /// Address of the store's `Arc` allocation, used when the store supplies no key.
    Process(usize),
}
776
/// Process-wide registry of auth authority bundles, keyed by
/// `PersistentAuthAuthorityKey`.
///
/// Initialized lazily on the first call to `persistent_auth_authorities`.
#[cfg(not(target_arch = "wasm32"))]
static PERSISTENT_AUTH_AUTHORITIES: OnceLock<
    StdMutex<HashMap<PersistentAuthAuthorityKey, Arc<PersistentAuthAuthorityBundle>>>,
> = OnceLock::new();
781
782#[cfg(not(target_arch = "wasm32"))]
783fn runtime_store_identity(store: &Arc<dyn RuntimeStore>) -> PersistentAuthAuthorityKey {
784 store
785 .auth_authority_key()
786 .map(PersistentAuthAuthorityKey::Durable)
787 .unwrap_or_else(|| {
788 PersistentAuthAuthorityKey::Process(Arc::as_ptr(store).cast::<()>() as usize)
789 })
790}
791
792fn runtime_stores_share_authority(a: &Arc<dyn RuntimeStore>, b: &Arc<dyn RuntimeStore>) -> bool {
793 match (a.auth_authority_key(), b.auth_authority_key()) {
794 (Some(a), Some(b)) => a == b,
795 _ => Arc::ptr_eq(a, b),
796 }
797}
798
799fn generated_runtime_auth_lease_handle(
800 handle: Arc<crate::handles::RuntimeAuthLeaseHandle>,
801) -> meerkat_core::handles::GeneratedAuthLeaseHandle {
802 #[allow(clippy::expect_used)]
803 crate::protocol_auth_lease_lifecycle_publication::generated_auth_lease_handle(handle)
804 .expect("runtime AuthLeaseHandle must be certified by generated AuthMachine authority")
805}
806
/// Returns the auth authority bundle for `store`, creating and registering one when no
/// reusable bundle exists.
///
/// An existing bundle is reused when its key is `Durable`, or when the store it last
/// recorded is still alive. On reuse the OAuth-flow handle is rebound to `store` and the
/// recorded weak reference is replaced. Otherwise a new bundle is built and inserted
/// under the key, replacing any previous entry.
#[cfg(not(target_arch = "wasm32"))]
fn persistent_auth_authorities(
    store: &Arc<dyn RuntimeStore>,
) -> Arc<PersistentAuthAuthorityBundle> {
    let key = runtime_store_identity(store);
    let authorities = PERSISTENT_AUTH_AUTHORITIES.get_or_init(|| StdMutex::new(HashMap::new()));
    // A poisoned registry lock is recovered rather than propagated.
    let mut authorities = authorities
        .lock()
        .unwrap_or_else(std::sync::PoisonError::into_inner);
    if let Some(existing) = authorities.get(&key) {
        // The inner store lock is released at the end of this statement, before
        // `bind_persistent_store` is called.
        let stored_store_alive = existing
            .store
            .lock()
            .unwrap_or_else(std::sync::PoisonError::into_inner)
            .upgrade()
            .is_some();
        // A `Process` key is an address; a dead recorded store means a later allocation
        // may have reused it, so such an entry is not reused.
        if matches!(key, PersistentAuthAuthorityKey::Durable(_)) || stored_store_alive {
            existing.oauth_flows.bind_persistent_store(store);
            *existing
                .store
                .lock()
                .unwrap_or_else(std::sync::PoisonError::into_inner) = Arc::downgrade(store);
            return Arc::clone(existing);
        }
    }
    let auth_lease = Arc::new(crate::handles::RuntimeAuthLeaseHandle::new());
    let oauth_flows = Arc::new(
        crate::handles::RuntimeOAuthFlowHandle::new_with_persistent_store_and_auth_lease(
            // NOTE(review): ten minutes; presumably the OAuth flow lifetime — confirm
            // against the constructor's parameter.
            std::time::Duration::from_secs(10 * 60),
            Arc::clone(&auth_lease),
            store,
        ),
    );
    let bundle = Arc::new(PersistentAuthAuthorityBundle {
        store: StdMutex::new(Arc::downgrade(store)),
        auth_lease,
        oauth_flows,
    });
    // NOTE(review): entries are only ever replaced, never removed, in this function;
    // `Process` entries for dropped stores stay registered until their key recurs.
    authorities.insert(key, Arc::clone(&bundle));
    bundle
}
848
/// Test-only helper that empties the process-wide auth authority registry.
///
/// Does nothing if the registry has never been initialized.
#[cfg(all(test, not(target_arch = "wasm32")))]
pub(crate) fn clear_persistent_auth_authorities_for_test() {
    let Some(registry) = PERSISTENT_AUTH_AUTHORITIES.get() else {
        return;
    };
    let mut bundles = registry
        .lock()
        .unwrap_or_else(std::sync::PoisonError::into_inner);
    bundles.clear();
}
858
// `async_trait` is applied without the `Send` bound on wasm32 and with it elsewhere.
#[cfg_attr(target_arch = "wasm32", async_trait::async_trait(?Send))]
#[cfg_attr(not(target_arch = "wasm32"), async_trait::async_trait)]
impl BlobStore for UnavailableBlobStore {
    /// Always fails with the `Unsupported` error from `UnavailableBlobStore::error`.
    async fn put_image(&self, _media_type: &str, _data: &str) -> Result<BlobRef, BlobStoreError> {
        Err(Self::error())
    }

    /// Always fails with the `Unsupported` error from `UnavailableBlobStore::error`.
    async fn get(&self, _blob_id: &BlobId) -> Result<BlobPayload, BlobStoreError> {
        Err(Self::error())
    }

    /// Always fails with the `Unsupported` error from `UnavailableBlobStore::error`.
    async fn delete(&self, _blob_id: &BlobId) -> Result<(), BlobStoreError> {
        Err(Self::error())
    }

    /// Always fails with the `Unsupported` error from `UnavailableBlobStore::error`.
    async fn exists(&self, _blob_id: &BlobId) -> Result<bool, BlobStoreError> {
        Err(Self::error())
    }

    /// This store never persists anything.
    fn is_persistent(&self) -> bool {
        false
    }
}
882
/// Boxed, pinned future that resolves to the result of a machine command.
///
/// This non-wasm variant additionally requires the future to be `Send`.
#[cfg(not(target_arch = "wasm32"))]
type MeerkatMachineCommandFuture<'a> = Pin<
    Box<
        dyn Future<Output = Result<MeerkatMachineCommandResult, MeerkatMachineCommandError>>
            + Send
            + 'a,
    >,
>;

/// Boxed, pinned future that resolves to the result of a machine command.
///
/// This wasm32 variant carries no `Send` bound.
#[cfg(target_arch = "wasm32")]
type MeerkatMachineCommandFuture<'a> = Pin<
    Box<dyn Future<Output = Result<MeerkatMachineCommandResult, MeerkatMachineCommandError>> + 'a>,
>;
896
897pub(crate) use driver::{
898 DriverEntry, SharedCompletionRegistry, SharedDriver, cancel_runtime_loop_run,
899 commit_runtime_loop_run, fail_machine_run, fail_runtime_loop_run,
900 machine_batch_primitive_projections, machine_batch_runtime_semantics,
901 machine_commit_prepared_destroy, machine_commit_service_turn_terminal_receipt,
902 machine_prepare_bindings_projection, machine_prepare_destroy, machine_recover_ephemeral_driver,
903 machine_recover_persistent_driver, machine_recycle_preserving_work, machine_reset,
904 machine_retire, machine_select_runtime_loop_batch, machine_stop_runtime,
905 prepare_runtime_loop_batch_start,
906};
907
908pub(crate) mod driver;
909
910mod comms_drain;
911pub mod composition;
912mod dispatch_control;
913mod dispatch_drain;
914mod dispatch_ingress;
915mod dispatch_session;
916#[allow(unused_variables, dead_code, clippy::cmp_owned)]
917#[allow(clippy::assign_op_pattern)]
918pub mod dsl;
919pub(crate) mod dsl_authority;
920mod dsl_effects;
921mod llm_reconfigure;
922mod runtime_control;
923mod session_management;
924mod traits;
925mod visibility;
926
927pub use composition::{MeerkatCompositionSignalDispatcher, MeerkatConsumerSurface};
928
929pub use comms_drain::{
930 CommsDrainMode, CommsDrainPhase, DrainExitReason, PeerEndpointStageError, PeerIngressOwner,
931 SupervisorBinding, SupervisorBindingStageError,
932};
933pub(crate) use comms_drain::{
934 CommsDrainSlot, SupervisorAuthorizeAdmission, SupervisorBindAdmission,
935 SupervisorBridgeCommandAdmission, abort_slot,
936};
937pub(crate) use dsl_effects::{DslTransitionEffects, apply_dsl_transition_on_authority};
938pub(crate) use visibility::MachineToolVisibilityOwner;
939
/// Result of staging a DSL input on a session's authority.
struct StagedSessionDslInput {
    /// Authority snapshot that callers restore to roll the staged input back.
    previous_snapshot: dsl::MeerkatMachineAuthoritySnapshot,
    // NOTE(review): presumably the authority snapshot after the staged input was
    // applied — confirm against `stage_dsl_transition_on_authority`.
    committed_snapshot: dsl::MeerkatMachineAuthoritySnapshot,
    /// Effects produced by the staged transition.
    effects: DslTransitionEffects,
}
945
/// Policy selector with a single variant.
// NOTE(review): presumably chooses how a failure to dispatch already-committed effects
// is handled — confirm at the use sites.
#[derive(Clone, Copy)]
enum CommittedEffectDispatchFailure {
    // NOTE(review): by its name, the committed DSL state is kept rather than rolled
    // back — confirm.
    PreserveCommittedDslState,
}
950
/// Per-session runtime state held by the machine.
struct RuntimeSessionEntry {
    /// Logical runtime identifier for this entry.
    runtime_id: LogicalRuntimeId,
    /// Async mutex that guards no data.
    // NOTE(review): presumably serializes mutations of this session — confirm.
    mutation_gate: Arc<Mutex<()>>,
    /// Shared handle to the session's runtime driver.
    driver: SharedDriver,
    /// Shared control projection; `control_snapshot` returns a clone of it.
    control_projection: Arc<StdRwLock<crate::driver::ephemeral::RuntimeControlProjection>>,
    /// Ops-lifecycle registry for this session.
    ops_lifecycle: Arc<crate::ops_lifecycle::RuntimeOpsLifecycleRegistry>,
    /// Runtime epoch identifier for this entry.
    epoch_id: meerkat_core::RuntimeEpochId,
    /// Teardown gate closed by `close_handle_teardown_gate`.
    handle_teardown_gate: Arc<crate::handles::HandleTeardownGate>,
    /// Shared epoch cursor state.
    cursor_state: Arc<meerkat_core::EpochCursorState>,
    /// Shared completion registry.
    completions: SharedCompletionRegistry,
    /// Machine-backed tool-visibility owner for this session.
    tool_visibility_owner: Arc<MachineToolVisibilityOwner>,
    /// Runtime-loop attachment, if any; inspected by `attachment_is_live`.
    attachment_slot: RuntimeLoopAttachmentSlot,
    // NOTE(review): presumably an interrupt handle kept before a runtime loop is
    // attached — confirm.
    provisional_interrupt_handle:
        Option<Arc<dyn meerkat_core::lifecycle::CoreExecutorInterruptHandle>>,
    /// Shared generated machine authority; the registration and stop-deferred helpers
    /// read its state under this lock.
    dsl_authority: Arc<std::sync::Mutex<dsl::MeerkatMachineAuthority>>,
    /// Comms drain slot for this session.
    drain_slot: CommsDrainSlot,
}
1020
/// Channels and handles belonging to an attached runtime loop task.
struct RuntimeLoopAttachment {
    /// Sender of unit wake signals; `attachment_is_live` checks that it is still open.
    wake_tx: mpsc::Sender<()>,
    /// Sender of runtime effects; `attachment_is_live` checks that it is still open.
    effect_tx: mpsc::Sender<crate::effect::RuntimeEffect>,
    /// Optional executor boundary handle.
    boundary_handle: Option<Arc<dyn meerkat_core::lifecycle::CoreExecutorBoundaryHandle>>,
    /// Optional executor interrupt handle.
    interrupt_handle: Option<Arc<dyn meerkat_core::lifecycle::CoreExecutorInterruptHandle>>,
    /// Join handle of the spawned loop task.
    loop_handle: tokio::task::JoinHandle<()>,
}
1032
/// Whether a session currently has a runtime loop attached.
enum RuntimeLoopAttachmentSlot {
    /// No runtime loop is attached.
    Empty,
    /// A runtime loop is attached; carries its channels and handles.
    Attached(RuntimeLoopAttachment),
}
1038
1039impl RuntimeSessionEntry {
1040 fn control_snapshot(&self) -> crate::driver::ephemeral::RuntimeControlProjection {
1041 self.control_projection
1042 .read()
1043 .map(|guard| guard.clone())
1044 .unwrap_or_else(|poisoned| {
1045 tracing::error!("runtime control projection lock poisoned");
1046 poisoned.into_inner().clone()
1047 })
1048 }
1049
1050 fn attachment_is_live(&self) -> bool {
1051 match &self.attachment_slot {
1052 RuntimeLoopAttachmentSlot::Attached(attachment) => {
1053 !attachment.wake_tx.is_closed() && !attachment.effect_tx.is_closed()
1054 }
1055 RuntimeLoopAttachmentSlot::Empty => false,
1056 }
1057 }
1058
1059 fn generated_executor_registration_active(&self) -> bool {
1060 let authority = self
1061 .dsl_authority
1062 .lock()
1063 .unwrap_or_else(std::sync::PoisonError::into_inner);
1064 matches!(
1065 authority.state().registration_phase,
1066 dsl::RegistrationPhase::Active
1067 )
1068 }
1069
1070 fn close_handle_teardown_gate(&self) {
1071 let _guard = self
1072 .dsl_authority
1073 .lock()
1074 .unwrap_or_else(std::sync::PoisonError::into_inner);
1075 self.handle_teardown_gate.close();
1076 }
1077
1078 fn generated_executor_registration_active_or_draining(&self) -> bool {
1086 let authority = self
1087 .dsl_authority
1088 .lock()
1089 .unwrap_or_else(std::sync::PoisonError::into_inner);
1090 matches!(
1091 authority.state().registration_phase,
1092 dsl::RegistrationPhase::Active | dsl::RegistrationPhase::Draining
1093 )
1094 }
1095
1096 fn generated_stop_deferred(&self) -> bool {
1097 self.dsl_authority
1098 .lock()
1099 .unwrap_or_else(std::sync::PoisonError::into_inner)
1100 .state()
1101 .runtime_stop_deferred
1102 }
1103
1104 fn stage_generated_executor_registration_claim(
1105 &self,
1106 session_id: &SessionId,
1107 ) -> Result<StagedSessionDslInput, String> {
1108 let staged = MeerkatMachine::stage_dsl_transition_on_authority(
1109 &self.dsl_authority,
1110 dsl::MeerkatMachineInput::EnsureSessionWithExecutor {
1111 session_id: dsl::SessionId::from_domain(session_id),
1112 },
1113 "EnsureSessionWithExecutor",
1114 )?;
1115 if self.generated_executor_registration_active() {
1116 Ok(staged)
1117 } else {
1118 let mut authority = self
1119 .dsl_authority
1120 .lock()
1121 .unwrap_or_else(std::sync::PoisonError::into_inner);
1122 authority.restore_snapshot(staged.previous_snapshot);
1123 Err("generated MeerkatMachine did not grant active executor registration".into())
1124 }
1125 }
1126
1127 fn stage_generated_executor_exit_observation(&self) -> Result<StagedSessionDslInput, String> {
1128 MeerkatMachine::stage_runtime_internal_dsl_transition_on_authority(
1129 &self.dsl_authority,
1130 crate::meerkat_machine_types::MeerkatMachineFieldlessRuntimeInternalInput::RuntimeExecutorExited,
1131 )
1132 }
1133
1134 fn has_live_attachment(&self) -> bool {
1137 self.attachment_is_live()
1138 }
1139
1140 fn attach_runtime_loop(
1141 &mut self,
1142 wake_tx: mpsc::Sender<()>,
1143 effect_tx: mpsc::Sender<crate::effect::RuntimeEffect>,
1144 boundary_handle: Option<Arc<dyn meerkat_core::lifecycle::CoreExecutorBoundaryHandle>>,
1145 interrupt_handle: Option<Arc<dyn meerkat_core::lifecycle::CoreExecutorInterruptHandle>>,
1146 loop_handle: tokio::task::JoinHandle<()>,
1147 ) {
1148 self.provisional_interrupt_handle = None;
1149 self.attachment_slot = RuntimeLoopAttachmentSlot::Attached(RuntimeLoopAttachment {
1150 wake_tx,
1151 effect_tx,
1152 boundary_handle,
1153 interrupt_handle,
1154 loop_handle,
1155 });
1156 }
1157
1158 fn take_loop_join_handle(&mut self) -> Option<tokio::task::JoinHandle<()>> {
1166 match std::mem::replace(&mut self.attachment_slot, RuntimeLoopAttachmentSlot::Empty) {
1167 RuntimeLoopAttachmentSlot::Attached(attachment) => Some(attachment.loop_handle),
1168 RuntimeLoopAttachmentSlot::Empty => None,
1169 }
1170 }
1171
1172 fn clear_dead_attachment(&mut self) -> bool {
1173 if matches!(self.attachment_slot, RuntimeLoopAttachmentSlot::Attached(_))
1174 && !self.attachment_is_live()
1175 {
1176 self.attachment_slot = RuntimeLoopAttachmentSlot::Empty;
1177 return true;
1178 }
1179 false
1180 }
1181
1182 fn wake_sender(&self) -> Option<mpsc::Sender<()>> {
1183 match &self.attachment_slot {
1184 RuntimeLoopAttachmentSlot::Attached(attachment)
1185 if !attachment.wake_tx.is_closed() && !attachment.effect_tx.is_closed() =>
1186 {
1187 Some(attachment.wake_tx.clone())
1188 }
1189 _ => None,
1190 }
1191 }
1192
1193 fn effect_sender(&self) -> Option<mpsc::Sender<crate::effect::RuntimeEffect>> {
1194 match &self.attachment_slot {
1195 RuntimeLoopAttachmentSlot::Attached(attachment)
1196 if !attachment.wake_tx.is_closed() && !attachment.effect_tx.is_closed() =>
1197 {
1198 Some(attachment.effect_tx.clone())
1199 }
1200 _ => None,
1201 }
1202 }
1203
1204 fn boundary_handle(
1205 &self,
1206 ) -> Option<Arc<dyn meerkat_core::lifecycle::CoreExecutorBoundaryHandle>> {
1207 match &self.attachment_slot {
1208 RuntimeLoopAttachmentSlot::Attached(attachment)
1209 if !attachment.wake_tx.is_closed() && !attachment.effect_tx.is_closed() =>
1210 {
1211 attachment.boundary_handle.clone()
1212 }
1213 _ => None,
1214 }
1215 }
1216
1217 fn interrupt_handle(
1218 &self,
1219 ) -> Option<Arc<dyn meerkat_core::lifecycle::CoreExecutorInterruptHandle>> {
1220 match &self.attachment_slot {
1221 RuntimeLoopAttachmentSlot::Attached(attachment)
1222 if !attachment.wake_tx.is_closed() && !attachment.effect_tx.is_closed() =>
1223 {
1224 attachment.interrupt_handle.clone()
1225 }
1226 _ => self.provisional_interrupt_handle.clone(),
1227 }
1228 }
1229
1230 fn install_provisional_interrupt_handle(
1231 &mut self,
1232 handle: Arc<dyn meerkat_core::lifecycle::CoreExecutorInterruptHandle>,
1233 ) {
1234 if !self.attachment_is_live() {
1235 self.provisional_interrupt_handle = Some(handle);
1236 }
1237 }
1238}
1239
1240impl MeerkatMachine {
1241 async fn session_mutation_gate(&self, session_id: &SessionId) -> Option<Arc<Mutex<()>>> {
1247 let sessions = self.sessions.read().await;
1248 sessions
1249 .get(session_id)
1250 .map(|entry| Arc::clone(&entry.mutation_gate))
1251 }
1252
1253 async fn lock_current_session_mutation_gate(
1254 &self,
1255 session_id: &SessionId,
1256 ) -> Option<crate::tokio::sync::OwnedMutexGuard<()>> {
1257 loop {
1258 let gate = self.session_mutation_gate(session_id).await?;
1259 let gate_guard = Arc::clone(&gate).lock_owned().await;
1260 let sessions = self.sessions.read().await;
1261 let entry = sessions.get(session_id)?;
1262 if Arc::ptr_eq(&entry.mutation_gate, &gate) {
1263 return Some(gate_guard);
1264 }
1265 }
1266 }
1267
1268 pub(crate) async fn lock_current_session_driver_gate(
1269 &self,
1270 session_id: &SessionId,
1271 driver: &SharedDriver,
1272 ) -> Result<crate::tokio::sync::OwnedMutexGuard<()>, RuntimeDriverError> {
1273 let gate_guard = self
1274 .lock_current_session_mutation_gate(session_id)
1275 .await
1276 .ok_or(RuntimeDriverError::NotReady {
1277 state: RuntimeState::Destroyed,
1278 })?;
1279 {
1280 let sessions = self.sessions.read().await;
1281 let entry = sessions
1282 .get(session_id)
1283 .ok_or(RuntimeDriverError::NotReady {
1284 state: RuntimeState::Destroyed,
1285 })?;
1286 if !Arc::ptr_eq(&entry.driver, driver) {
1287 return Err(RuntimeDriverError::NotReady {
1288 state: RuntimeState::Destroyed,
1289 });
1290 }
1291 }
1292 Ok(gate_guard)
1293 }
1294
1295 pub(crate) async fn lock_current_runtime_loop_driver_authority(
1296 &self,
1297 session_id: &SessionId,
1298 driver: &SharedDriver,
1299 ) -> Result<crate::tokio::sync::OwnedMutexGuard<()>, RuntimeDriverError> {
1300 let gate_guard = self
1301 .lock_current_session_driver_gate(session_id, driver)
1302 .await?;
1303 {
1304 let sessions = self.sessions.read().await;
1305 let entry = sessions
1306 .get(session_id)
1307 .ok_or(RuntimeDriverError::NotReady {
1308 state: RuntimeState::Destroyed,
1309 })?;
1310 if !entry.generated_executor_registration_active_or_draining() {
1311 return Err(RuntimeDriverError::ValidationFailed {
1312 reason:
1313 "generated MeerkatMachine has no active runtime-loop executor registration"
1314 .to_string(),
1315 });
1316 }
1317 }
1318 Ok(gate_guard)
1319 }
1320
1321 async fn current_session_driver_with_authority(
1322 &self,
1323 session_id: &SessionId,
1324 ) -> Result<(SharedDriver, crate::tokio::sync::OwnedMutexGuard<()>), RuntimeDriverError> {
1325 let gate_guard = self
1326 .lock_current_session_mutation_gate(session_id)
1327 .await
1328 .ok_or(RuntimeDriverError::NotReady {
1329 state: RuntimeState::Destroyed,
1330 })?;
1331 let driver = {
1332 let sessions = self.sessions.read().await;
1333 sessions
1334 .get(session_id)
1335 .ok_or(RuntimeDriverError::NotReady {
1336 state: RuntimeState::Destroyed,
1337 })?
1338 .driver
1339 .clone()
1340 };
1341 Ok((driver, gate_guard))
1342 }
1343
1344 async fn session_dsl_authority(
1345 &self,
1346 session_id: &SessionId,
1347 ) -> Result<Arc<std::sync::Mutex<dsl::MeerkatMachineAuthority>>, String> {
1348 let sessions = self.sessions.read().await;
1349 sessions
1350 .get(session_id)
1351 .map(|entry| Arc::clone(&entry.dsl_authority))
1352 .ok_or_else(|| {
1353 RuntimeDriverError::NotReady {
1354 state: RuntimeState::Destroyed,
1355 }
1356 .to_string()
1357 })
1358 }
1359
/// Clones the handle teardown gate of the session's entry (test builds and
/// the `test-support` feature only).
///
/// # Errors
/// When the session is missing, returns the `Display` text of
/// `RuntimeDriverError::NotReady { state: Destroyed }`.
#[cfg(any(test, feature = "test-support"))]
async fn session_handle_teardown_gate(
    &self,
    session_id: &SessionId,
) -> Result<Arc<crate::handles::HandleTeardownGate>, String> {
    let sessions = self.sessions.read().await;
    match sessions.get(session_id) {
        Some(entry) => Ok(Arc::clone(&entry.handle_teardown_gate)),
        None => Err(RuntimeDriverError::NotReady {
            state: RuntimeState::Destroyed,
        }
        .to_string()),
    }
}
1376
/// Test helper: builds a `HandleDslAuthority` from the session's shared DSL
/// authority and teardown gate, then installs a generated peer-comms handle
/// on `runtime`.
///
/// # Errors
/// Returns a prefixed message when the session's DSL authority or teardown
/// gate cannot be found, or whatever error the installation reports.
#[cfg(any(test, feature = "test-support"))]
pub async fn test_install_session_peer_comms_handle_on_runtime(
    &self,
    session_id: &SessionId,
    runtime: &(dyn meerkat_core::handles::PeerCommsInstallTarget + '_),
) -> Result<(), String> {
    let dsl = match self.session_dsl_authority(session_id).await {
        Ok(authority) => authority,
        Err(error) => return Err(format!("session dsl authority unavailable: {error}")),
    };
    let teardown_gate = match self.session_handle_teardown_gate(session_id).await {
        Ok(gate) => gate,
        Err(error) => {
            return Err(format!(
                "session handle teardown gate unavailable: {error}"
            ));
        }
    };
    let authority =
        crate::handles::HandleDslAuthority::from_shared_with_teardown_gate(dsl, teardown_gate);
    let handle = std::sync::Arc::new(authority);
    crate::handles::RuntimePeerCommsHandle::install_generated_on(handle, runtime)
}
1403
1404 fn preview_dsl_input_on_state(
1405 state: &dsl::MeerkatMachineState,
1406 input: dsl::MeerkatMachineInput,
1407 context: &str,
1408 ) -> Result<Vec<dsl::MeerkatMachineEffect>, String> {
1409 let mut preview = dsl::MeerkatMachineAuthority::recover_from_state(state.clone())
1410 .map_err(|err| dsl_authority::map_error(err, context))?;
1411 dsl::MeerkatMachineMutator::apply(&mut preview, input)
1412 .map(|transition| transition.into_effects())
1413 .map_err(|err| dsl_authority::map_error(err, context))
1414 }
1415
1416 async fn preview_session_dsl_input(
1417 &self,
1418 session_id: &SessionId,
1419 input: dsl::MeerkatMachineInput,
1420 context: &str,
1421 ) -> Result<Vec<dsl::MeerkatMachineEffect>, String> {
1422 let authority = self.session_dsl_authority(session_id).await?;
1423 let state = {
1424 let authority = authority
1425 .lock()
1426 .unwrap_or_else(std::sync::PoisonError::into_inner);
1427 authority.state().clone()
1428 };
1429 Self::preview_dsl_input_on_state(&state, input, context)
1430 }
1431
1432 async fn session_dsl_state(
1433 &self,
1434 session_id: &SessionId,
1435 ) -> Result<dsl::MeerkatMachineState, RuntimeControlPlaneError> {
1436 let authority = self
1437 .session_dsl_authority(session_id)
1438 .await
1439 .map_err(RuntimeControlPlaneError::Internal)?;
1440 let authority = authority
1441 .lock()
1442 .unwrap_or_else(std::sync::PoisonError::into_inner);
1443 Ok(authority.state().clone())
1444 }
1445
1446 async fn commit_session_dsl_transition(
1447 &self,
1448 session_id: &SessionId,
1449 staged: StagedSessionDslInput,
1450 context: &str,
1451 ) -> Result<(), String> {
1452 self.commit_session_dsl_transition_with_dispatch_failure(
1453 session_id,
1454 staged,
1455 context,
1456 CommittedEffectDispatchFailure::PreserveCommittedDslState,
1457 )
1458 .await
1459 }
1460
1461 async fn commit_session_dsl_transition_preserving_committed_state(
1462 &self,
1463 session_id: &SessionId,
1464 staged: StagedSessionDslInput,
1465 context: &str,
1466 ) -> Result<(), String> {
1467 self.commit_session_dsl_transition_with_dispatch_failure(
1468 session_id,
1469 staged,
1470 context,
1471 CommittedEffectDispatchFailure::PreserveCommittedDslState,
1472 )
1473 .await
1474 }
1475
1476 async fn commit_session_dsl_transition_with_dispatch_failure(
1477 &self,
1478 _session_id: &SessionId,
1479 staged: StagedSessionDslInput,
1480 context: &str,
1481 dispatch_failure: CommittedEffectDispatchFailure,
1482 ) -> Result<(), String> {
1483 if let Err(error) = self
1484 .dispatch_routed_signals_from_effects(&staged.effects)
1485 .await
1486 {
1487 let CommittedEffectDispatchFailure::PreserveCommittedDslState = dispatch_failure;
1488 return Err(format!(
1489 "DSL authority ({context}): committed effect dispatch failed: {error}"
1490 ));
1491 }
1492 Ok(())
1493 }
1494
1495 async fn dispatch_routed_signals_from_effects(
1496 &self,
1497 effects: &[dsl::MeerkatMachineEffect],
1498 ) -> Result<(), String> {
1499 let dispatcher = {
1500 self.composition_signal_dispatcher
1501 .read()
1502 .unwrap_or_else(std::sync::PoisonError::into_inner)
1503 .clone()
1504 };
1505 let Some(dispatcher) = dispatcher else {
1506 return Ok(());
1507 };
1508
1509 for effect in effects {
1510 if let Some(signal) = composition::lift_routed_signal(effect) {
1511 composition::dispatch_routed_signal(&dispatcher, signal).await?;
1512 }
1513 }
1514 Ok(())
1515 }
1516
1517 async fn clear_dead_runtime_attachment(&self, session_id: &SessionId) {
1518 let mut sessions = self.sessions.write().await;
1519 if let Some(entry) = sessions.get_mut(session_id) {
1520 let cleared = entry.clear_dead_attachment();
1521 if cleared && let Err(error) = entry.stage_generated_executor_exit_observation() {
1522 tracing::warn!(
1523 %session_id,
1524 error = %error,
1525 "generated MeerkatMachine rejected executor-exit observation while clearing dead attachment"
1526 );
1527 }
1528 }
1529 }
1530
1531 async fn dispatch_cancel_after_boundary_runtime_effect(
1532 &self,
1533 session_id: &SessionId,
1534 effect_tx: Option<mpsc::Sender<crate::effect::RuntimeEffect>>,
1535 boundary_handle: Option<Arc<dyn meerkat_core::lifecycle::CoreExecutorBoundaryHandle>>,
1536 projected_effect: crate::effect::ProjectedRuntimeEffect,
1537 context: &str,
1538 ) -> Result<(), RuntimeDriverError> {
1539 let Some(effect_tx) = effect_tx else {
1540 let state = self
1541 .existing_session_runtime_state(session_id)
1542 .await
1543 .unwrap_or(RuntimeState::Destroyed);
1544 return Err(RuntimeDriverError::NotReady { state });
1545 };
1546
1547 let reason = projected_effect.reason().to_string();
1548 if let Some(boundary_handle) = boundary_handle {
1549 boundary_handle
1550 .cancel_after_boundary(reason)
1551 .await
1552 .map_err(|err| {
1553 RuntimeDriverError::Internal(format!(
1554 "{context}: failed to apply live boundary cancel: {err}"
1555 ))
1556 })?;
1557 }
1558
1559 match effect_tx.send(projected_effect.into_effect()).await {
1560 Ok(()) => Ok(()),
1561 Err(_) => {
1562 self.clear_dead_runtime_attachment(session_id).await;
1563 Err(RuntimeDriverError::NotReady {
1564 state: RuntimeState::Idle,
1565 })
1566 }
1567 }
1568 }
1569
1570 async fn restore_session_dsl_state(
1571 &self,
1572 session_id: &SessionId,
1573 snapshot: dsl::MeerkatMachineAuthoritySnapshot,
1574 ) {
1575 if let Ok(authority) = self.session_dsl_authority(session_id).await {
1576 Self::restore_dsl_authority_snapshot(&authority, snapshot);
1577 }
1578 }
1579
1580 async fn restore_session_dsl_state_if_current(
1581 &self,
1582 session_id: &SessionId,
1583 expected_current: dsl::MeerkatMachineAuthoritySnapshot,
1584 restore: dsl::MeerkatMachineAuthoritySnapshot,
1585 ) -> bool {
1586 let Ok(authority) = self.session_dsl_authority(session_id).await else {
1587 return false;
1588 };
1589 Self::restore_dsl_authority_snapshot_if_current(&authority, expected_current, restore)
1590 }
1591
1592 fn restore_dsl_authority_snapshot(
1593 authority: &Arc<std::sync::Mutex<dsl::MeerkatMachineAuthority>>,
1594 snapshot: dsl::MeerkatMachineAuthoritySnapshot,
1595 ) {
1596 let mut authority = authority
1597 .lock()
1598 .unwrap_or_else(std::sync::PoisonError::into_inner);
1599 authority.restore_snapshot(snapshot);
1600 }
1601
1602 fn restore_dsl_authority_snapshot_if_current(
1603 authority: &Arc<std::sync::Mutex<dsl::MeerkatMachineAuthority>>,
1604 expected_current: dsl::MeerkatMachineAuthoritySnapshot,
1605 restore: dsl::MeerkatMachineAuthoritySnapshot,
1606 ) -> bool {
1607 let mut authority = authority
1608 .lock()
1609 .unwrap_or_else(std::sync::PoisonError::into_inner);
1610 let current = authority.snapshot();
1611 if current.state() == expected_current.state() {
1612 authority.restore_snapshot(restore);
1613 true
1614 } else {
1615 false
1616 }
1617 }
1618}
1619
/// Marker value handed out by `MeerkatMachine::session_control_authority`.
///
/// The private unit field prevents construction outside this module.
#[derive(Debug, Clone, Copy)]
pub struct MachineSessionControlAuthority {
    _private: (),
}
1626
/// Private token type recognised by
/// `live_open_admission_generated_authority_bridge_token_is_valid`.
#[cfg(feature = "live")]
struct LiveOpenAdmissionGeneratedAuthorityBridgeToken;
1629
/// Private token type recognised by
/// `live_close_result_generated_authority_bridge_token_is_valid`.
#[cfg(feature = "live")]
struct LiveCloseResultGeneratedAuthorityBridgeToken;
1632
/// Private token type recognised by
/// `live_channel_status_result_generated_authority_bridge_token_is_valid`.
#[cfg(feature = "live")]
struct LiveChannelStatusResultGeneratedAuthorityBridgeToken;
1635
/// The single live-open-admission bridge token instance; a static so that
/// `&'static` references to it can be handed out.
#[cfg(feature = "live")]
static LIVE_OPEN_ADMISSION_GENERATED_AUTHORITY_BRIDGE_TOKEN:
    LiveOpenAdmissionGeneratedAuthorityBridgeToken = LiveOpenAdmissionGeneratedAuthorityBridgeToken;
1639
/// The single live-close-result bridge token instance; a static so that
/// `&'static` references to it can be handed out.
#[cfg(feature = "live")]
static LIVE_CLOSE_RESULT_GENERATED_AUTHORITY_BRIDGE_TOKEN:
    LiveCloseResultGeneratedAuthorityBridgeToken = LiveCloseResultGeneratedAuthorityBridgeToken;
1643
/// The single live-channel-status-result bridge token instance; a static so
/// that `&'static` references to it can be handed out.
#[cfg(feature = "live")]
static LIVE_CHANNEL_STATUS_RESULT_GENERATED_AUTHORITY_BRIDGE_TOKEN:
    LiveChannelStatusResultGeneratedAuthorityBridgeToken =
    LiveChannelStatusResultGeneratedAuthorityBridgeToken;
1648
/// Returns the live-open-admission bridge token as a type-erased `'static`
/// reference.
#[cfg(feature = "live")]
fn live_open_admission_generated_authority_bridge_token()
-> &'static (dyn std::any::Any + Send + Sync) {
    let token: &'static (dyn std::any::Any + Send + Sync) =
        &LIVE_OPEN_ADMISSION_GENERATED_AUTHORITY_BRIDGE_TOKEN;
    token
}
1654
/// Returns the live-close-result bridge token as a type-erased `'static`
/// reference.
#[cfg(feature = "live")]
fn live_close_result_generated_authority_bridge_token() -> &'static (dyn std::any::Any + Send + Sync)
{
    let token: &'static (dyn std::any::Any + Send + Sync) =
        &LIVE_CLOSE_RESULT_GENERATED_AUTHORITY_BRIDGE_TOKEN;
    token
}
1660
/// Returns the live-channel-status-result bridge token as a type-erased
/// `'static` reference.
#[cfg(feature = "live")]
fn live_channel_status_result_generated_authority_bridge_token()
-> &'static (dyn std::any::Any + Send + Sync) {
    let token: &'static (dyn std::any::Any + Send + Sync) =
        &LIVE_CHANNEL_STATUS_RESULT_GENERATED_AUTHORITY_BRIDGE_TOKEN;
    token
}
1666
/// Exported validator: reports whether `token` is the private
/// live-open-admission bridge token type.
///
/// The symbol name is fixed by `export_name` and ends with the build-time
/// `MEERKAT_GENERATED_AUTHORITY_BRIDGE_SYMBOL_SUFFIX`.
#[cfg(feature = "live")]
#[doc(hidden)]
#[allow(improper_ctypes_definitions, unsafe_code)]
#[unsafe(export_name = concat!(
    "__meerkat_runtime_generated_authority_bridge_token_is_valid_v1_live_open_admission_",
    env!("MEERKAT_GENERATED_AUTHORITY_BRIDGE_SYMBOL_SUFFIX")
))]
pub extern "Rust" fn live_open_admission_generated_authority_bridge_token_is_valid(
    token: &(dyn std::any::Any + Send + Sync),
) -> bool {
    let is_bridge_token = token.is::<LiveOpenAdmissionGeneratedAuthorityBridgeToken>();
    is_bridge_token
}
1679
/// Exported validator: reports whether `token` is the private
/// live-close-result bridge token type.
///
/// The symbol name is fixed by `export_name` and ends with the build-time
/// `MEERKAT_GENERATED_AUTHORITY_BRIDGE_SYMBOL_SUFFIX`.
#[cfg(feature = "live")]
#[doc(hidden)]
#[allow(improper_ctypes_definitions, unsafe_code)]
#[unsafe(export_name = concat!(
    "__meerkat_runtime_generated_authority_bridge_token_is_valid_v1_live_close_result_",
    env!("MEERKAT_GENERATED_AUTHORITY_BRIDGE_SYMBOL_SUFFIX")
))]
pub extern "Rust" fn live_close_result_generated_authority_bridge_token_is_valid(
    token: &(dyn std::any::Any + Send + Sync),
) -> bool {
    let is_bridge_token = token.is::<LiveCloseResultGeneratedAuthorityBridgeToken>();
    is_bridge_token
}
1692
/// Exported validator: reports whether `token` is the private
/// live-channel-status-result bridge token type.
///
/// The symbol name is fixed by `export_name` and ends with the build-time
/// `MEERKAT_GENERATED_AUTHORITY_BRIDGE_SYMBOL_SUFFIX`.
#[cfg(feature = "live")]
#[doc(hidden)]
#[allow(improper_ctypes_definitions, unsafe_code)]
#[unsafe(export_name = concat!(
    "__meerkat_runtime_generated_authority_bridge_token_is_valid_v1_live_channel_status_result_",
    env!("MEERKAT_GENERATED_AUTHORITY_BRIDGE_SYMBOL_SUFFIX")
))]
pub extern "Rust" fn live_channel_status_result_generated_authority_bridge_token_is_valid(
    token: &(dyn std::any::Any + Send + Sync),
) -> bool {
    let is_bridge_token = token.is::<LiveChannelStatusResultGeneratedAuthorityBridgeToken>();
    is_bridge_token
}
1705
/// Builds a `LiveChannelOpenAuthority` by calling an externally linked
/// builder and presenting the live-open-admission bridge token.
///
/// # Errors
/// Returns whatever error string the linked builder reports.
#[cfg(feature = "live")]
fn build_live_channel_open_authority(
    session_id: SessionId,
    channel_id: meerkat_live::LiveChannelId,
    sequence: u64,
) -> Result<meerkat_live::LiveChannelOpenAuthority, String> {
    #[allow(improper_ctypes_definitions, unsafe_code)]
    unsafe extern "Rust" {
        #[link_name = concat!(
            "__meerkat_live_runtime_generated_live_channel_open_authority_build_v1_",
            env!("MEERKAT_GENERATED_AUTHORITY_BRIDGE_SYMBOL_SUFFIX")
        )]
        fn live_generated_channel_open_authority_build(
            token: &'static (dyn std::any::Any + Send + Sync),
            session_id: SessionId,
            channel_id: meerkat_live::LiveChannelId,
            sequence: u64,
        ) -> Result<meerkat_live::LiveChannelOpenAuthority, String>;
    }
    let token = live_open_admission_generated_authority_bridge_token();
    // SAFETY: NOTE(review) — soundness relies on the linked symbol being
    // defined with exactly the signature declared above; that definition is
    // not visible here, confirm it in the exporting crate.
    #[allow(unsafe_code)]
    unsafe {
        live_generated_channel_open_authority_build(token, session_id, channel_id, sequence)
    }
}
1735
/// Builds a `LiveChannelCloseCommitAuthority` by calling an externally linked
/// builder and presenting the live-close-result bridge token.
///
/// # Errors
/// Returns whatever error string the linked builder reports.
#[cfg(feature = "live")]
fn build_live_channel_close_commit_authority(
    channel_id: String,
    close_sequence: u64,
) -> Result<meerkat_live::LiveChannelCloseCommitAuthority, String> {
    #[allow(improper_ctypes_definitions, unsafe_code)]
    unsafe extern "Rust" {
        #[link_name = concat!(
            "__meerkat_live_runtime_generated_live_channel_close_commit_authority_build_v1_",
            env!("MEERKAT_GENERATED_AUTHORITY_BRIDGE_SYMBOL_SUFFIX")
        )]
        fn live_generated_channel_close_commit_authority_build(
            token: &'static (dyn std::any::Any + Send + Sync),
            channel_id: String,
            close_sequence: u64,
        ) -> Result<meerkat_live::LiveChannelCloseCommitAuthority, String>;
    }
    let token = live_close_result_generated_authority_bridge_token();
    // SAFETY: NOTE(review) — soundness relies on the linked symbol being
    // defined with exactly the signature declared above; that definition is
    // not visible here, confirm it in the exporting crate.
    #[allow(unsafe_code)]
    unsafe {
        live_generated_channel_close_commit_authority_build(token, channel_id, close_sequence)
    }
}
1762
/// Builds a `LiveChannelStatusCommitAuthority` by calling an externally
/// linked builder and presenting the live-channel-status-result bridge token.
///
/// # Errors
/// Returns whatever error string the linked builder reports.
#[cfg(feature = "live")]
fn build_live_channel_status_commit_authority(
    channel_id: String,
    status_observation_sequence: u64,
) -> Result<meerkat_live::LiveChannelStatusCommitAuthority, String> {
    #[allow(improper_ctypes_definitions, unsafe_code)]
    unsafe extern "Rust" {
        #[link_name = concat!(
            "__meerkat_live_runtime_generated_live_channel_status_commit_authority_build_v1_",
            env!("MEERKAT_GENERATED_AUTHORITY_BRIDGE_SYMBOL_SUFFIX")
        )]
        fn live_generated_channel_status_commit_authority_build(
            token: &'static (dyn std::any::Any + Send + Sync),
            channel_id: String,
            status_observation_sequence: u64,
        ) -> Result<meerkat_live::LiveChannelStatusCommitAuthority, String>;
    }
    let token = live_channel_status_result_generated_authority_bridge_token();
    // SAFETY: NOTE(review) — soundness relies on the linked symbol being
    // defined with exactly the signature declared above; that definition is
    // not visible here, confirm it in the exporting crate.
    #[allow(unsafe_code)]
    unsafe {
        live_generated_channel_status_commit_authority_build(
            token,
            channel_id,
            status_observation_sequence,
        )
    }
}
1789
/// Outcome of a live-open admission, built from a generated effect via
/// `from_generated_effect`.
///
/// All fields are private and exposed through accessors.
#[derive(Debug, Clone)]
#[cfg(feature = "live")]
pub struct LiveOpenAdmissionAuthority {
    session_id: SessionId,
    channel_id: meerkat_live::LiveChannelId,
    /// Whether the open request was admitted.
    admitted: bool,
    /// Rejection value passed through from the generated effect.
    rejection: Option<dsl::LiveOpenAdmissionRejection>,
    /// Present only for admitted requests.
    bound_llm_identity: Option<meerkat_core::SessionLlmIdentity>,
    sequence: u64,
    /// Present only for admitted requests.
    channel_open_authority: Option<meerkat_live::LiveChannelOpenAuthority>,
}
1807
#[cfg(feature = "live")]
impl LiveOpenAdmissionAuthority {
    /// Builds the authority from the fields of a generated admission effect.
    ///
    /// For an admitted request the bound LLM identity is converted and a
    /// channel-open authority is built; for a rejected request both are `None`
    /// and any supplied identity is ignored.
    ///
    /// # Errors
    /// Fails when an admitted request carries no bound LLM identity, when the
    /// identity conversion fails, or when building the channel-open authority
    /// fails.
    pub(crate) fn from_generated_effect(
        session_id: SessionId,
        channel_id: meerkat_live::LiveChannelId,
        admitted: bool,
        rejection: Option<dsl::LiveOpenAdmissionRejection>,
        bound_llm_identity: Option<dsl::SessionLlmIdentity>,
        sequence: u64,
    ) -> Result<Self, String> {
        let bound_llm_identity = if admitted {
            let Some(identity) = bound_llm_identity else {
                return Err(
                    "generated live-open admission was admitted without bound LLM identity"
                        .to_string(),
                );
            };
            Some(identity.try_into()?)
        } else {
            None
        };
        let channel_open_authority = admitted
            .then(|| {
                build_live_channel_open_authority(session_id.clone(), channel_id.clone(), sequence)
            })
            .transpose()?;
        Ok(Self {
            session_id,
            channel_id,
            admitted,
            rejection,
            bound_llm_identity,
            sequence,
            channel_open_authority,
        })
    }

    /// Session the admission belongs to.
    #[must_use]
    pub fn session_id(&self) -> &SessionId {
        &self.session_id
    }

    /// Channel the admission belongs to.
    #[must_use]
    pub fn channel_id(&self) -> &meerkat_live::LiveChannelId {
        &self.channel_id
    }

    /// Whether the open request was admitted.
    #[must_use]
    pub fn admitted(&self) -> bool {
        self.admitted
    }

    /// Rejection value carried by the generated effect, if any.
    #[must_use]
    pub fn rejection(&self) -> Option<dsl::LiveOpenAdmissionRejection> {
        self.rejection
    }

    /// Bound LLM identity; `None` for rejected requests.
    #[must_use]
    pub fn bound_llm_identity(&self) -> Option<&meerkat_core::SessionLlmIdentity> {
        self.bound_llm_identity.as_ref()
    }

    /// Sequence number carried by the generated effect.
    #[must_use]
    pub fn sequence(&self) -> u64 {
        self.sequence
    }

    /// Channel-open authority; `None` for rejected requests.
    #[must_use]
    pub fn channel_open_authority(&self) -> Option<&meerkat_live::LiveChannelOpenAuthority> {
        self.channel_open_authority.as_ref()
    }
}
1883
/// Plain-data record of a live refresh result.
///
/// NOTE(review): the field meanings come from the generated DSL and are not
/// visible in this part of the file — confirm them there.
#[derive(Debug, Clone, PartialEq, Eq)]
#[cfg(feature = "live")]
pub struct LiveRefreshResultAuthority {
    pub status: dsl::LiveRefreshPublicStatus,
    pub sequence: u64,
    pub queue_acceptance_sequence: u64,
}
1897
/// Record of a live close result, built via `from_generated_effect`.
#[derive(Debug, Clone)]
#[cfg(feature = "live")]
pub struct LiveCloseResultAuthority {
    pub status: dsl::LiveClosePublicStatus,
    pub sequence: u64,
    /// Also passed to the close-commit authority builder as its sequence.
    pub close_observation_sequence: u64,
    /// Private; reachable only through the accessor methods.
    channel_close_commit_authority: Option<meerkat_live::LiveChannelCloseCommitAuthority>,
}
1910
#[cfg(feature = "live")]
impl LiveCloseResultAuthority {
    /// Builds the record from the fields of a generated close-result effect.
    ///
    /// # Errors
    /// Fails when the close-commit authority cannot be built.
    pub(crate) fn from_generated_effect(
        channel_id: String,
        status: dsl::LiveClosePublicStatus,
        sequence: u64,
        close_observation_sequence: u64,
    ) -> Result<Self, String> {
        // Kept as an exhaustive match so a new status variant forces a
        // decision here.
        let commit_authority = match status {
            dsl::LiveClosePublicStatus::Closed => {
                build_live_channel_close_commit_authority(channel_id, close_observation_sequence)?
            }
        };
        Ok(Self {
            status,
            sequence,
            close_observation_sequence,
            channel_close_commit_authority: Some(commit_authority),
        })
    }

    /// Borrows the close-commit authority, if present.
    #[must_use]
    pub fn channel_close_commit_authority(
        &self,
    ) -> Option<&meerkat_live::LiveChannelCloseCommitAuthority> {
        self.channel_close_commit_authority.as_ref()
    }

    /// Consumes the record and yields the close-commit authority, if present.
    #[must_use]
    pub fn into_channel_close_commit_authority(
        self,
    ) -> Option<meerkat_live::LiveChannelCloseCommitAuthority> {
        let Self {
            channel_close_commit_authority,
            ..
        } = self;
        channel_close_commit_authority
    }
}
1947
/// Plain-data record of an accepted live command.
///
/// NOTE(review): the field meanings come from the generated DSL and are not
/// visible in this part of the file — confirm them there.
#[derive(Debug, Clone, PartialEq, Eq)]
#[cfg(feature = "live")]
pub struct LiveCommandResultAuthority {
    pub command: dsl::LiveCommandPublicKind,
    pub sequence: u64,
    pub command_acceptance_sequence: u64,
}
1960
/// Plain-data record of a rejected live command.
///
/// NOTE(review): the field meanings come from the generated DSL and are not
/// visible in this part of the file — confirm them there.
#[derive(Debug, Clone, PartialEq, Eq)]
#[cfg(feature = "live")]
pub struct LiveCommandRejectionAuthority {
    pub command: dsl::LiveCommandPublicKind,
    pub rejection: dsl::LiveCommandRejectionReason,
    pub public_error_class: dsl::LiveCommandRejectionPublicErrorClass,
    pub sequence: u64,
}
1975
/// Plain-data record of a rejected live channel request.
///
/// NOTE(review): the field meanings come from the generated DSL and are not
/// visible in this part of the file — confirm them there.
#[derive(Debug, Clone, PartialEq, Eq)]
#[cfg(feature = "live")]
pub struct LiveChannelRequestRejectionAuthority {
    pub request: dsl::LiveChannelRequestPublicKind,
    pub rejection: dsl::LiveChannelRequestRejectionReason,
    pub public_error_class: dsl::LiveChannelRequestRejectionPublicErrorClass,
    pub sequence: u64,
}
1990
/// Plain-data record carrying a WebRTC token, its expiry, and a sequence.
///
/// NOTE(review): the derived `Debug` prints `token` verbatim — confirm this
/// is acceptable wherever the value may be logged.
#[derive(Debug, Clone, PartialEq, Eq)]
#[cfg(feature = "live")]
pub struct LiveWebrtcTokenAuthority {
    pub token: String,
    /// NOTE(review): presumably a millisecond timestamp — confirm the epoch.
    pub expires_at_ms: u64,
    pub sequence: u64,
}
2004
/// Plain-data record of a WebRTC answer admission decision.
///
/// NOTE(review): nothing here enforces that `rejection` and
/// `public_error_class` are set exactly when `admitted` is `false` — confirm
/// the producer guarantees it.
#[derive(Debug, Clone, PartialEq, Eq)]
#[cfg(feature = "live")]
pub struct LiveWebrtcAnswerAdmissionAuthority {
    pub admitted: bool,
    pub rejection: Option<dsl::LiveWebrtcAnswerAdmissionRejection>,
    pub public_error_class: Option<dsl::LiveChannelRequestRejectionPublicErrorClass>,
    pub sequence: u64,
}
2018
/// Plain-data record of a WebRTC answer result.
///
/// NOTE(review): the field meanings come from the generated DSL and are not
/// visible in this part of the file — confirm them there.
#[derive(Debug, Clone, PartialEq, Eq)]
#[cfg(feature = "live")]
pub struct LiveWebrtcAnswerResultAuthority {
    pub status: dsl::LiveWebrtcAnswerPublicStatus,
    pub answered: bool,
    pub sequence: u64,
    pub answer_observation_sequence: u64,
}
2033
/// Plain-data record carrying a WebSocket token, its expiry, and a sequence.
///
/// NOTE(review): the derived `Debug` prints `token` verbatim — confirm this
/// is acceptable wherever the value may be logged.
#[derive(Debug, Clone, PartialEq, Eq)]
#[cfg(feature = "live")]
pub struct LiveWebsocketTokenAuthority {
    pub token: String,
    /// NOTE(review): presumably a millisecond timestamp — confirm the epoch.
    pub expires_at_ms: u64,
    pub sequence: u64,
}
2047
/// Plain-data record of a WebSocket token admission decision.
///
/// NOTE(review): nothing here enforces that `rejection` and
/// `public_error_class` are set exactly when `admitted` is `false` — confirm
/// the producer guarantees it.
#[derive(Debug, Clone, PartialEq, Eq)]
#[cfg(feature = "live")]
pub struct LiveWebsocketTokenAdmissionAuthority {
    pub admitted: bool,
    pub rejection: Option<dsl::LiveWebsocketTokenAdmissionRejection>,
    pub public_error_class: Option<dsl::LiveWebsocketTokenAdmissionPublicErrorClass>,
    pub sequence: u64,
}
2061
/// Record of a live channel status observation, built via
/// `from_generated_effect`.
#[derive(Debug, Clone)]
#[cfg(feature = "live")]
pub struct LiveChannelStatusAuthority {
    pub status: dsl::LiveChannelPublicStatus,
    pub sequence: u64,
    /// Also passed to the status-commit authority builder as its sequence.
    pub status_observation_sequence: u64,
    pub degradation_reason: Option<dsl::LiveChannelDegradationReason>,
    pub degradation_detail: Option<String>,
    /// Always `Some` when the record was built by `from_generated_effect`.
    pub channel_status_commit_authority: Option<meerkat_live::LiveChannelStatusCommitAuthority>,
}
2077
#[cfg(feature = "live")]
impl LiveChannelStatusAuthority {
    /// Builds the record from the fields of a generated channel-status effect
    /// and attaches a freshly built status-commit authority.
    ///
    /// # Errors
    /// Fails when the status-commit authority cannot be built.
    pub(crate) fn from_generated_effect(
        channel_id: String,
        status: dsl::LiveChannelPublicStatus,
        sequence: u64,
        status_observation_sequence: u64,
        degradation_reason: Option<dsl::LiveChannelDegradationReason>,
        degradation_detail: Option<String>,
    ) -> Result<Self, String> {
        let commit_authority =
            build_live_channel_status_commit_authority(channel_id, status_observation_sequence)?;
        Ok(Self {
            status,
            sequence,
            status_observation_sequence,
            degradation_reason,
            degradation_detail,
            channel_status_commit_authority: Some(commit_authority),
        })
    }

    /// Borrows the status-commit authority, if present.
    #[must_use]
    pub fn channel_status_commit_authority(
        &self,
    ) -> Option<&meerkat_live::LiveChannelStatusCommitAuthority> {
        self.channel_status_commit_authority.as_ref()
    }

    /// Consumes the record and yields the status-commit authority, if present.
    #[must_use]
    pub fn into_channel_status_commit_authority(
        self,
    ) -> Option<meerkat_live::LiveChannelStatusCommitAuthority> {
        let Self {
            channel_status_commit_authority,
            ..
        } = self;
        channel_status_commit_authority
    }
}
2115
2116pub struct MeerkatMachine {
2123 sessions: RwLock<HashMap<SessionId, RuntimeSessionEntry>>,
2125 store: Option<Arc<dyn RuntimeStore>>,
2127 blob_store: Option<Arc<dyn BlobStore>>,
2129 llm_reconfigure_host: StdRwLock<Option<Arc<dyn SessionLlmReconfigureHost>>>,
2131 auth_lease: StdRwLock<meerkat_core::handles::GeneratedAuthLeaseHandle>,
2134 #[cfg(not(target_arch = "wasm32"))]
2137 oauth_flows: StdRwLock<Arc<dyn meerkat_auth_core::oauth_flow::OAuthFlowAuthority>>,
2138 #[cfg(feature = "live")]
2142 live_unbound_rejection_authority: crate::driver::ephemeral::SharedIngressDslAuthority,
2143 session_claims: Arc<crate::handles::RuntimeSessionClaimRegistry>,
2150 composition_signal_dispatcher:
2154 StdRwLock<Option<composition::MeerkatCompositionSignalDispatcher>>,
2155}
2156
2157impl MeerkatMachine {
2158 #[must_use]
2161 pub fn session_control_authority(&self) -> MachineSessionControlAuthority {
2162 MachineSessionControlAuthority { _private: () }
2163 }
2164
2165 #[must_use]
2170 pub fn shares_runtime_persistence_with(&self, other: &Self) -> bool {
2171 match (&self.store, &other.store) {
2172 (None, None) => true,
2173 (Some(a), Some(b)) => runtime_stores_share_authority(a, b),
2174 _ => false,
2175 }
2176 }
2177
2178 #[must_use]
2181 pub fn shares_runtime_store_authority(&self, store: &Arc<dyn RuntimeStore>) -> bool {
2182 self.store
2183 .as_ref()
2184 .is_some_and(|machine_store| runtime_stores_share_authority(machine_store, store))
2185 }
2186
2187 #[must_use]
2189 pub fn has_runtime_persistence(&self) -> bool {
2190 self.store.is_some()
2191 }
2192
2193 fn normalize_destroyed_error(err: RuntimeDriverError) -> RuntimeDriverError {
2194 match err {
2195 RuntimeDriverError::NotReady {
2196 state: RuntimeState::Destroyed,
2197 } => RuntimeDriverError::Destroyed,
2198 other => other,
2199 }
2200 }
2201
2202 pub fn ephemeral() -> Self {
2204 let auth_lease = Arc::new(crate::handles::RuntimeAuthLeaseHandle::new());
2205 #[cfg(not(target_arch = "wasm32"))]
2206 let oauth_flows = Arc::new(crate::handles::RuntimeOAuthFlowHandle::new_with_auth_lease(
2207 std::time::Duration::from_secs(10 * 60),
2208 Arc::clone(&auth_lease),
2209 ));
2210 let auth_lease = generated_runtime_auth_lease_handle(auth_lease);
2211 Self {
2212 sessions: RwLock::new(HashMap::new()),
2213 store: None,
2214 blob_store: None,
2215 llm_reconfigure_host: StdRwLock::new(None),
2216 auth_lease: StdRwLock::new(auth_lease),
2217 #[cfg(not(target_arch = "wasm32"))]
2218 oauth_flows: StdRwLock::new(oauth_flows),
2219 #[cfg(feature = "live")]
2220 live_unbound_rejection_authority: live_unbound_rejection_authority(),
2221 session_claims: Arc::new(crate::handles::RuntimeSessionClaimRegistry::new()),
2222 composition_signal_dispatcher: StdRwLock::new(None),
2223 }
2224 }
2225
2226 pub fn persistent(store: Arc<dyn RuntimeStore>, blob_store: Arc<dyn BlobStore>) -> Self {
2228 #[cfg(not(target_arch = "wasm32"))]
2229 let (auth_lease, oauth_flows) = {
2230 let authorities = persistent_auth_authorities(&store);
2231 (
2232 Arc::clone(&authorities.auth_lease),
2233 Arc::clone(&authorities.oauth_flows),
2234 )
2235 };
2236 #[cfg(target_arch = "wasm32")]
2237 let auth_lease = Arc::new(crate::handles::RuntimeAuthLeaseHandle::new());
2238 let auth_lease = generated_runtime_auth_lease_handle(auth_lease);
2239 Self {
2240 sessions: RwLock::new(HashMap::new()),
2241 store: Some(store),
2242 blob_store: Some(blob_store),
2243 llm_reconfigure_host: StdRwLock::new(None),
2244 auth_lease: StdRwLock::new(auth_lease),
2245 #[cfg(not(target_arch = "wasm32"))]
2246 oauth_flows: StdRwLock::new(oauth_flows),
2247 #[cfg(feature = "live")]
2248 live_unbound_rejection_authority: live_unbound_rejection_authority(),
2249 session_claims: Arc::new(crate::handles::RuntimeSessionClaimRegistry::new()),
2250 composition_signal_dispatcher: StdRwLock::new(None),
2251 }
2252 }
2253
2254 pub fn persistent_without_blobs(store: Arc<dyn RuntimeStore>) -> Self {
2260 #[cfg(not(target_arch = "wasm32"))]
2261 let (auth_lease, oauth_flows) = {
2262 let authorities = persistent_auth_authorities(&store);
2263 (
2264 Arc::clone(&authorities.auth_lease),
2265 Arc::clone(&authorities.oauth_flows),
2266 )
2267 };
2268 #[cfg(target_arch = "wasm32")]
2269 let auth_lease = Arc::new(crate::handles::RuntimeAuthLeaseHandle::new());
2270 let auth_lease = generated_runtime_auth_lease_handle(auth_lease);
2271 Self {
2272 sessions: RwLock::new(HashMap::new()),
2273 store: Some(store),
2274 blob_store: Some(Arc::new(UnavailableBlobStore)),
2275 llm_reconfigure_host: StdRwLock::new(None),
2276 auth_lease: StdRwLock::new(auth_lease),
2277 #[cfg(not(target_arch = "wasm32"))]
2278 oauth_flows: StdRwLock::new(oauth_flows),
2279 #[cfg(feature = "live")]
2280 live_unbound_rejection_authority: live_unbound_rejection_authority(),
2281 session_claims: Arc::new(crate::handles::RuntimeSessionClaimRegistry::new()),
2282 composition_signal_dispatcher: StdRwLock::new(None),
2283 }
2284 }
2285
2286 pub fn auth_lease_handle(&self) -> Arc<dyn meerkat_core::handles::AuthLeaseHandle> {
2289 self.generated_auth_lease_handle().clone_handle()
2290 }
2291
2292 pub fn generated_auth_lease_handle(&self) -> meerkat_core::handles::GeneratedAuthLeaseHandle {
2295 self.auth_lease
2296 .read()
2297 .unwrap_or_else(std::sync::PoisonError::into_inner)
2298 .clone()
2299 }
2300
    /// Replaces the machine's auth lease handle with `handle`.
    ///
    /// This is a thin alias for `set_runtime_auth_lease_handle`, so on non-wasm targets it also
    /// replaces the OAuth flow authority with a new `RuntimeOAuthFlowHandle` built over `handle`.
    /// Use `set_auth_lease_handle_with_oauth_flow_authority` to supply the flow authority
    /// yourself.
    pub fn set_auth_lease_handle(&self, handle: Arc<crate::handles::RuntimeAuthLeaseHandle>) {
        self.set_runtime_auth_lease_handle(handle);
    }
2309
2310 #[cfg(not(target_arch = "wasm32"))]
2316 pub fn set_auth_lease_handle_with_oauth_flow_authority(
2317 &self,
2318 handle: Arc<crate::handles::RuntimeAuthLeaseHandle>,
2319 oauth_flows: Arc<dyn meerkat_auth_core::oauth_flow::OAuthFlowAuthority>,
2320 ) {
2321 *self
2322 .oauth_flows
2323 .write()
2324 .unwrap_or_else(std::sync::PoisonError::into_inner) = oauth_flows;
2325 let handle = generated_runtime_auth_lease_handle(handle);
2326 *self
2327 .auth_lease
2328 .write()
2329 .unwrap_or_else(std::sync::PoisonError::into_inner) = handle;
2330 }
2331
2332 pub fn set_runtime_auth_lease_handle(
2335 &self,
2336 handle: Arc<crate::handles::RuntimeAuthLeaseHandle>,
2337 ) {
2338 #[cfg(not(target_arch = "wasm32"))]
2339 {
2340 *self
2341 .oauth_flows
2342 .write()
2343 .unwrap_or_else(std::sync::PoisonError::into_inner) =
2344 Arc::new(crate::handles::RuntimeOAuthFlowHandle::new_with_auth_lease(
2345 std::time::Duration::from_secs(10 * 60),
2346 Arc::clone(&handle),
2347 ));
2348 }
2349 let handle = generated_runtime_auth_lease_handle(handle);
2350 *self
2351 .auth_lease
2352 .write()
2353 .unwrap_or_else(std::sync::PoisonError::into_inner) = handle;
2354 }
2355
2356 #[cfg(not(target_arch = "wasm32"))]
2359 pub fn oauth_flow_authority(
2360 &self,
2361 ) -> Arc<dyn meerkat_auth_core::oauth_flow::OAuthFlowAuthority> {
2362 Arc::clone(
2363 &self
2364 .oauth_flows
2365 .read()
2366 .unwrap_or_else(std::sync::PoisonError::into_inner),
2367 )
2368 }
2369
2370 pub fn session_claim_handle(&self) -> Arc<dyn meerkat_core::handles::SessionClaimHandle> {
2375 Arc::clone(&self.session_claims) as Arc<dyn meerkat_core::handles::SessionClaimHandle>
2376 }
2377
2378 pub fn set_composition_signal_dispatcher(
2381 &self,
2382 dispatcher: composition::MeerkatCompositionSignalDispatcher,
2383 ) {
2384 let mut slot = self
2385 .composition_signal_dispatcher
2386 .write()
2387 .unwrap_or_else(std::sync::PoisonError::into_inner);
2388 *slot = Some(dispatcher);
2389 }
2390
2391 pub(crate) async fn apply_routed_meerkat_input(
2402 &self,
2403 session_id: &SessionId,
2404 input: dsl::MeerkatMachineInput,
2405 ) -> Result<(), dsl_authority::DslTransitionRefusal> {
2406 let _gate_guard = self
2407 .lock_current_session_mutation_gate(session_id)
2408 .await
2409 .ok_or_else(|| {
2410 dsl_authority::DslTransitionRefusal::other(
2411 "routed_session_not_registered",
2412 format!(
2413 "session `{session_id}` is not registered with this MeerkatMachine; \
2414 cannot deliver routed input"
2415 ),
2416 )
2417 })?;
2418 self.apply_routed_session_dsl_input(session_id, input, "RoutedMeerkatInput")
2419 .await
2420 .map(|_| ())
2421 }
2422
2423 #[cfg(test)]
2424 pub(crate) async fn debug_shared_ingress_authorities(
2425 &self,
2426 session_id: &SessionId,
2427 ) -> Option<(
2428 Arc<std::sync::Mutex<dsl::MeerkatMachineAuthority>>,
2429 crate::driver::ephemeral::SharedIngressDslAuthority,
2430 )> {
2431 let sessions = self.sessions.read().await;
2432 let entry = sessions.get(session_id)?;
2433 let session_authority = Arc::clone(&entry.dsl_authority);
2434 let driver = entry.driver.lock().await;
2435 Some((session_authority, driver.shared_dsl_authority()))
2436 }
2437
2438 fn make_driver(
2440 &self,
2441 runtime_id: LogicalRuntimeId,
2442 dsl_authority: crate::driver::ephemeral::SharedIngressDslAuthority,
2443 initial_runtime_state: RuntimeState,
2444 ) -> DriverEntry {
2445 let control_projection = Arc::new(StdRwLock::new(
2446 crate::driver::ephemeral::RuntimeControlProjection {
2447 phase: initial_runtime_state,
2448 current_run_id: None,
2449 pre_run_phase: None,
2450 },
2451 ));
2452 match (&self.store, &self.blob_store) {
2453 (Some(store), Some(blob_store)) => {
2454 DriverEntry::Persistent(PersistentRuntimeDriver::new_with_control(
2455 runtime_id,
2456 store.clone(),
2457 blob_store.clone(),
2458 control_projection,
2459 dsl_authority,
2460 ))
2461 }
2462 _ => DriverEntry::Ephemeral(EphemeralRuntimeDriver::new_with_control_and_dsl(
2463 runtime_id,
2464 control_projection,
2465 dsl_authority,
2466 )),
2467 }
2468 }
2469
2470 async fn recover_or_create_ops_state(
2477 &self,
2478 session_id: &SessionId,
2479 runtime_id: &LogicalRuntimeId,
2480 ) -> Result<
2481 (
2482 Arc<crate::ops_lifecycle::RuntimeOpsLifecycleRegistry>,
2483 meerkat_core::RuntimeEpochId,
2484 Arc<meerkat_core::EpochCursorState>,
2485 ),
2486 RuntimeDriverError,
2487 > {
2488 if let Some(ref store) = self.store {
2489 match store.load_ops_lifecycle(runtime_id).await {
2490 Ok(Some(snapshot)) => {
2491 let recovered_epoch = snapshot.epoch_id.clone();
2492 let recovered_ops_count = snapshot.completion_entries.len();
2493 let registry =
2494 match crate::ops_lifecycle::RuntimeOpsLifecycleRegistry::from_recovered(
2495 snapshot,
2496 ) {
2497 Ok(registry) => registry,
2498 Err(err) => {
2499 tracing::error!(
2500 %session_id,
2501 %runtime_id,
2502 error = %err,
2503 "failed to recover ops lifecycle through generated authority"
2504 );
2505 return Err(RuntimeDriverError::Internal(format!(
2506 "failed to recover ops lifecycle through generated authority: {err}"
2507 )));
2508 }
2509 };
2510 let recovered_cursor_snapshot = registry.completion_cursor_snapshot();
2511 let recovered_cursors = meerkat_core::EpochCursorState::from_recovered(
2512 recovered_cursor_snapshot.agent_applied_cursor,
2513 recovered_cursor_snapshot.runtime_observed_seq,
2514 recovered_cursor_snapshot.runtime_last_injected_seq,
2515 );
2516 tracing::info!(
2517 %session_id,
2518 %runtime_id,
2519 epoch_id = %recovered_epoch,
2520 recovered_ops = recovered_ops_count,
2521 "ops lifecycle recovered from durable store (same epoch)"
2522 );
2523 return Ok((
2524 Arc::new(registry),
2525 recovered_epoch,
2526 Arc::new(recovered_cursors),
2527 ));
2528 }
2529 Ok(None) => {}
2530 Err(err) => {
2531 tracing::error!(
2532 %session_id,
2533 %runtime_id,
2534 error = %err,
2535 "failed to load ops lifecycle from durable store"
2536 );
2537 return Err(RuntimeDriverError::Internal(format!(
2538 "failed to load ops lifecycle from durable store: {err}"
2539 )));
2540 }
2541 }
2542 tracing::debug!(%session_id, "no persisted ops lifecycle; fresh epoch");
2543 Ok((
2544 Arc::new(crate::ops_lifecycle::RuntimeOpsLifecycleRegistry::new()),
2545 meerkat_core::RuntimeEpochId::new(),
2546 Arc::new(meerkat_core::EpochCursorState::new()),
2547 ))
2548 } else {
2549 Ok((
2550 Arc::new(crate::ops_lifecycle::RuntimeOpsLifecycleRegistry::new()),
2551 meerkat_core::RuntimeEpochId::new(),
2552 Arc::new(meerkat_core::EpochCursorState::new()),
2553 ))
2554 }
2555 }
2556
2557 fn fresh_ops_state() -> (
2558 Arc<crate::ops_lifecycle::RuntimeOpsLifecycleRegistry>,
2559 meerkat_core::RuntimeEpochId,
2560 Arc<meerkat_core::EpochCursorState>,
2561 ) {
2562 let registry = Arc::new(crate::ops_lifecycle::RuntimeOpsLifecycleRegistry::new());
2563 let epoch = meerkat_core::RuntimeEpochId::new();
2564 let cursors = Arc::new(meerkat_core::EpochCursorState::new());
2565 (registry, epoch, cursors)
2566 }
2567
    /// Routes `command` to the handler for its command family and returns the boxed future.
    ///
    /// `self_handle` is only consulted by the families that run on an `Arc<Self>`
    /// (`EnsureSessionWithExecutor`, `SetPeerIngressContext`, `NotifyDrainExited`); for those a
    /// missing handle yields `MeerkatMachineCommandError::Driver(RuntimeDriverError::Internal)`.
    /// Every other family runs on `&self` and ignores `self_handle`. Each handler's error is
    /// converted with `Into::into`.
    // NOTE(review): the lint allowance presumably covers the size of the dispatched handler
    // futures — confirm before removing.
    #[allow(clippy::large_futures)]
    fn execute_meerkat_machine_command(
        &self,
        self_handle: Option<Arc<Self>>,
        command: MeerkatMachineCommand,
    ) -> MeerkatMachineCommandFuture<'_> {
        Box::pin(async move {
            match command {
                // Session creation with an executor: needs the owning `Arc<Self>`.
                MeerkatMachineCommand::EnsureSessionWithExecutor { .. } => {
                    let self_handle = self_handle.ok_or_else(|| {
                        MeerkatMachineCommandError::Driver(RuntimeDriverError::Internal(
                            "EnsureSessionWithExecutor requires Arc<Self> machine handle".into(),
                        ))
                    })?;
                    self_handle
                        .execute_meerkat_machine_ensure_session_command(command)
                        .await
                        .map_err(Into::into)
                }
                // Session-scoped commands handled on `&self`.
                MeerkatMachineCommand::RegisterSession { .. }
                | MeerkatMachineCommand::UnregisterSession { .. }
                | MeerkatMachineCommand::SetSilentIntents { .. }
                | MeerkatMachineCommand::CancelAfterBoundary { .. }
                | MeerkatMachineCommand::StopRuntimeExecutor { .. }
                | MeerkatMachineCommand::CommitServiceTurnTerminalReceipt { .. }
                | MeerkatMachineCommand::ContainsSession { .. }
                | MeerkatMachineCommand::SessionHasExecutor { .. }
                | MeerkatMachineCommand::SessionHasComms { .. }
                | MeerkatMachineCommand::OpsLifecycleRegistry { .. }
                | MeerkatMachineCommand::PrepareBindings { .. }
                | MeerkatMachineCommand::PrepareLocalSessionBindings { .. }
                | MeerkatMachineCommand::InputState { .. }
                | MeerkatMachineCommand::ListActiveInputs { .. }
                | MeerkatMachineCommand::ReconfigureSessionLlmIdentity { .. }
                | MeerkatMachineCommand::StagePersistentFilter { .. }
                | MeerkatMachineCommand::RequestDeferredTools { .. }
                | MeerkatMachineCommand::PublishCommittedVisibleSet { .. } => self
                    .execute_meerkat_machine_session_command(command)
                    .await
                    .map_err(Into::into),
                // Drain commands that need the owning `Arc<Self>`.
                MeerkatMachineCommand::SetPeerIngressContext { .. }
                | MeerkatMachineCommand::NotifyDrainExited { .. } => {
                    let self_handle = self_handle.ok_or_else(|| {
                        MeerkatMachineCommandError::Driver(RuntimeDriverError::Internal(
                            "drain command requires Arc<Self> machine handle".into(),
                        ))
                    })?;
                    self_handle
                        .execute_meerkat_machine_drain_command(command)
                        .await
                        .map_err(Into::into)
                }
                // Drain commands that can run on `&self`.
                MeerkatMachineCommand::AbortAll
                | MeerkatMachineCommand::Abort { .. }
                | MeerkatMachineCommand::Wait { .. } => self
                    .execute_meerkat_machine_drain_local_command(command)
                    .await
                    .map_err(Into::into),
                // Control commands handled on `&self`.
                MeerkatMachineCommand::Ingest { .. }
                | MeerkatMachineCommand::PublishEvent { .. }
                | MeerkatMachineCommand::Retire { .. }
                | MeerkatMachineCommand::Recycle { .. }
                | MeerkatMachineCommand::Reset { .. }
                | MeerkatMachineCommand::Recover { .. }
                | MeerkatMachineCommand::Destroy { .. }
                | MeerkatMachineCommand::RuntimeState { .. }
                | MeerkatMachineCommand::ResolvedSessionLlmCapabilities { .. }
                | MeerkatMachineCommand::ConfigureModelRoutingBaseline { .. }
                | MeerkatMachineCommand::SessionModelRoutingStatus { .. }
                | MeerkatMachineCommand::RequestSwitchTurn { .. }
                | MeerkatMachineCommand::AdmitModelRoutingAssistantTurn { .. }
                | MeerkatMachineCommand::BeginImageOperation { .. }
                | MeerkatMachineCommand::DenyImageOperationPlan { .. }
                | MeerkatMachineCommand::ActivateImageOperationOverride { .. }
                | MeerkatMachineCommand::ClassifyImageOperationTerminal { .. }
                | MeerkatMachineCommand::CompleteImageOperation { .. }
                | MeerkatMachineCommand::RestoreImageOperationOverride { .. }
                | MeerkatMachineCommand::LoadBoundaryReceipt { .. } => self
                    .execute_meerkat_machine_control_command(command)
                    .await
                    .map_err(Into::into),
                // Ingress acceptance commands handled on `&self`.
                MeerkatMachineCommand::AcceptWithCompletion { .. }
                | MeerkatMachineCommand::AcceptWithoutWake { .. } => self
                    .execute_meerkat_machine_ingress_command(command)
                    .await
                    .map_err(Into::into),
            }
        })
    }
2657
2658 pub async fn register_session(
2665 &self,
2666 session_id: SessionId,
2667 ) -> Result<(), RuntimeControlPlaneError> {
2668 match self
2669 .execute_meerkat_machine_command(
2670 None,
2671 MeerkatMachineCommand::RegisterSession { session_id },
2672 )
2673 .await
2674 .map_err(MeerkatMachine::control_plane_error_from_command_error)?
2675 {
2676 MeerkatMachineCommandResult::Unit => Ok(()),
2677 other => Err(RuntimeControlPlaneError::Internal(format!(
2678 "register_session: unexpected command result variant: {other:?}"
2679 ))),
2680 }
2681 }
2682}
2683
// Unit tests live in `../meerkat_machine_tests.rs` (located through `#[path]`) and are compiled
// only under `cfg(test)`. The clippy allowances let that module use `expect`, `panic!`, and
// `unwrap`.
#[cfg(test)]
#[allow(clippy::expect_used, clippy::panic, clippy::unwrap_used)]
#[path = "../meerkat_machine_tests.rs"]
mod tests;