1use std::collections::{BTreeSet, HashMap, HashSet};
20use std::future::Future;
21use std::pin::Pin;
22use std::sync::Arc;
23use std::sync::RwLock as StdRwLock;
24#[cfg(not(target_arch = "wasm32"))]
25use std::sync::{Mutex as StdMutex, OnceLock, Weak};
26
27use meerkat_core::lifecycle::core_executor::CoreApplyOutput;
28use meerkat_core::lifecycle::{InputId, RunId};
29use meerkat_core::types::SessionId;
30use meerkat_core::{BlobId, BlobPayload, BlobRef, BlobStore, BlobStoreError};
31use meerkat_core::{
32 DeferredToolLoadAuthority, SessionToolVisibilityState, ToolFilter, ToolScopeApplyError,
33 ToolScopeRevision, ToolScopeStageError, ToolVisibilityOwner, ToolVisibilityWitness,
34};
35
36use crate::accept::AcceptOutcome;
37use crate::driver::ephemeral::EphemeralRuntimeDriver;
38use crate::driver::persistent::PersistentRuntimeDriver;
39use crate::identifiers::LogicalRuntimeId;
40use crate::input::Input;
41use crate::input_state::InputLifecycleState;
42use crate::meerkat_machine_types::{
43 HydratedSessionLlmState, MeerkatAdmittedInputSnapshot, MeerkatBindingSnapshot,
44 MeerkatCompletionWaiterSnapshot, MeerkatCompletionWaitersSnapshot, MeerkatControlSnapshot,
45 MeerkatCursorSnapshot, MeerkatDrainSnapshot, MeerkatDriverKind, MeerkatFormalStateProjection,
46 MeerkatInputsSnapshot, MeerkatLedgerSnapshot, MeerkatMachineCommand,
47 MeerkatMachineCommandError, MeerkatMachineCommandResult, MeerkatMachineRunFailure,
48 MeerkatMachineRunPrepared, MeerkatMachineSpineSnapshot, MeerkatOpsSnapshot,
49 SessionLlmCapabilityDelta, SessionLlmCapabilitySurface, SessionLlmCapabilitySurfaceStatus,
50 SessionLlmReconfigureHost, SessionLlmReconfigureReport, SessionLlmReconfigureRequest,
51 SessionToolVisibilityDelta,
52};
53use crate::runtime_state::RuntimeState;
54use crate::service_ext::{RuntimeMode, SessionServiceRuntimeExt};
55use crate::store::RuntimeStore;
56use crate::tokio;
57use crate::tokio::sync::{Mutex, RwLock, mpsc};
58#[cfg(test)]
59use crate::traits::RuntimeDriver;
60use crate::traits::{
61 DestroyReport, RecoveryReport, RecycleReport, ResetReport, RetireReport,
62 RuntimeControlPlaneError, RuntimeDriverError,
63};
64
65#[derive(Debug, thiserror::Error)]
67pub enum RuntimeBindingsError {
68 #[error("session {0} not found in runtime adapter after registration")]
70 SessionNotFound(SessionId),
71 #[error("failed to prepare runtime bindings for session {0}: {1}")]
73 PrepareFailed(SessionId, String),
74}
75
76#[derive(Debug, Default)]
77struct UnavailableBlobStore;
78
79impl UnavailableBlobStore {
80 fn error() -> BlobStoreError {
81 BlobStoreError::Unsupported(
82 "persistent runtime constructed without blob store; blob-backed inputs require a BlobStore"
83 .to_string(),
84 )
85 }
86}
87
88#[cfg(not(target_arch = "wasm32"))]
89struct PersistentAuthAuthorityBundle {
90 store: StdMutex<Weak<dyn RuntimeStore>>,
91 auth_lease: Arc<crate::handles::RuntimeAuthLeaseHandle>,
92 oauth_flows: Arc<crate::handles::RuntimeOAuthFlowHandle>,
93}
94
/// Identity under which shared auth authorities are registered.
#[cfg(not(target_arch = "wasm32"))]
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
enum PersistentAuthAuthorityKey {
    /// Key reported by the store's `auth_authority_key()`.
    Durable(String),
    /// Address of the store's shared allocation, used when the store reports no durable key.
    Process(usize),
}
101
102#[cfg(not(target_arch = "wasm32"))]
103static PERSISTENT_AUTH_AUTHORITIES: OnceLock<
104 StdMutex<HashMap<PersistentAuthAuthorityKey, Arc<PersistentAuthAuthorityBundle>>>,
105> = OnceLock::new();
106
107#[cfg(not(target_arch = "wasm32"))]
108fn runtime_store_identity(store: &Arc<dyn RuntimeStore>) -> PersistentAuthAuthorityKey {
109 store
110 .auth_authority_key()
111 .map(PersistentAuthAuthorityKey::Durable)
112 .unwrap_or_else(|| {
113 PersistentAuthAuthorityKey::Process(Arc::as_ptr(store).cast::<()>() as usize)
114 })
115}
116
117fn runtime_stores_share_authority(a: &Arc<dyn RuntimeStore>, b: &Arc<dyn RuntimeStore>) -> bool {
118 match (a.auth_authority_key(), b.auth_authority_key()) {
119 (Some(a), Some(b)) => a == b,
120 _ => Arc::ptr_eq(a, b),
121 }
122}
123
124#[cfg(not(target_arch = "wasm32"))]
125fn persistent_auth_authorities(
126 store: &Arc<dyn RuntimeStore>,
127) -> Arc<PersistentAuthAuthorityBundle> {
128 let key = runtime_store_identity(store);
129 let authorities = PERSISTENT_AUTH_AUTHORITIES.get_or_init(|| StdMutex::new(HashMap::new()));
130 let mut authorities = authorities
131 .lock()
132 .unwrap_or_else(std::sync::PoisonError::into_inner);
133 if let Some(existing) = authorities.get(&key) {
134 let stored_store_alive = existing
135 .store
136 .lock()
137 .unwrap_or_else(std::sync::PoisonError::into_inner)
138 .upgrade()
139 .is_some();
140 if matches!(key, PersistentAuthAuthorityKey::Durable(_)) || stored_store_alive {
141 existing.oauth_flows.bind_persistent_store(store);
142 *existing
143 .store
144 .lock()
145 .unwrap_or_else(std::sync::PoisonError::into_inner) = Arc::downgrade(store);
146 return Arc::clone(existing);
147 }
148 }
149 let auth_lease = Arc::new(crate::handles::RuntimeAuthLeaseHandle::new());
150 let oauth_flows = Arc::new(
151 crate::handles::RuntimeOAuthFlowHandle::new_with_persistent_store_and_auth_lease(
152 std::time::Duration::from_secs(10 * 60),
153 Arc::clone(&auth_lease),
154 store,
155 ),
156 );
157 let bundle = Arc::new(PersistentAuthAuthorityBundle {
158 store: StdMutex::new(Arc::downgrade(store)),
159 auth_lease,
160 oauth_flows,
161 });
162 authorities.insert(key, Arc::clone(&bundle));
163 bundle
164}
165
/// Test-only: drops every registered shared auth authority bundle.
///
/// Does nothing when the registry has never been initialised.
#[cfg(all(test, not(target_arch = "wasm32")))]
pub(crate) fn clear_persistent_auth_authorities_for_test() {
    let Some(registry) = PERSISTENT_AUTH_AUTHORITIES.get() else {
        return;
    };
    let mut authorities = registry
        .lock()
        .unwrap_or_else(std::sync::PoisonError::into_inner);
    authorities.clear();
}
175
176#[cfg_attr(target_arch = "wasm32", async_trait::async_trait(?Send))]
177#[cfg_attr(not(target_arch = "wasm32"), async_trait::async_trait)]
178impl BlobStore for UnavailableBlobStore {
179 async fn put_image(&self, _media_type: &str, _data: &str) -> Result<BlobRef, BlobStoreError> {
180 Err(Self::error())
181 }
182
183 async fn get(&self, _blob_id: &BlobId) -> Result<BlobPayload, BlobStoreError> {
184 Err(Self::error())
185 }
186
187 async fn delete(&self, _blob_id: &BlobId) -> Result<(), BlobStoreError> {
188 Err(Self::error())
189 }
190
191 async fn exists(&self, _blob_id: &BlobId) -> Result<bool, BlobStoreError> {
192 Err(Self::error())
193 }
194
195 fn is_persistent(&self) -> bool {
196 false
197 }
198}
199
200#[cfg(not(target_arch = "wasm32"))]
201struct UnavailableOAuthFlowAuthority;
202
203#[cfg(not(target_arch = "wasm32"))]
204impl UnavailableOAuthFlowAuthority {
205 fn rejected(operation: &'static str) -> meerkat_auth_core::oauth_flow::OAuthFlowError {
206 meerkat_auth_core::oauth_flow::OAuthFlowError::LifecycleRejected {
207 operation,
208 detail: "custom auth lease handle does not provide a runtime OAuth flow authority"
209 .to_string(),
210 }
211 }
212}
213
214#[cfg(not(target_arch = "wasm32"))]
215impl meerkat_auth_core::oauth_flow::OAuthFlowAuthority for UnavailableOAuthFlowAuthority {
216 fn start(
217 &self,
218 _target: meerkat_core::AuthBindingRef,
219 _provider: meerkat_auth_core::oauth_flow::OAuthProviderIdentity,
220 _redirect_uri: String,
221 _pkce_verifier: String,
222 ) -> Result<String, meerkat_auth_core::oauth_flow::OAuthFlowError> {
223 Err(Self::rejected("admit_oauth_browser_flow"))
224 }
225
226 fn verify(
227 &self,
228 _state: &str,
229 _target: &meerkat_core::AuthBindingRef,
230 _provider: meerkat_auth_core::oauth_flow::OAuthProviderIdentity,
231 _redirect_uri: &str,
232 ) -> Result<
233 meerkat_auth_core::oauth_flow::OAuthFlowRecord,
234 meerkat_auth_core::oauth_flow::OAuthFlowError,
235 > {
236 Err(Self::rejected("verify_oauth_browser_flow"))
237 }
238
239 fn consume(
240 &self,
241 _state: &str,
242 _target: &meerkat_core::AuthBindingRef,
243 _provider: meerkat_auth_core::oauth_flow::OAuthProviderIdentity,
244 _redirect_uri: &str,
245 ) -> Result<
246 meerkat_auth_core::oauth_flow::OAuthFlowRecord,
247 meerkat_auth_core::oauth_flow::OAuthFlowError,
248 > {
249 Err(Self::rejected("consume_oauth_browser_flow"))
250 }
251
252 fn admit_device_code(
253 &self,
254 _target: meerkat_core::AuthBindingRef,
255 _provider: meerkat_auth_core::oauth_flow::OAuthProviderIdentity,
256 _device_code: String,
257 _expires_in: std::time::Duration,
258 ) -> Result<(), meerkat_auth_core::oauth_flow::OAuthFlowError> {
259 Err(Self::rejected("admit_oauth_device_flow"))
260 }
261
262 fn verify_device_code(
263 &self,
264 _device_code: &str,
265 _target: &meerkat_core::AuthBindingRef,
266 _provider: meerkat_auth_core::oauth_flow::OAuthProviderIdentity,
267 ) -> Result<
268 meerkat_auth_core::oauth_flow::OAuthDeviceFlowRecord,
269 meerkat_auth_core::oauth_flow::OAuthFlowError,
270 > {
271 Err(Self::rejected("verify_oauth_device_flow"))
272 }
273
274 fn begin_device_code_poll(
275 &self,
276 _device_code: &str,
277 _target: &meerkat_core::AuthBindingRef,
278 _provider: meerkat_auth_core::oauth_flow::OAuthProviderIdentity,
279 ) -> Result<
280 meerkat_auth_core::oauth_flow::OAuthDevicePollLease,
281 meerkat_auth_core::oauth_flow::OAuthFlowError,
282 > {
283 Err(Self::rejected("begin_oauth_device_poll"))
284 }
285}
286
287#[cfg(not(target_arch = "wasm32"))]
288type MeerkatMachineCommandFuture<'a> = Pin<
289 Box<
290 dyn Future<Output = Result<MeerkatMachineCommandResult, MeerkatMachineCommandError>>
291 + Send
292 + 'a,
293 >,
294>;
295
296#[cfg(target_arch = "wasm32")]
297type MeerkatMachineCommandFuture<'a> = Pin<
298 Box<dyn Future<Output = Result<MeerkatMachineCommandResult, MeerkatMachineCommandError>> + 'a>,
299>;
300
301pub(crate) use driver::{
302 DriverEntry, SharedCompletionRegistry, SharedDriver, cancel_runtime_loop_run,
303 commit_runtime_loop_run, fail_machine_run, fail_runtime_loop_run,
304 machine_apply_run_return_projection, machine_batch_primitive_projections,
305 machine_batch_runtime_semantics, machine_begin_run, machine_commit_prepared_destroy,
306 machine_commit_service_turn_terminal_receipt, machine_executor_attach_projection,
307 machine_input_boundary, machine_prepare_bindings_projection, machine_prepare_destroy,
308 machine_recover_ephemeral_driver, machine_recover_persistent_driver,
309 machine_recycle_preserving_work, machine_reset, machine_retire,
310 machine_select_runtime_loop_batch, machine_stop_runtime, machine_unregister_session_projection,
311 prepare_runtime_loop_batch_start, rollback_runtime_loop_run_after_boundary_commit_failure,
312};
313
314pub(crate) mod driver;
315
316mod comms_drain;
317pub mod composition;
318mod dispatch_control;
319mod dispatch_drain;
320mod dispatch_ingress;
321mod dispatch_session;
322#[allow(unused_variables, dead_code, clippy::cmp_owned)]
323#[allow(clippy::assign_op_pattern)]
324pub mod dsl;
325pub(crate) mod dsl_authority;
326mod dsl_effects;
327mod llm_reconfigure;
328mod runtime_control;
329mod session_management;
330mod traits;
331mod visibility;
332
333pub use composition::{MeerkatCompositionSignalDispatcher, MeerkatConsumerSurface};
334
335pub use comms_drain::{
336 CommsDrainMode, CommsDrainPhase, DrainExitReason, PeerEndpointStageError, PeerIngressOwner,
337 SupervisorBinding, SupervisorBindingStageError,
338};
339pub(crate) use comms_drain::{CommsDrainSlot, abort_slot};
340pub(crate) use dsl_effects::{DslTransitionEffects, apply_dsl_transition_on_authority};
341pub(crate) use visibility::MachineToolVisibilityOwner;
342
343struct StagedSessionDslInput {
344 previous_state: Box<dsl::MeerkatMachineState>,
345 effects: DslTransitionEffects,
346}
347
/// Policy for the session DSL state when dispatching a committed transition's routed
/// signals fails.
#[derive(Clone, Copy)]
enum CommittedEffectDispatchFailure {
    /// Leave the already-committed DSL state in place.
    PreserveCommittedDslState,
    /// Roll the session DSL state back to the snapshot taken before the transition.
    RestorePreviousDslState,
}
353
354struct RuntimeSessionEntry {
356 runtime_id: LogicalRuntimeId,
358 mutation_gate: Arc<Mutex<()>>,
370 driver: SharedDriver,
372 control_projection: Arc<StdRwLock<crate::driver::ephemeral::RuntimeControlProjection>>,
378 ops_lifecycle: Arc<crate::ops_lifecycle::RuntimeOpsLifecycleRegistry>,
380 epoch_id: meerkat_core::RuntimeEpochId,
382 cursor_state: Arc<meerkat_core::EpochCursorState>,
384 completions: SharedCompletionRegistry,
386 tool_visibility_owner: Arc<MachineToolVisibilityOwner>,
388 current_llm_identity: Option<meerkat_core::SessionLlmIdentity>,
390 current_capability_surface: Option<SessionLlmCapabilitySurface>,
392 capability_surface_status: SessionLlmCapabilitySurfaceStatus,
394 phase: RegistrationPhase,
397 provisional_interrupt_handle:
400 Option<Arc<dyn meerkat_core::lifecycle::CoreExecutorInterruptHandle>>,
401 dsl_authority: Arc<std::sync::Mutex<dsl::MeerkatMachineAuthority>>,
411 drain_slot: CommsDrainSlot,
421}
422
423struct RuntimeLoopAttachment {
428 wake_tx: mpsc::Sender<()>,
429 effect_tx: mpsc::Sender<crate::effect::RuntimeEffect>,
430 boundary_handle: Option<Arc<dyn meerkat_core::lifecycle::CoreExecutorBoundaryHandle>>,
431 interrupt_handle: Option<Arc<dyn meerkat_core::lifecycle::CoreExecutorInterruptHandle>>,
432 _loop_handle: tokio::task::JoinHandle<()>,
433}
434
435enum RegistrationPhase {
441 Queuing,
444 Attaching,
448 Active(RuntimeLoopAttachment),
451}
452
453impl RuntimeSessionEntry {
454 fn control_snapshot(&self) -> crate::driver::ephemeral::RuntimeControlProjection {
455 self.control_projection
456 .read()
457 .map(|guard| guard.clone())
458 .unwrap_or_else(|poisoned| {
459 tracing::error!("runtime control projection lock poisoned");
460 poisoned.into_inner().clone()
461 })
462 }
463
464 fn attachment_is_live(&self) -> bool {
465 match &self.phase {
466 RegistrationPhase::Active(attachment) => {
467 !attachment.wake_tx.is_closed() && !attachment.effect_tx.is_closed()
468 }
469 RegistrationPhase::Queuing | RegistrationPhase::Attaching => false,
470 }
471 }
472
473 fn has_attachment_or_attaching(&self) -> bool {
478 matches!(self.phase, RegistrationPhase::Attaching) || self.attachment_is_live()
479 }
480
481 fn has_live_attachment(&self) -> bool {
485 self.attachment_is_live()
486 }
487
488 fn attach_runtime_loop(
489 &mut self,
490 wake_tx: mpsc::Sender<()>,
491 effect_tx: mpsc::Sender<crate::effect::RuntimeEffect>,
492 boundary_handle: Option<Arc<dyn meerkat_core::lifecycle::CoreExecutorBoundaryHandle>>,
493 interrupt_handle: Option<Arc<dyn meerkat_core::lifecycle::CoreExecutorInterruptHandle>>,
494 loop_handle: tokio::task::JoinHandle<()>,
495 ) {
496 self.provisional_interrupt_handle = None;
497 self.phase = RegistrationPhase::Active(RuntimeLoopAttachment {
498 wake_tx,
499 effect_tx,
500 boundary_handle,
501 interrupt_handle,
502 _loop_handle: loop_handle,
503 });
504 }
505
506 fn clear_dead_attachment(&mut self) -> bool {
507 if matches!(self.phase, RegistrationPhase::Active(_)) && !self.attachment_is_live() {
508 self.phase = RegistrationPhase::Queuing;
511 return true;
512 }
513 false
514 }
515
516 fn wake_sender(&self) -> Option<mpsc::Sender<()>> {
517 match &self.phase {
518 RegistrationPhase::Active(attachment)
519 if !attachment.wake_tx.is_closed() && !attachment.effect_tx.is_closed() =>
520 {
521 Some(attachment.wake_tx.clone())
522 }
523 _ => None,
524 }
525 }
526
527 fn effect_sender(&self) -> Option<mpsc::Sender<crate::effect::RuntimeEffect>> {
528 match &self.phase {
529 RegistrationPhase::Active(attachment)
530 if !attachment.wake_tx.is_closed() && !attachment.effect_tx.is_closed() =>
531 {
532 Some(attachment.effect_tx.clone())
533 }
534 _ => None,
535 }
536 }
537
538 fn boundary_handle(
539 &self,
540 ) -> Option<Arc<dyn meerkat_core::lifecycle::CoreExecutorBoundaryHandle>> {
541 match &self.phase {
542 RegistrationPhase::Active(attachment)
543 if !attachment.wake_tx.is_closed() && !attachment.effect_tx.is_closed() =>
544 {
545 attachment.boundary_handle.clone()
546 }
547 _ => None,
548 }
549 }
550
551 fn interrupt_handle(
552 &self,
553 ) -> Option<Arc<dyn meerkat_core::lifecycle::CoreExecutorInterruptHandle>> {
554 match &self.phase {
555 RegistrationPhase::Active(attachment)
556 if !attachment.wake_tx.is_closed() && !attachment.effect_tx.is_closed() =>
557 {
558 attachment.interrupt_handle.clone()
559 }
560 _ => self.provisional_interrupt_handle.clone(),
561 }
562 }
563
564 fn install_provisional_interrupt_handle(
565 &mut self,
566 handle: Arc<dyn meerkat_core::lifecycle::CoreExecutorInterruptHandle>,
567 ) {
568 if !self.attachment_is_live() {
569 self.provisional_interrupt_handle = Some(handle);
570 }
571 }
572}
573
574impl MeerkatMachine {
575 async fn session_mutation_gate(&self, session_id: &SessionId) -> Option<Arc<Mutex<()>>> {
581 let sessions = self.sessions.read().await;
582 sessions
583 .get(session_id)
584 .map(|entry| Arc::clone(&entry.mutation_gate))
585 }
586
587 async fn session_dsl_authority(
588 &self,
589 session_id: &SessionId,
590 ) -> Result<Arc<std::sync::Mutex<dsl::MeerkatMachineAuthority>>, String> {
591 let sessions = self.sessions.read().await;
592 sessions
593 .get(session_id)
594 .map(|entry| Arc::clone(&entry.dsl_authority))
595 .ok_or_else(|| {
596 RuntimeDriverError::NotReady {
597 state: RuntimeState::Destroyed,
598 }
599 .to_string()
600 })
601 }
602
603 fn preview_dsl_input_on_state(
604 state: &dsl::MeerkatMachineState,
605 input: dsl::MeerkatMachineInput,
606 context: &str,
607 ) -> Result<Vec<dsl::MeerkatMachineEffect>, String> {
608 let mut preview = dsl::MeerkatMachineAuthority::from_state(state.clone());
609 dsl::MeerkatMachineMutator::apply(&mut preview, input)
610 .map(|transition| transition.effects)
611 .map_err(|err| dsl_authority::map_error(err, context))
612 }
613
614 async fn preview_session_dsl_input(
615 &self,
616 session_id: &SessionId,
617 input: dsl::MeerkatMachineInput,
618 context: &str,
619 ) -> Result<Vec<dsl::MeerkatMachineEffect>, String> {
620 let authority = self.session_dsl_authority(session_id).await?;
621 let state = {
622 let authority = authority
623 .lock()
624 .unwrap_or_else(std::sync::PoisonError::into_inner);
625 authority.state.clone()
626 };
627 Self::preview_dsl_input_on_state(&state, input, context)
628 }
629
630 async fn session_dsl_state(
631 &self,
632 session_id: &SessionId,
633 ) -> Result<dsl::MeerkatMachineState, RuntimeControlPlaneError> {
634 let authority = self
635 .session_dsl_authority(session_id)
636 .await
637 .map_err(RuntimeControlPlaneError::Internal)?;
638 let authority = authority
639 .lock()
640 .unwrap_or_else(std::sync::PoisonError::into_inner);
641 Ok(authority.state.clone())
642 }
643
644 async fn commit_session_dsl_transition(
645 &self,
646 session_id: &SessionId,
647 staged: StagedSessionDslInput,
648 context: &str,
649 ) -> Result<(), String> {
650 self.commit_session_dsl_transition_with_dispatch_failure(
651 session_id,
652 staged,
653 context,
654 CommittedEffectDispatchFailure::RestorePreviousDslState,
655 )
656 .await
657 }
658
659 async fn commit_session_dsl_transition_preserving_committed_state(
660 &self,
661 session_id: &SessionId,
662 staged: StagedSessionDslInput,
663 context: &str,
664 ) -> Result<(), String> {
665 self.commit_session_dsl_transition_with_dispatch_failure(
666 session_id,
667 staged,
668 context,
669 CommittedEffectDispatchFailure::PreserveCommittedDslState,
670 )
671 .await
672 }
673
674 async fn commit_session_dsl_transition_with_dispatch_failure(
675 &self,
676 session_id: &SessionId,
677 staged: StagedSessionDslInput,
678 context: &str,
679 dispatch_failure: CommittedEffectDispatchFailure,
680 ) -> Result<(), String> {
681 if let Err(error) = self
682 .dispatch_routed_signals_from_effects(&staged.effects)
683 .await
684 {
685 match dispatch_failure {
686 CommittedEffectDispatchFailure::PreserveCommittedDslState => {}
687 CommittedEffectDispatchFailure::RestorePreviousDslState => {
688 self.restore_session_dsl_state(session_id, staged.previous_state)
689 .await;
690 }
691 }
692 return Err(format!(
693 "DSL authority ({context}): committed effect dispatch failed: {error}"
694 ));
695 }
696 Ok(())
697 }
698
699 async fn dispatch_routed_signals_from_effects(
700 &self,
701 effects: &[dsl::MeerkatMachineEffect],
702 ) -> Result<(), String> {
703 let dispatcher = {
704 self.composition_signal_dispatcher
705 .read()
706 .unwrap_or_else(std::sync::PoisonError::into_inner)
707 .clone()
708 };
709 let Some(dispatcher) = dispatcher else {
710 return Ok(());
711 };
712
713 for effect in effects {
714 if let Some(signal) = composition::lift_routed_signal(effect) {
715 composition::dispatch_routed_signal(&dispatcher, signal).await?;
716 }
717 }
718 Ok(())
719 }
720
721 async fn clear_dead_runtime_attachment(&self, session_id: &SessionId) {
722 let mut sessions = self.sessions.write().await;
723 if let Some(entry) = sessions.get_mut(session_id) {
724 entry.clear_dead_attachment();
725 }
726 }
727
728 async fn dispatch_cancel_after_boundary_runtime_effect(
729 &self,
730 session_id: &SessionId,
731 effect_tx: Option<mpsc::Sender<crate::effect::RuntimeEffect>>,
732 boundary_handle: Option<Arc<dyn meerkat_core::lifecycle::CoreExecutorBoundaryHandle>>,
733 projected_effect: crate::effect::ProjectedRuntimeEffect,
734 context: &str,
735 ) -> Result<(), RuntimeDriverError> {
736 let Some(effect_tx) = effect_tx else {
737 let state = self
738 .existing_session_runtime_state(session_id)
739 .await
740 .unwrap_or(RuntimeState::Destroyed);
741 return Err(RuntimeDriverError::NotReady { state });
742 };
743
744 let reason = projected_effect.reason().to_string();
745 if let Some(boundary_handle) = boundary_handle {
746 boundary_handle
747 .cancel_after_boundary(reason)
748 .await
749 .map_err(|err| {
750 RuntimeDriverError::Internal(format!(
751 "{context}: failed to apply live boundary cancel: {err}"
752 ))
753 })?;
754 }
755
756 match effect_tx.send(projected_effect.into_effect()).await {
757 Ok(()) => Ok(()),
758 Err(_) => {
759 self.clear_dead_runtime_attachment(session_id).await;
760 Err(RuntimeDriverError::NotReady {
761 state: RuntimeState::Idle,
762 })
763 }
764 }
765 }
766
767 async fn restore_session_dsl_state(
768 &self,
769 session_id: &SessionId,
770 state: Box<dsl::MeerkatMachineState>,
771 ) {
772 if let Ok(authority) = self.session_dsl_authority(session_id).await {
773 Self::restore_dsl_authority_state(&authority, state);
774 }
775 }
776
777 fn restore_dsl_authority_state(
778 authority: &crate::driver::ephemeral::SharedIngressDslAuthority,
779 state: Box<dsl::MeerkatMachineState>,
780 ) {
781 let mut authority = authority
782 .lock()
783 .unwrap_or_else(std::sync::PoisonError::into_inner);
784 *authority = dsl::MeerkatMachineAuthority::from_state(*state);
785 }
786}
787
/// Capability token returned by `MeerkatMachine::session_control_authority`.
///
/// The private field keeps code outside this module from constructing it directly.
#[derive(Debug, Clone, Copy)]
pub struct MachineSessionControlAuthority {
    _private: (),
}
794
795pub struct MeerkatMachine {
802 sessions: RwLock<HashMap<SessionId, RuntimeSessionEntry>>,
804 mode: RuntimeMode,
806 store: Option<Arc<dyn RuntimeStore>>,
808 blob_store: Option<Arc<dyn BlobStore>>,
810 llm_reconfigure_host: StdRwLock<Option<Arc<dyn SessionLlmReconfigureHost>>>,
812 auth_lease: StdRwLock<Arc<dyn meerkat_core::handles::AuthLeaseHandle>>,
815 #[cfg(not(target_arch = "wasm32"))]
818 oauth_flows: StdRwLock<Arc<dyn meerkat_auth_core::oauth_flow::OAuthFlowAuthority>>,
819 session_claims: Arc<crate::handles::RuntimeSessionClaimRegistry>,
826 composition_signal_dispatcher:
830 StdRwLock<Option<composition::MeerkatCompositionSignalDispatcher>>,
831}
832
833impl MeerkatMachine {
834 #[must_use]
837 pub fn session_control_authority(&self) -> MachineSessionControlAuthority {
838 MachineSessionControlAuthority { _private: () }
839 }
840
841 #[must_use]
846 pub fn shares_runtime_persistence_with(&self, other: &Self) -> bool {
847 match (&self.store, &other.store) {
848 (None, None) => true,
849 (Some(a), Some(b)) => runtime_stores_share_authority(a, b),
850 _ => false,
851 }
852 }
853
854 #[must_use]
857 pub fn shares_runtime_store_authority(&self, store: &Arc<dyn RuntimeStore>) -> bool {
858 self.store
859 .as_ref()
860 .is_some_and(|machine_store| runtime_stores_share_authority(machine_store, store))
861 }
862
863 #[must_use]
865 pub fn has_runtime_persistence(&self) -> bool {
866 self.store.is_some()
867 }
868
869 fn normalize_destroyed_error(err: RuntimeDriverError) -> RuntimeDriverError {
870 match err {
871 RuntimeDriverError::NotReady {
872 state: RuntimeState::Destroyed,
873 } => RuntimeDriverError::Destroyed,
874 other => other,
875 }
876 }
877
878 pub fn ephemeral() -> Self {
880 let auth_lease = Arc::new(crate::handles::RuntimeAuthLeaseHandle::new());
881 #[cfg(not(target_arch = "wasm32"))]
882 let oauth_flows = Arc::new(crate::handles::RuntimeOAuthFlowHandle::new_with_auth_lease(
883 std::time::Duration::from_secs(10 * 60),
884 Arc::clone(&auth_lease),
885 ));
886 let auth_lease: Arc<dyn meerkat_core::handles::AuthLeaseHandle> = auth_lease;
887 Self {
888 sessions: RwLock::new(HashMap::new()),
889 mode: RuntimeMode::V9Compliant,
890 store: None,
891 blob_store: None,
892 llm_reconfigure_host: StdRwLock::new(None),
893 auth_lease: StdRwLock::new(auth_lease),
894 #[cfg(not(target_arch = "wasm32"))]
895 oauth_flows: StdRwLock::new(oauth_flows),
896 session_claims: Arc::new(crate::handles::RuntimeSessionClaimRegistry::new()),
897 composition_signal_dispatcher: StdRwLock::new(None),
898 }
899 }
900
901 pub fn persistent(store: Arc<dyn RuntimeStore>, blob_store: Arc<dyn BlobStore>) -> Self {
903 #[cfg(not(target_arch = "wasm32"))]
904 let (auth_lease, oauth_flows) = {
905 let authorities = persistent_auth_authorities(&store);
906 (
907 Arc::clone(&authorities.auth_lease),
908 Arc::clone(&authorities.oauth_flows),
909 )
910 };
911 #[cfg(target_arch = "wasm32")]
912 let auth_lease = Arc::new(crate::handles::RuntimeAuthLeaseHandle::new());
913 let auth_lease: Arc<dyn meerkat_core::handles::AuthLeaseHandle> = auth_lease;
914 Self {
915 sessions: RwLock::new(HashMap::new()),
916 mode: RuntimeMode::V9Compliant,
917 store: Some(store),
918 blob_store: Some(blob_store),
919 llm_reconfigure_host: StdRwLock::new(None),
920 auth_lease: StdRwLock::new(auth_lease),
921 #[cfg(not(target_arch = "wasm32"))]
922 oauth_flows: StdRwLock::new(oauth_flows),
923 session_claims: Arc::new(crate::handles::RuntimeSessionClaimRegistry::new()),
924 composition_signal_dispatcher: StdRwLock::new(None),
925 }
926 }
927
928 pub fn persistent_without_blobs(store: Arc<dyn RuntimeStore>) -> Self {
934 #[cfg(not(target_arch = "wasm32"))]
935 let (auth_lease, oauth_flows) = {
936 let authorities = persistent_auth_authorities(&store);
937 (
938 Arc::clone(&authorities.auth_lease),
939 Arc::clone(&authorities.oauth_flows),
940 )
941 };
942 #[cfg(target_arch = "wasm32")]
943 let auth_lease = Arc::new(crate::handles::RuntimeAuthLeaseHandle::new());
944 let auth_lease: Arc<dyn meerkat_core::handles::AuthLeaseHandle> = auth_lease;
945 Self {
946 sessions: RwLock::new(HashMap::new()),
947 mode: RuntimeMode::V9Compliant,
948 store: Some(store),
949 blob_store: Some(Arc::new(UnavailableBlobStore)),
950 llm_reconfigure_host: StdRwLock::new(None),
951 auth_lease: StdRwLock::new(auth_lease),
952 #[cfg(not(target_arch = "wasm32"))]
953 oauth_flows: StdRwLock::new(oauth_flows),
954 session_claims: Arc::new(crate::handles::RuntimeSessionClaimRegistry::new()),
955 composition_signal_dispatcher: StdRwLock::new(None),
956 }
957 }
958
959 pub fn auth_lease_handle(&self) -> Arc<dyn meerkat_core::handles::AuthLeaseHandle> {
962 Arc::clone(
963 &self
964 .auth_lease
965 .read()
966 .unwrap_or_else(std::sync::PoisonError::into_inner),
967 )
968 }
969
970 pub fn set_auth_lease_handle(&self, handle: Arc<dyn meerkat_core::handles::AuthLeaseHandle>) {
976 #[cfg(not(target_arch = "wasm32"))]
977 let handle = {
978 let erased: Arc<dyn std::any::Any + Send + Sync> = handle.clone();
979 if let Ok(runtime_handle) =
980 Arc::downcast::<crate::handles::RuntimeAuthLeaseHandle>(erased)
981 {
982 self.set_runtime_auth_lease_handle(runtime_handle);
983 return;
984 }
985 *self
986 .oauth_flows
987 .write()
988 .unwrap_or_else(std::sync::PoisonError::into_inner) =
989 Arc::new(UnavailableOAuthFlowAuthority);
990 handle
991 };
992 *self
993 .auth_lease
994 .write()
995 .unwrap_or_else(std::sync::PoisonError::into_inner) = handle;
996 }
997
998 #[cfg(not(target_arch = "wasm32"))]
1005 pub fn set_auth_lease_handle_with_oauth_flow_authority(
1006 &self,
1007 handle: Arc<dyn meerkat_core::handles::AuthLeaseHandle>,
1008 oauth_flows: Arc<dyn meerkat_auth_core::oauth_flow::OAuthFlowAuthority>,
1009 ) {
1010 *self
1011 .oauth_flows
1012 .write()
1013 .unwrap_or_else(std::sync::PoisonError::into_inner) = oauth_flows;
1014 *self
1015 .auth_lease
1016 .write()
1017 .unwrap_or_else(std::sync::PoisonError::into_inner) = handle;
1018 }
1019
1020 pub fn set_runtime_auth_lease_handle(
1023 &self,
1024 handle: Arc<crate::handles::RuntimeAuthLeaseHandle>,
1025 ) {
1026 #[cfg(not(target_arch = "wasm32"))]
1027 {
1028 *self
1029 .oauth_flows
1030 .write()
1031 .unwrap_or_else(std::sync::PoisonError::into_inner) =
1032 Arc::new(crate::handles::RuntimeOAuthFlowHandle::new_with_auth_lease(
1033 std::time::Duration::from_secs(10 * 60),
1034 Arc::clone(&handle),
1035 ));
1036 }
1037 let handle: Arc<dyn meerkat_core::handles::AuthLeaseHandle> = handle;
1038 *self
1039 .auth_lease
1040 .write()
1041 .unwrap_or_else(std::sync::PoisonError::into_inner) = handle;
1042 }
1043
1044 #[cfg(not(target_arch = "wasm32"))]
1047 pub fn oauth_flow_authority(
1048 &self,
1049 ) -> Arc<dyn meerkat_auth_core::oauth_flow::OAuthFlowAuthority> {
1050 Arc::clone(
1051 &self
1052 .oauth_flows
1053 .read()
1054 .unwrap_or_else(std::sync::PoisonError::into_inner),
1055 )
1056 }
1057
1058 pub fn session_claim_handle(&self) -> Arc<dyn meerkat_core::handles::SessionClaimHandle> {
1063 Arc::clone(&self.session_claims) as Arc<dyn meerkat_core::handles::SessionClaimHandle>
1064 }
1065
1066 pub fn set_composition_signal_dispatcher(
1069 &self,
1070 dispatcher: composition::MeerkatCompositionSignalDispatcher,
1071 ) {
1072 let mut slot = self
1073 .composition_signal_dispatcher
1074 .write()
1075 .unwrap_or_else(std::sync::PoisonError::into_inner);
1076 *slot = Some(dispatcher);
1077 }
1078
1079 pub async fn apply_routed_meerkat_input(
1090 &self,
1091 session_id: &SessionId,
1092 input: dsl::MeerkatMachineInput,
1093 ) -> Result<(), String> {
1094 Self::reject_raw_fieldless_runtime_internal_dsl_input(&input)?;
1095 let sessions = self.sessions.read().await;
1096 let entry = sessions.get(session_id).ok_or_else(|| {
1097 format!(
1098 "session `{session_id}` is not registered with this MeerkatMachine; \
1099 cannot deliver routed input"
1100 )
1101 })?;
1102 let authority = Arc::clone(&entry.dsl_authority);
1103 drop(sessions);
1104
1105 let (previous_state, effects) = {
1106 let mut guard = authority
1107 .lock()
1108 .unwrap_or_else(std::sync::PoisonError::into_inner);
1109 let previous_state = Box::new(guard.state.clone());
1110 let effects = dsl::MeerkatMachineMutator::apply(&mut *guard, input)
1111 .map(|transition| transition.effects)
1112 .map_err(|err| format!("{err}"))?;
1113 (previous_state, effects)
1114 };
1115 if let Err(error) = self.dispatch_routed_signals_from_effects(&effects).await {
1116 self.restore_session_dsl_state(session_id, previous_state)
1117 .await;
1118 return Err(format!(
1119 "routed MeerkatMachine input committed effect dispatch failed: {error}"
1120 ));
1121 }
1122 Ok(())
1123 }
1124
/// Test helper: returns the session entry's DSL authority alongside the one its driver
/// reports, so tests can check whether they are shared.
///
/// Returns `None` when the session is not registered.
#[cfg(test)]
pub(crate) async fn debug_shared_ingress_authorities(
    &self,
    session_id: &SessionId,
) -> Option<(
    Arc<std::sync::Mutex<dsl::MeerkatMachineAuthority>>,
    crate::driver::ephemeral::SharedIngressDslAuthority,
)> {
    let sessions = self.sessions.read().await;
    let entry = sessions.get(session_id)?;
    let entry_authority = Arc::clone(&entry.dsl_authority);
    let driver_guard = entry.driver.lock().await;
    let driver_authority = driver_guard.shared_dsl_authority();
    Some((entry_authority, driver_authority))
}
1139
1140 fn make_driver(
1142 &self,
1143 runtime_id: LogicalRuntimeId,
1144 dsl_authority: crate::driver::ephemeral::SharedIngressDslAuthority,
1145 initial_runtime_state: RuntimeState,
1146 ) -> DriverEntry {
1147 let control_projection = Arc::new(StdRwLock::new(
1148 crate::driver::ephemeral::RuntimeControlProjection {
1149 phase: initial_runtime_state,
1150 current_run_id: None,
1151 pre_run_phase: None,
1152 },
1153 ));
1154 match (&self.store, &self.blob_store) {
1155 (Some(store), Some(blob_store)) => {
1156 DriverEntry::Persistent(PersistentRuntimeDriver::new_with_control(
1157 runtime_id,
1158 store.clone(),
1159 blob_store.clone(),
1160 control_projection,
1161 dsl_authority,
1162 ))
1163 }
1164 _ => DriverEntry::Ephemeral(EphemeralRuntimeDriver::new_with_control_and_dsl(
1165 runtime_id,
1166 control_projection,
1167 dsl_authority,
1168 )),
1169 }
1170 }
1171
1172 async fn recover_or_create_ops_state(
1179 &self,
1180 session_id: &SessionId,
1181 runtime_id: &LogicalRuntimeId,
1182 ) -> (
1183 Arc<crate::ops_lifecycle::RuntimeOpsLifecycleRegistry>,
1184 meerkat_core::RuntimeEpochId,
1185 Arc<meerkat_core::EpochCursorState>,
1186 ) {
1187 if let Some(ref store) = self.store {
1188 match store.load_ops_lifecycle(runtime_id).await {
1189 Ok(Some(snapshot)) => {
1190 let recovered_epoch = snapshot.epoch_id.clone();
1191 let recovered_cursors = meerkat_core::EpochCursorState::from_recovered(
1192 snapshot.cursors.agent_applied_cursor,
1193 snapshot.cursors.runtime_observed_seq,
1194 snapshot.cursors.runtime_last_injected_seq,
1195 );
1196 let recovered_ops_count = snapshot.completion_entries.len();
1197 let registry =
1198 crate::ops_lifecycle::RuntimeOpsLifecycleRegistry::from_recovered(snapshot);
1199 tracing::info!(
1200 %session_id,
1201 %runtime_id,
1202 epoch_id = %recovered_epoch,
1203 recovered_ops = recovered_ops_count,
1204 "ops lifecycle recovered from durable store (same epoch)"
1205 );
1206 return (
1207 Arc::new(registry),
1208 recovered_epoch,
1209 Arc::new(recovered_cursors),
1210 );
1211 }
1212 Ok(None) => {}
1213 Err(err) => {
1214 tracing::warn!(
1215 %session_id,
1216 %runtime_id,
1217 error = %err,
1218 "failed to load ops lifecycle; epoch rotated"
1219 );
1220 return (
1221 Arc::new(crate::ops_lifecycle::RuntimeOpsLifecycleRegistry::new()),
1222 meerkat_core::RuntimeEpochId::new(),
1223 Arc::new(meerkat_core::EpochCursorState::new()),
1224 );
1225 }
1226 }
1227 tracing::debug!(%session_id, "no persisted ops lifecycle; fresh epoch");
1228 (
1229 Arc::new(crate::ops_lifecycle::RuntimeOpsLifecycleRegistry::new()),
1230 meerkat_core::RuntimeEpochId::new(),
1231 Arc::new(meerkat_core::EpochCursorState::new()),
1232 )
1233 } else {
1234 (
1235 Arc::new(crate::ops_lifecycle::RuntimeOpsLifecycleRegistry::new()),
1236 meerkat_core::RuntimeEpochId::new(),
1237 Arc::new(meerkat_core::EpochCursorState::new()),
1238 )
1239 }
1240 }
1241
    /// Dispatches a [`MeerkatMachineCommand`] to the executor responsible for its family.
    ///
    /// Variants are routed to one of seven executors:
    /// - the ensure-session executor;
    /// - the session executor;
    /// - the drain executor;
    /// - the local drain executor;
    /// - the control executor;
    /// - the ingress executor;
    /// - the legacy run executor.
    ///
    /// Each executor's error is converted into the returned future's error type via
    /// `.map_err(Into::into)`.
    ///
    /// `self_handle` is only consulted for `EnsureSessionWithExecutor`,
    /// `SetPeerIngressContext`, and `NotifyDrainExited`, whose executors are called on an
    /// `Arc<Self>`. If it is `None` for one of those commands, the future resolves to
    /// `MeerkatMachineCommandError::Driver(RuntimeDriverError::Internal(..))`. All other
    /// commands run on `&self` and ignore `self_handle`.
    // NOTE(review): the reason for allowing `clippy::large_futures` is not recorded;
    // presumably the awaited executor futures are large — confirm and document it here.
    #[allow(clippy::large_futures)]
    fn execute_meerkat_machine_command(
        &self,
        self_handle: Option<Arc<Self>>,
        command: MeerkatMachineCommand,
    ) -> MeerkatMachineCommandFuture<'_> {
        Box::pin(async move {
            // No wildcard arm: adding a command variant forces an explicit routing choice.
            match command {
                MeerkatMachineCommand::EnsureSessionWithExecutor { .. } => {
                    // This executor needs an owned `Arc<Self>` handle, not `&self`.
                    let self_handle = self_handle.ok_or_else(|| {
                        MeerkatMachineCommandError::Driver(RuntimeDriverError::Internal(
                            "EnsureSessionWithExecutor requires Arc<Self> machine handle".into(),
                        ))
                    })?;
                    self_handle
                        .execute_meerkat_machine_ensure_session_command(command)
                        .await
                        .map_err(Into::into)
                }
                // Session registration, bindings, input queries and tool-visibility commands.
                MeerkatMachineCommand::RegisterSession { .. }
                | MeerkatMachineCommand::UnregisterSession { .. }
                | MeerkatMachineCommand::SetSilentIntents { .. }
                | MeerkatMachineCommand::CancelAfterBoundary { .. }
                | MeerkatMachineCommand::StopRuntimeExecutor { .. }
                | MeerkatMachineCommand::CommitServiceTurnTerminalReceipt { .. }
                | MeerkatMachineCommand::ContainsSession { .. }
                | MeerkatMachineCommand::SessionHasExecutor { .. }
                | MeerkatMachineCommand::SessionHasComms { .. }
                | MeerkatMachineCommand::OpsLifecycleRegistry { .. }
                | MeerkatMachineCommand::PrepareBindings { .. }
                | MeerkatMachineCommand::PrepareLocalSessionBindings { .. }
                | MeerkatMachineCommand::InputState { .. }
                | MeerkatMachineCommand::ListActiveInputs { .. }
                | MeerkatMachineCommand::ReconfigureSessionLlmIdentity { .. }
                | MeerkatMachineCommand::StagePersistentFilter { .. }
                | MeerkatMachineCommand::RequestDeferredTools { .. }
                | MeerkatMachineCommand::PublishCommittedVisibleSet { .. } => self
                    .execute_meerkat_machine_session_command(command)
                    .await
                    .map_err(Into::into),
                MeerkatMachineCommand::SetPeerIngressContext { .. }
                | MeerkatMachineCommand::NotifyDrainExited { .. } => {
                    // The drain executor also needs an owned `Arc<Self>` handle.
                    let self_handle = self_handle.ok_or_else(|| {
                        MeerkatMachineCommandError::Driver(RuntimeDriverError::Internal(
                            "drain command requires Arc<Self> machine handle".into(),
                        ))
                    })?;
                    self_handle
                        .execute_meerkat_machine_drain_command(command)
                        .await
                        .map_err(Into::into)
                }
                // Drain commands that can run on `&self`.
                MeerkatMachineCommand::AbortAll
                | MeerkatMachineCommand::Abort { .. }
                | MeerkatMachineCommand::Wait { .. } => self
                    .execute_meerkat_machine_drain_local_command(command)
                    .await
                    .map_err(Into::into),
                // Runtime control, LLM capability/model routing and image-operation commands.
                MeerkatMachineCommand::Ingest { .. }
                | MeerkatMachineCommand::PublishEvent { .. }
                | MeerkatMachineCommand::Retire { .. }
                | MeerkatMachineCommand::Recycle { .. }
                | MeerkatMachineCommand::Reset { .. }
                | MeerkatMachineCommand::Recover { .. }
                | MeerkatMachineCommand::Destroy { .. }
                | MeerkatMachineCommand::RuntimeState { .. }
                | MeerkatMachineCommand::ResolvedSessionLlmCapabilities { .. }
                | MeerkatMachineCommand::ConfigureModelRoutingBaseline { .. }
                | MeerkatMachineCommand::SessionModelRoutingStatus { .. }
                | MeerkatMachineCommand::RequestSwitchTurn { .. }
                | MeerkatMachineCommand::AdmitModelRoutingAssistantTurn { .. }
                | MeerkatMachineCommand::BeginImageOperation { .. }
                | MeerkatMachineCommand::ActivateImageOperationOverride { .. }
                | MeerkatMachineCommand::CompleteImageOperation { .. }
                | MeerkatMachineCommand::RestoreImageOperationOverride { .. }
                | MeerkatMachineCommand::LoadBoundaryReceipt { .. } => self
                    .execute_meerkat_machine_control_command(command)
                    .await
                    .map_err(Into::into),
                // Input acceptance.
                MeerkatMachineCommand::AcceptWithCompletion { .. }
                | MeerkatMachineCommand::AcceptWithoutWake { .. } => self
                    .execute_meerkat_machine_ingress_command(command)
                    .await
                    .map_err(Into::into),
                // Legacy prepare/commit/fail run protocol.
                MeerkatMachineCommand::Prepare { .. }
                | MeerkatMachineCommand::Commit { .. }
                | MeerkatMachineCommand::Fail { .. } => self
                    .execute_meerkat_machine_legacy_run_command(command)
                    .await
                    .map_err(Into::into),
            }
        })
    }
1335
1336 pub async fn register_session(&self, session_id: SessionId) {
1339 let _ = self
1340 .execute_meerkat_machine_command(
1341 None,
1342 MeerkatMachineCommand::RegisterSession { session_id },
1343 )
1344 .await;
1345 }
1346}
1347
// Unit tests for this module, compiled only under `cfg(test)` and loaded via `#[path]`
// from `../meerkat_machine_tests.rs`. The `expect_used`, `panic` and `unwrap_used`
// Clippy lints are allowed there so test code can use those constructs directly.
#[cfg(test)]
#[allow(clippy::expect_used, clippy::panic, clippy::unwrap_used)]
#[path = "../meerkat_machine_tests.rs"]
mod tests;