1use std::collections::{BTreeSet, HashMap, HashSet};
20use std::future::Future;
21use std::pin::Pin;
22use std::sync::Arc;
23use std::sync::RwLock as StdRwLock;
24#[cfg(not(target_arch = "wasm32"))]
25use std::sync::{Mutex as StdMutex, OnceLock, Weak};
26
27use meerkat_core::lifecycle::core_executor::CoreApplyOutput;
28use meerkat_core::lifecycle::{InputId, RunId};
29use meerkat_core::types::SessionId;
30use meerkat_core::{BlobId, BlobPayload, BlobRef, BlobStore, BlobStoreError};
31use meerkat_core::{
32 DeferredToolLoadAuthority, SessionToolVisibilityState, ToolFilter, ToolScopeApplyError,
33 ToolScopeRevision, ToolScopeStageError, ToolVisibilityOwner, ToolVisibilityWitness,
34};
35
36use crate::accept::AcceptOutcome;
37use crate::driver::ephemeral::EphemeralRuntimeDriver;
38use crate::driver::persistent::PersistentRuntimeDriver;
39use crate::identifiers::LogicalRuntimeId;
40use crate::input::Input;
41use crate::input_state::InputLifecycleState;
42use crate::meerkat_machine_types::{
43 HydratedSessionLlmState, MeerkatAdmittedInputSnapshot, MeerkatBindingSnapshot,
44 MeerkatCompletionWaiterSnapshot, MeerkatCompletionWaitersSnapshot, MeerkatControlSnapshot,
45 MeerkatCursorSnapshot, MeerkatDrainSnapshot, MeerkatDriverKind, MeerkatFormalStateProjection,
46 MeerkatInputsSnapshot, MeerkatLedgerSnapshot, MeerkatMachineCommand,
47 MeerkatMachineCommandError, MeerkatMachineCommandResult, MeerkatMachineRunFailure,
48 MeerkatMachineRunPrepared, MeerkatMachineSpineSnapshot, MeerkatOpsSnapshot,
49 SessionLlmCapabilityDelta, SessionLlmCapabilitySurface, SessionLlmCapabilitySurfaceStatus,
50 SessionLlmReconfigureHost, SessionLlmReconfigureReport, SessionLlmReconfigureRequest,
51 SessionToolVisibilityDelta,
52};
53use crate::runtime_state::RuntimeState;
54use crate::service_ext::{RuntimeMode, SessionServiceRuntimeExt};
55use crate::store::RuntimeStore;
56use crate::tokio;
57use crate::tokio::sync::{Mutex, RwLock, mpsc};
58#[cfg(test)]
59use crate::traits::RuntimeDriver;
60use crate::traits::{
61 DestroyReport, RecoveryReport, RecycleReport, ResetReport, RetireReport,
62 RuntimeControlPlaneError, RuntimeDriverError,
63};
64
/// Errors reported while preparing runtime bindings for a session.
#[derive(Debug, thiserror::Error)]
pub enum RuntimeBindingsError {
    /// The session was not found in the runtime adapter even though it had
    /// already been registered.
    #[error("session {0} not found in runtime adapter after registration")]
    SessionNotFound(SessionId),
    /// Preparing the bindings failed; carries the session id and a textual
    /// description of the failure.
    #[error("failed to prepare runtime bindings for session {0}: {1}")]
    PrepareFailed(SessionId, String),
}
75
76#[derive(Debug, Default)]
77struct UnavailableBlobStore;
78
79impl UnavailableBlobStore {
80 fn error() -> BlobStoreError {
81 BlobStoreError::Unsupported(
82 "persistent runtime constructed without blob store; blob-backed inputs require a BlobStore"
83 .to_string(),
84 )
85 }
86}
87
/// Auth authorities kept in the process-wide registry and handed out by
/// `persistent_auth_authorities` to persistent machines.
#[cfg(not(target_arch = "wasm32"))]
struct PersistentAuthAuthorityBundle {
    /// Weak reference to the runtime store most recently bound to this bundle.
    store: StdMutex<Weak<dyn RuntimeStore>>,
    /// Auth lease handle cloned into machines created from this bundle.
    auth_lease: Arc<crate::handles::RuntimeAuthLeaseHandle>,
    /// OAuth flow handle cloned into machines created from this bundle.
    oauth_flows: Arc<crate::handles::RuntimeOAuthFlowHandle>,
}
94
/// Registry key identifying which runtime store an auth authority bundle
/// belongs to.
#[cfg(not(target_arch = "wasm32"))]
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
enum PersistentAuthAuthorityKey {
    /// Key string reported by the store's `auth_authority_key()`.
    Durable(String),
    /// Address of the store's `Arc` allocation, used when the store reports
    /// no authority key.
    Process(usize),
}
101
/// Process-wide, lazily initialised registry of auth authority bundles keyed
/// by runtime store identity.
#[cfg(not(target_arch = "wasm32"))]
static PERSISTENT_AUTH_AUTHORITIES: OnceLock<
    StdMutex<HashMap<PersistentAuthAuthorityKey, Arc<PersistentAuthAuthorityBundle>>>,
> = OnceLock::new();
106
107#[cfg(not(target_arch = "wasm32"))]
108fn runtime_store_identity(store: &Arc<dyn RuntimeStore>) -> PersistentAuthAuthorityKey {
109 store
110 .auth_authority_key()
111 .map(PersistentAuthAuthorityKey::Durable)
112 .unwrap_or_else(|| {
113 PersistentAuthAuthorityKey::Process(Arc::as_ptr(store).cast::<()>() as usize)
114 })
115}
116
117fn runtime_stores_share_authority(a: &Arc<dyn RuntimeStore>, b: &Arc<dyn RuntimeStore>) -> bool {
118 match (a.auth_authority_key(), b.auth_authority_key()) {
119 (Some(a), Some(b)) => a == b,
120 _ => Arc::ptr_eq(a, b),
121 }
122}
123
124#[cfg(not(target_arch = "wasm32"))]
125fn persistent_auth_authorities(
126 store: &Arc<dyn RuntimeStore>,
127) -> Arc<PersistentAuthAuthorityBundle> {
128 let key = runtime_store_identity(store);
129 let authorities = PERSISTENT_AUTH_AUTHORITIES.get_or_init(|| StdMutex::new(HashMap::new()));
130 let mut authorities = authorities
131 .lock()
132 .unwrap_or_else(std::sync::PoisonError::into_inner);
133 if let Some(existing) = authorities.get(&key) {
134 let stored_store_alive = existing
135 .store
136 .lock()
137 .unwrap_or_else(std::sync::PoisonError::into_inner)
138 .upgrade()
139 .is_some();
140 if matches!(key, PersistentAuthAuthorityKey::Durable(_)) || stored_store_alive {
141 existing.oauth_flows.bind_persistent_store(store);
142 *existing
143 .store
144 .lock()
145 .unwrap_or_else(std::sync::PoisonError::into_inner) = Arc::downgrade(store);
146 return Arc::clone(existing);
147 }
148 }
149 let auth_lease = Arc::new(crate::handles::RuntimeAuthLeaseHandle::new());
150 let oauth_flows = Arc::new(
151 crate::handles::RuntimeOAuthFlowHandle::new_with_persistent_store_and_auth_lease(
152 std::time::Duration::from_secs(10 * 60),
153 Arc::clone(&auth_lease),
154 store,
155 ),
156 );
157 let bundle = Arc::new(PersistentAuthAuthorityBundle {
158 store: StdMutex::new(Arc::downgrade(store)),
159 auth_lease,
160 oauth_flows,
161 });
162 authorities.insert(key, Arc::clone(&bundle));
163 bundle
164}
165
/// Test helper: empties the process-wide auth authority registry if it has
/// been initialised; does nothing otherwise.
#[cfg(all(test, not(target_arch = "wasm32")))]
pub(crate) fn clear_persistent_auth_authorities_for_test() {
    let Some(registry) = PERSISTENT_AUTH_AUTHORITIES.get() else {
        return;
    };
    let mut entries = registry
        .lock()
        .unwrap_or_else(std::sync::PoisonError::into_inner);
    entries.clear();
}
175
// `BlobStore` implementation for the placeholder store: every storage
// operation returns the error built by `UnavailableBlobStore::error()`.
#[cfg_attr(target_arch = "wasm32", async_trait::async_trait(?Send))]
#[cfg_attr(not(target_arch = "wasm32"), async_trait::async_trait)]
impl BlobStore for UnavailableBlobStore {
    // Always fails; nothing is stored.
    async fn put_image(&self, _media_type: &str, _data: &str) -> Result<BlobRef, BlobStoreError> {
        Err(Self::error())
    }

    // Always fails; no payload can be read.
    async fn get(&self, _blob_id: &BlobId) -> Result<BlobPayload, BlobStoreError> {
        Err(Self::error())
    }

    // Always fails; there is nothing to delete.
    async fn delete(&self, _blob_id: &BlobId) -> Result<(), BlobStoreError> {
        Err(Self::error())
    }

    // Always fails rather than answering `false`.
    async fn exists(&self, _blob_id: &BlobId) -> Result<bool, BlobStoreError> {
        Err(Self::error())
    }

    // The placeholder never reports itself as persistent.
    fn is_persistent(&self) -> bool {
        false
    }
}
199
200#[cfg(not(target_arch = "wasm32"))]
201struct UnavailableOAuthFlowAuthority;
202
203#[cfg(not(target_arch = "wasm32"))]
204impl UnavailableOAuthFlowAuthority {
205 fn rejected(operation: &'static str) -> meerkat_auth_core::oauth_flow::OAuthFlowError {
206 meerkat_auth_core::oauth_flow::OAuthFlowError::LifecycleRejected {
207 operation,
208 detail: "custom auth lease handle does not provide a runtime OAuth flow authority"
209 .to_string(),
210 }
211 }
212}
213
// `OAuthFlowAuthority` implementation that rejects every operation with a
// `LifecycleRejected` error naming the attempted operation.
#[cfg(not(target_arch = "wasm32"))]
impl meerkat_auth_core::oauth_flow::OAuthFlowAuthority for UnavailableOAuthFlowAuthority {
    // Rejected with operation label "admit_oauth_browser_flow".
    fn start(
        &self,
        _target: meerkat_core::AuthBindingRef,
        _provider: meerkat_auth_core::oauth_flow::OAuthProviderIdentity,
        _redirect_uri: String,
        _pkce_verifier: String,
    ) -> Result<String, meerkat_auth_core::oauth_flow::OAuthFlowError> {
        Err(Self::rejected("admit_oauth_browser_flow"))
    }

    // Rejected with operation label "verify_oauth_browser_flow".
    fn verify(
        &self,
        _state: &str,
        _target: &meerkat_core::AuthBindingRef,
        _provider: meerkat_auth_core::oauth_flow::OAuthProviderIdentity,
        _redirect_uri: &str,
    ) -> Result<
        meerkat_auth_core::oauth_flow::OAuthFlowRecord,
        meerkat_auth_core::oauth_flow::OAuthFlowError,
    > {
        Err(Self::rejected("verify_oauth_browser_flow"))
    }

    // Rejected with operation label "consume_oauth_browser_flow".
    fn consume(
        &self,
        _state: &str,
        _target: &meerkat_core::AuthBindingRef,
        _provider: meerkat_auth_core::oauth_flow::OAuthProviderIdentity,
        _redirect_uri: &str,
    ) -> Result<
        meerkat_auth_core::oauth_flow::OAuthFlowRecord,
        meerkat_auth_core::oauth_flow::OAuthFlowError,
    > {
        Err(Self::rejected("consume_oauth_browser_flow"))
    }

    // Rejected with operation label "admit_oauth_device_flow".
    fn admit_device_code(
        &self,
        _target: meerkat_core::AuthBindingRef,
        _provider: meerkat_auth_core::oauth_flow::OAuthProviderIdentity,
        _device_code: String,
        _expires_in: std::time::Duration,
    ) -> Result<(), meerkat_auth_core::oauth_flow::OAuthFlowError> {
        Err(Self::rejected("admit_oauth_device_flow"))
    }

    // Rejected with operation label "verify_oauth_device_flow".
    fn verify_device_code(
        &self,
        _device_code: &str,
        _target: &meerkat_core::AuthBindingRef,
        _provider: meerkat_auth_core::oauth_flow::OAuthProviderIdentity,
    ) -> Result<
        meerkat_auth_core::oauth_flow::OAuthDeviceFlowRecord,
        meerkat_auth_core::oauth_flow::OAuthFlowError,
    > {
        Err(Self::rejected("verify_oauth_device_flow"))
    }

    // Rejected with operation label "begin_oauth_device_poll".
    fn begin_device_code_poll(
        &self,
        _device_code: &str,
        _target: &meerkat_core::AuthBindingRef,
        _provider: meerkat_auth_core::oauth_flow::OAuthProviderIdentity,
    ) -> Result<
        meerkat_auth_core::oauth_flow::OAuthDevicePollLease,
        meerkat_auth_core::oauth_flow::OAuthFlowError,
    > {
        Err(Self::rejected("begin_oauth_device_poll"))
    }
}
286
/// Boxed, pinned future resolving to a machine command result.
///
/// This non-wasm variant additionally requires the future to be `Send`.
#[cfg(not(target_arch = "wasm32"))]
type MeerkatMachineCommandFuture<'a> = Pin<
    Box<
        dyn Future<Output = Result<MeerkatMachineCommandResult, MeerkatMachineCommandError>>
            + Send
            + 'a,
    >,
>;

/// Boxed, pinned future resolving to a machine command result.
///
/// The wasm32 variant has no `Send` bound.
#[cfg(target_arch = "wasm32")]
type MeerkatMachineCommandFuture<'a> = Pin<
    Box<dyn Future<Output = Result<MeerkatMachineCommandResult, MeerkatMachineCommandError>> + 'a>,
>;
300
301pub(crate) use driver::{
302 DriverEntry, SharedCompletionRegistry, SharedDriver, cancel_runtime_loop_run,
303 commit_runtime_loop_run, fail_machine_run, fail_runtime_loop_run,
304 machine_apply_run_return_projection, machine_batch_primitive_projections,
305 machine_batch_runtime_semantics, machine_begin_run, machine_commit_prepared_destroy,
306 machine_commit_service_turn_terminal_receipt, machine_executor_attach_projection,
307 machine_input_boundary, machine_prepare_bindings_projection, machine_prepare_destroy,
308 machine_recover_ephemeral_driver, machine_recover_persistent_driver,
309 machine_recycle_preserving_work, machine_reset, machine_retire,
310 machine_select_runtime_loop_batch, machine_stop_runtime, machine_unregister_session_projection,
311 prepare_runtime_loop_batch_start, rollback_runtime_loop_run_after_boundary_commit_failure,
312};
313
314pub(crate) mod driver;
315
316mod comms_drain;
317pub mod composition;
318mod dispatch_control;
319mod dispatch_drain;
320mod dispatch_ingress;
321mod dispatch_session;
322#[allow(unused_variables, dead_code, clippy::cmp_owned)]
323#[allow(clippy::assign_op_pattern)]
324pub mod dsl;
325pub(crate) mod dsl_authority;
326mod dsl_effects;
327mod llm_reconfigure;
328mod runtime_control;
329mod session_management;
330mod traits;
331mod visibility;
332
333pub use composition::{MeerkatCompositionSignalDispatcher, MeerkatConsumerSurface};
334
335pub use comms_drain::{
336 CommsDrainMode, CommsDrainPhase, DrainExitReason, PeerEndpointStageError, PeerIngressOwner,
337 SupervisorBinding, SupervisorBindingStageError,
338};
339pub(crate) use comms_drain::{CommsDrainSlot, abort_slot};
340pub(crate) use dsl_effects::{DslTransitionEffects, apply_dsl_transition_on_authority};
341pub(crate) use visibility::MachineToolVisibilityOwner;
342
/// Result of staging a DSL input for a session, consumed by the
/// `commit_session_dsl_transition*` methods.
struct StagedSessionDslInput {
    /// DSL state captured before the transition; written back to the session
    /// authority when effect dispatch fails under the restore policy.
    previous_state: Box<dsl::MeerkatMachineState>,
    /// Effects of the transition, scanned for routed signals at commit time.
    effects: DslTransitionEffects,
}
347
/// Policy applied to the session's DSL state when dispatching the routed
/// signals of a committed transition fails.
#[derive(Clone, Copy)]
enum CommittedEffectDispatchFailure {
    /// Leave the DSL state untouched; only the error is reported.
    PreserveCommittedDslState,
    /// Write the staged `previous_state` back before reporting the error.
    RestorePreviousDslState,
}
353
/// Per-session record stored in `MeerkatMachine::sessions`.
struct RuntimeSessionEntry {
    /// Logical runtime identifier associated with this session.
    runtime_id: LogicalRuntimeId,
    /// Async mutex handed out by `MeerkatMachine::session_mutation_gate`.
    mutation_gate: Arc<Mutex<()>>,
    /// Shared runtime driver for the session.
    driver: SharedDriver,
    /// Shared control projection; `control_snapshot` returns a clone of it.
    control_projection: Arc<StdRwLock<crate::driver::ephemeral::RuntimeControlProjection>>,
    /// Ops lifecycle registry for the session.
    // NOTE(review): presumably the value produced by
    // `recover_or_create_ops_state`, together with `epoch_id` and
    // `cursor_state` — confirm against the registration path.
    ops_lifecycle: Arc<crate::ops_lifecycle::RuntimeOpsLifecycleRegistry>,
    /// Runtime epoch identifier.
    epoch_id: meerkat_core::RuntimeEpochId,
    /// Cursor state for the epoch.
    cursor_state: Arc<meerkat_core::EpochCursorState>,
    /// Shared completion registry.
    completions: SharedCompletionRegistry,
    /// Owner of the session's tool visibility state.
    tool_visibility_owner: Arc<MachineToolVisibilityOwner>,
    /// LLM identity currently recorded for the session, if any.
    current_llm_identity: Option<meerkat_core::SessionLlmIdentity>,
    /// Capability surface currently recorded for the session, if any.
    current_capability_surface: Option<SessionLlmCapabilitySurface>,
    /// Status of the capability surface.
    capability_surface_status: SessionLlmCapabilitySurfaceStatus,
    /// Registration phase; holds the runtime loop attachment while `Active`.
    phase: RegistrationPhase,
    /// Interrupt handle returned by `interrupt_handle` while there is no live
    /// attachment; cleared when a runtime loop is attached.
    provisional_interrupt_handle:
        Option<Arc<dyn meerkat_core::lifecycle::CoreExecutorInterruptHandle>>,
    /// Shared DSL authority, handed out by `MeerkatMachine::session_dsl_authority`.
    dsl_authority: Arc<std::sync::Mutex<dsl::MeerkatMachineAuthority>>,
    /// Comms drain slot for the session.
    drain_slot: CommsDrainSlot,
}
422
/// Channels and handles belonging to an attached runtime loop.
///
/// The attachment counts as live only while neither `wake_tx` nor
/// `effect_tx` is closed.
struct RuntimeLoopAttachment {
    /// Sender used to wake the runtime loop.
    wake_tx: mpsc::Sender<()>,
    /// Sender used to deliver runtime effects to the loop.
    effect_tx: mpsc::Sender<crate::effect::RuntimeEffect>,
    /// Optional executor boundary handle supplied at attach time.
    boundary_handle: Option<Arc<dyn meerkat_core::lifecycle::CoreExecutorBoundaryHandle>>,
    /// Optional executor interrupt handle supplied at attach time.
    interrupt_handle: Option<Arc<dyn meerkat_core::lifecycle::CoreExecutorInterruptHandle>>,
    /// Join handle of the loop task; stored but not read in the code shown here.
    _loop_handle: tokio::task::JoinHandle<()>,
}
434
/// Registration phase of a session with respect to its runtime loop.
enum RegistrationPhase {
    /// No runtime loop is attached; also the phase a dead attachment is
    /// reset to by `clear_dead_attachment`.
    Queuing,
    /// Counted by `has_attachment_or_attaching`, but not a live attachment.
    // NOTE(review): presumably set while a runtime loop is being attached —
    // confirm against the attach path.
    Attaching,
    /// A runtime loop has been attached; it may still be dead if its
    /// channels have closed.
    Active(RuntimeLoopAttachment),
}
452
453impl RuntimeSessionEntry {
454 fn control_snapshot(&self) -> crate::driver::ephemeral::RuntimeControlProjection {
455 self.control_projection
456 .read()
457 .map(|guard| guard.clone())
458 .unwrap_or_else(|poisoned| {
459 tracing::error!("runtime control projection lock poisoned");
460 poisoned.into_inner().clone()
461 })
462 }
463
464 fn attachment_is_live(&self) -> bool {
465 match &self.phase {
466 RegistrationPhase::Active(attachment) => {
467 !attachment.wake_tx.is_closed() && !attachment.effect_tx.is_closed()
468 }
469 RegistrationPhase::Queuing | RegistrationPhase::Attaching => false,
470 }
471 }
472
473 fn has_attachment_or_attaching(&self) -> bool {
478 matches!(self.phase, RegistrationPhase::Attaching) || self.attachment_is_live()
479 }
480
481 fn has_live_attachment(&self) -> bool {
485 self.attachment_is_live()
486 }
487
488 fn attach_runtime_loop(
489 &mut self,
490 wake_tx: mpsc::Sender<()>,
491 effect_tx: mpsc::Sender<crate::effect::RuntimeEffect>,
492 boundary_handle: Option<Arc<dyn meerkat_core::lifecycle::CoreExecutorBoundaryHandle>>,
493 interrupt_handle: Option<Arc<dyn meerkat_core::lifecycle::CoreExecutorInterruptHandle>>,
494 loop_handle: tokio::task::JoinHandle<()>,
495 ) {
496 self.provisional_interrupt_handle = None;
497 self.phase = RegistrationPhase::Active(RuntimeLoopAttachment {
498 wake_tx,
499 effect_tx,
500 boundary_handle,
501 interrupt_handle,
502 _loop_handle: loop_handle,
503 });
504 }
505
506 fn clear_dead_attachment(&mut self) -> bool {
507 if matches!(self.phase, RegistrationPhase::Active(_)) && !self.attachment_is_live() {
508 self.phase = RegistrationPhase::Queuing;
511 return true;
512 }
513 false
514 }
515
516 fn wake_sender(&self) -> Option<mpsc::Sender<()>> {
517 match &self.phase {
518 RegistrationPhase::Active(attachment)
519 if !attachment.wake_tx.is_closed() && !attachment.effect_tx.is_closed() =>
520 {
521 Some(attachment.wake_tx.clone())
522 }
523 _ => None,
524 }
525 }
526
527 fn effect_sender(&self) -> Option<mpsc::Sender<crate::effect::RuntimeEffect>> {
528 match &self.phase {
529 RegistrationPhase::Active(attachment)
530 if !attachment.wake_tx.is_closed() && !attachment.effect_tx.is_closed() =>
531 {
532 Some(attachment.effect_tx.clone())
533 }
534 _ => None,
535 }
536 }
537
538 fn boundary_handle(
539 &self,
540 ) -> Option<Arc<dyn meerkat_core::lifecycle::CoreExecutorBoundaryHandle>> {
541 match &self.phase {
542 RegistrationPhase::Active(attachment)
543 if !attachment.wake_tx.is_closed() && !attachment.effect_tx.is_closed() =>
544 {
545 attachment.boundary_handle.clone()
546 }
547 _ => None,
548 }
549 }
550
551 fn interrupt_handle(
552 &self,
553 ) -> Option<Arc<dyn meerkat_core::lifecycle::CoreExecutorInterruptHandle>> {
554 match &self.phase {
555 RegistrationPhase::Active(attachment)
556 if !attachment.wake_tx.is_closed() && !attachment.effect_tx.is_closed() =>
557 {
558 attachment.interrupt_handle.clone()
559 }
560 _ => self.provisional_interrupt_handle.clone(),
561 }
562 }
563
564 fn install_provisional_interrupt_handle(
565 &mut self,
566 handle: Arc<dyn meerkat_core::lifecycle::CoreExecutorInterruptHandle>,
567 ) {
568 if !self.attachment_is_live() {
569 self.provisional_interrupt_handle = Some(handle);
570 }
571 }
572}
573
574impl MeerkatMachine {
575 async fn session_mutation_gate(&self, session_id: &SessionId) -> Option<Arc<Mutex<()>>> {
581 let sessions = self.sessions.read().await;
582 sessions
583 .get(session_id)
584 .map(|entry| Arc::clone(&entry.mutation_gate))
585 }
586
587 async fn session_dsl_authority(
588 &self,
589 session_id: &SessionId,
590 ) -> Result<Arc<std::sync::Mutex<dsl::MeerkatMachineAuthority>>, String> {
591 let sessions = self.sessions.read().await;
592 sessions
593 .get(session_id)
594 .map(|entry| Arc::clone(&entry.dsl_authority))
595 .ok_or_else(|| {
596 RuntimeDriverError::NotReady {
597 state: RuntimeState::Destroyed,
598 }
599 .to_string()
600 })
601 }
602
603 fn preview_dsl_input_on_state(
604 state: &dsl::MeerkatMachineState,
605 input: dsl::MeerkatMachineInput,
606 context: &str,
607 ) -> Result<Vec<dsl::MeerkatMachineEffect>, String> {
608 let mut preview = dsl::MeerkatMachineAuthority::from_state(state.clone());
609 dsl::MeerkatMachineMutator::apply(&mut preview, input)
610 .map(|transition| transition.effects)
611 .map_err(|err| dsl_authority::map_error(err, context))
612 }
613
614 async fn preview_session_dsl_input(
615 &self,
616 session_id: &SessionId,
617 input: dsl::MeerkatMachineInput,
618 context: &str,
619 ) -> Result<Vec<dsl::MeerkatMachineEffect>, String> {
620 let authority = self.session_dsl_authority(session_id).await?;
621 let state = {
622 let authority = authority
623 .lock()
624 .unwrap_or_else(std::sync::PoisonError::into_inner);
625 authority.state.clone()
626 };
627 Self::preview_dsl_input_on_state(&state, input, context)
628 }
629
630 async fn session_dsl_state(
631 &self,
632 session_id: &SessionId,
633 ) -> Result<dsl::MeerkatMachineState, RuntimeControlPlaneError> {
634 let authority = self
635 .session_dsl_authority(session_id)
636 .await
637 .map_err(RuntimeControlPlaneError::Internal)?;
638 let authority = authority
639 .lock()
640 .unwrap_or_else(std::sync::PoisonError::into_inner);
641 Ok(authority.state.clone())
642 }
643
644 async fn commit_session_dsl_transition(
645 &self,
646 session_id: &SessionId,
647 staged: StagedSessionDslInput,
648 context: &str,
649 ) -> Result<(), String> {
650 self.commit_session_dsl_transition_with_dispatch_failure(
651 session_id,
652 staged,
653 context,
654 CommittedEffectDispatchFailure::RestorePreviousDslState,
655 )
656 .await
657 }
658
659 async fn commit_session_dsl_transition_preserving_committed_state(
660 &self,
661 session_id: &SessionId,
662 staged: StagedSessionDslInput,
663 context: &str,
664 ) -> Result<(), String> {
665 self.commit_session_dsl_transition_with_dispatch_failure(
666 session_id,
667 staged,
668 context,
669 CommittedEffectDispatchFailure::PreserveCommittedDslState,
670 )
671 .await
672 }
673
674 async fn commit_session_dsl_transition_with_dispatch_failure(
675 &self,
676 session_id: &SessionId,
677 staged: StagedSessionDslInput,
678 context: &str,
679 dispatch_failure: CommittedEffectDispatchFailure,
680 ) -> Result<(), String> {
681 if let Err(error) = self
682 .dispatch_routed_signals_from_effects(&staged.effects)
683 .await
684 {
685 match dispatch_failure {
686 CommittedEffectDispatchFailure::PreserveCommittedDslState => {}
687 CommittedEffectDispatchFailure::RestorePreviousDslState => {
688 self.restore_session_dsl_state(session_id, staged.previous_state)
689 .await;
690 }
691 }
692 return Err(format!(
693 "DSL authority ({context}): committed effect dispatch failed: {error}"
694 ));
695 }
696 Ok(())
697 }
698
699 async fn dispatch_routed_signals_from_effects(
700 &self,
701 effects: &[dsl::MeerkatMachineEffect],
702 ) -> Result<(), String> {
703 let dispatcher = {
704 self.composition_signal_dispatcher
705 .read()
706 .unwrap_or_else(std::sync::PoisonError::into_inner)
707 .clone()
708 };
709 let Some(dispatcher) = dispatcher else {
710 return Ok(());
711 };
712
713 for effect in effects {
714 if let Some(signal) = composition::lift_routed_signal(effect) {
715 composition::dispatch_routed_signal(&dispatcher, signal).await?;
716 }
717 }
718 Ok(())
719 }
720
721 async fn clear_dead_runtime_attachment(&self, session_id: &SessionId) {
722 let mut sessions = self.sessions.write().await;
723 if let Some(entry) = sessions.get_mut(session_id) {
724 entry.clear_dead_attachment();
725 }
726 }
727
728 async fn dispatch_interrupt_yielding_runtime_effect(
729 &self,
730 session_id: &SessionId,
731 effect_tx: Option<mpsc::Sender<crate::effect::RuntimeEffect>>,
732 boundary_handle: Option<Arc<dyn meerkat_core::lifecycle::CoreExecutorBoundaryHandle>>,
733 projected_effect: crate::effect::ProjectedRuntimeEffect,
734 context: &str,
735 ) -> Result<(), RuntimeDriverError> {
736 let Some(effect_tx) = effect_tx else {
737 let state = self
738 .existing_session_runtime_state(session_id)
739 .await
740 .unwrap_or(RuntimeState::Destroyed);
741 return Err(RuntimeDriverError::NotReady { state });
742 };
743
744 let reason = projected_effect.reason().to_string();
745 if let Some(boundary_handle) = boundary_handle {
746 boundary_handle
747 .cancel_after_boundary(reason)
748 .await
749 .map_err(|err| {
750 RuntimeDriverError::Internal(format!(
751 "{context}: failed to apply live boundary cancel: {err}"
752 ))
753 })?;
754 }
755
756 match effect_tx.try_send(projected_effect.into_effect()) {
757 Ok(()) => Ok(()),
758 Err(mpsc::error::TrySendError::Full(_)) => Err(RuntimeDriverError::Internal(format!(
759 "{context}: runtime effect channel full after accepted boundary cancel"
760 ))),
761 Err(mpsc::error::TrySendError::Closed(_)) => {
762 self.clear_dead_runtime_attachment(session_id).await;
763 Err(RuntimeDriverError::NotReady {
764 state: RuntimeState::Idle,
765 })
766 }
767 }
768 }
769
770 async fn restore_session_dsl_state(
771 &self,
772 session_id: &SessionId,
773 state: Box<dsl::MeerkatMachineState>,
774 ) {
775 if let Ok(authority) = self.session_dsl_authority(session_id).await {
776 Self::restore_dsl_authority_state(&authority, state);
777 }
778 }
779
780 fn restore_dsl_authority_state(
781 authority: &crate::driver::ephemeral::SharedIngressDslAuthority,
782 state: Box<dsl::MeerkatMachineState>,
783 ) {
784 let mut authority = authority
785 .lock()
786 .unwrap_or_else(std::sync::PoisonError::into_inner);
787 *authority = dsl::MeerkatMachineAuthority::from_state(*state);
788 }
789}
790
/// Copyable token returned by `MeerkatMachine::session_control_authority`.
///
/// The private field prevents code outside this module from constructing
/// the token with a struct literal.
#[derive(Debug, Clone, Copy)]
pub struct MachineSessionControlAuthority {
    _private: (),
}
797
/// Runtime machine that holds the registered sessions together with the
/// stores and auth authorities they use.
pub struct MeerkatMachine {
    /// Registered sessions keyed by session id.
    sessions: RwLock<HashMap<SessionId, RuntimeSessionEntry>>,
    /// Runtime mode; the constructors in this file set `RuntimeMode::V9Compliant`.
    mode: RuntimeMode,
    /// Runtime store; `None` for machines built with `ephemeral()`.
    store: Option<Arc<dyn RuntimeStore>>,
    /// Blob store; `None` for machines built with `ephemeral()`.
    blob_store: Option<Arc<dyn BlobStore>>,
    /// Optional LLM reconfigure host; starts out as `None`.
    llm_reconfigure_host: StdRwLock<Option<Arc<dyn SessionLlmReconfigureHost>>>,
    /// Current auth lease handle; replaceable through the `set_*auth_lease*` methods.
    auth_lease: StdRwLock<Arc<dyn meerkat_core::handles::AuthLeaseHandle>>,
    /// Current OAuth flow authority (not compiled on wasm32).
    #[cfg(not(target_arch = "wasm32"))]
    oauth_flows: StdRwLock<Arc<dyn meerkat_auth_core::oauth_flow::OAuthFlowAuthority>>,
    /// Session claim registry exposed through `session_claim_handle`.
    session_claims: Arc<crate::handles::RuntimeSessionClaimRegistry>,
    /// Optional dispatcher for routed signals; while it is `None`, routed
    /// signal dispatch is a no-op.
    composition_signal_dispatcher:
        StdRwLock<Option<composition::MeerkatCompositionSignalDispatcher>>,
}
835
836impl MeerkatMachine {
    /// Creates a new `MachineSessionControlAuthority` token.
    #[must_use]
    pub fn session_control_authority(&self) -> MachineSessionControlAuthority {
        MachineSessionControlAuthority { _private: () }
    }
843
844 #[must_use]
849 pub fn shares_runtime_persistence_with(&self, other: &Self) -> bool {
850 match (&self.store, &other.store) {
851 (None, None) => true,
852 (Some(a), Some(b)) => runtime_stores_share_authority(a, b),
853 _ => false,
854 }
855 }
856
857 #[must_use]
860 pub fn shares_runtime_store_authority(&self, store: &Arc<dyn RuntimeStore>) -> bool {
861 self.store
862 .as_ref()
863 .is_some_and(|machine_store| runtime_stores_share_authority(machine_store, store))
864 }
865
    /// Returns `true` when this machine was constructed with a runtime store.
    #[must_use]
    pub fn has_runtime_persistence(&self) -> bool {
        self.store.is_some()
    }
871
872 fn normalize_destroyed_error(err: RuntimeDriverError) -> RuntimeDriverError {
873 match err {
874 RuntimeDriverError::NotReady {
875 state: RuntimeState::Destroyed,
876 } => RuntimeDriverError::Destroyed,
877 other => other,
878 }
879 }
880
881 pub fn ephemeral() -> Self {
883 let auth_lease = Arc::new(crate::handles::RuntimeAuthLeaseHandle::new());
884 #[cfg(not(target_arch = "wasm32"))]
885 let oauth_flows = Arc::new(crate::handles::RuntimeOAuthFlowHandle::new_with_auth_lease(
886 std::time::Duration::from_secs(10 * 60),
887 Arc::clone(&auth_lease),
888 ));
889 let auth_lease: Arc<dyn meerkat_core::handles::AuthLeaseHandle> = auth_lease;
890 Self {
891 sessions: RwLock::new(HashMap::new()),
892 mode: RuntimeMode::V9Compliant,
893 store: None,
894 blob_store: None,
895 llm_reconfigure_host: StdRwLock::new(None),
896 auth_lease: StdRwLock::new(auth_lease),
897 #[cfg(not(target_arch = "wasm32"))]
898 oauth_flows: StdRwLock::new(oauth_flows),
899 session_claims: Arc::new(crate::handles::RuntimeSessionClaimRegistry::new()),
900 composition_signal_dispatcher: StdRwLock::new(None),
901 }
902 }
903
904 pub fn persistent(store: Arc<dyn RuntimeStore>, blob_store: Arc<dyn BlobStore>) -> Self {
906 #[cfg(not(target_arch = "wasm32"))]
907 let (auth_lease, oauth_flows) = {
908 let authorities = persistent_auth_authorities(&store);
909 (
910 Arc::clone(&authorities.auth_lease),
911 Arc::clone(&authorities.oauth_flows),
912 )
913 };
914 #[cfg(target_arch = "wasm32")]
915 let auth_lease = Arc::new(crate::handles::RuntimeAuthLeaseHandle::new());
916 let auth_lease: Arc<dyn meerkat_core::handles::AuthLeaseHandle> = auth_lease;
917 Self {
918 sessions: RwLock::new(HashMap::new()),
919 mode: RuntimeMode::V9Compliant,
920 store: Some(store),
921 blob_store: Some(blob_store),
922 llm_reconfigure_host: StdRwLock::new(None),
923 auth_lease: StdRwLock::new(auth_lease),
924 #[cfg(not(target_arch = "wasm32"))]
925 oauth_flows: StdRwLock::new(oauth_flows),
926 session_claims: Arc::new(crate::handles::RuntimeSessionClaimRegistry::new()),
927 composition_signal_dispatcher: StdRwLock::new(None),
928 }
929 }
930
931 pub fn persistent_without_blobs(store: Arc<dyn RuntimeStore>) -> Self {
937 #[cfg(not(target_arch = "wasm32"))]
938 let (auth_lease, oauth_flows) = {
939 let authorities = persistent_auth_authorities(&store);
940 (
941 Arc::clone(&authorities.auth_lease),
942 Arc::clone(&authorities.oauth_flows),
943 )
944 };
945 #[cfg(target_arch = "wasm32")]
946 let auth_lease = Arc::new(crate::handles::RuntimeAuthLeaseHandle::new());
947 let auth_lease: Arc<dyn meerkat_core::handles::AuthLeaseHandle> = auth_lease;
948 Self {
949 sessions: RwLock::new(HashMap::new()),
950 mode: RuntimeMode::V9Compliant,
951 store: Some(store),
952 blob_store: Some(Arc::new(UnavailableBlobStore)),
953 llm_reconfigure_host: StdRwLock::new(None),
954 auth_lease: StdRwLock::new(auth_lease),
955 #[cfg(not(target_arch = "wasm32"))]
956 oauth_flows: StdRwLock::new(oauth_flows),
957 session_claims: Arc::new(crate::handles::RuntimeSessionClaimRegistry::new()),
958 composition_signal_dispatcher: StdRwLock::new(None),
959 }
960 }
961
962 pub fn auth_lease_handle(&self) -> Arc<dyn meerkat_core::handles::AuthLeaseHandle> {
965 Arc::clone(
966 &self
967 .auth_lease
968 .read()
969 .unwrap_or_else(std::sync::PoisonError::into_inner),
970 )
971 }
972
    /// Installs `handle` as the machine's auth lease handle.
    ///
    /// On non-wasm targets, a handle that is actually a
    /// `RuntimeAuthLeaseHandle` is forwarded to
    /// `set_runtime_auth_lease_handle`. Any other handle causes the OAuth
    /// flow authority to be replaced with `UnavailableOAuthFlowAuthority`
    /// before the handle is stored.
    pub fn set_auth_lease_handle(&self, handle: Arc<dyn meerkat_core::handles::AuthLeaseHandle>) {
        #[cfg(not(target_arch = "wasm32"))]
        let handle = {
            // Upcast to `Any` so the concrete handle type can be probed.
            let erased: Arc<dyn std::any::Any + Send + Sync> = handle.clone();
            if let Ok(runtime_handle) =
                Arc::downcast::<crate::handles::RuntimeAuthLeaseHandle>(erased)
            {
                // Runtime handles take the dedicated path and return early.
                self.set_runtime_auth_lease_handle(runtime_handle);
                return;
            }
            // Custom handle: swap in an authority that rejects OAuth flow operations.
            *self
                .oauth_flows
                .write()
                .unwrap_or_else(std::sync::PoisonError::into_inner) =
                Arc::new(UnavailableOAuthFlowAuthority);
            handle
        };
        *self
            .auth_lease
            .write()
            .unwrap_or_else(std::sync::PoisonError::into_inner) = handle;
    }
1000
1001 #[cfg(not(target_arch = "wasm32"))]
1008 pub fn set_auth_lease_handle_with_oauth_flow_authority(
1009 &self,
1010 handle: Arc<dyn meerkat_core::handles::AuthLeaseHandle>,
1011 oauth_flows: Arc<dyn meerkat_auth_core::oauth_flow::OAuthFlowAuthority>,
1012 ) {
1013 *self
1014 .oauth_flows
1015 .write()
1016 .unwrap_or_else(std::sync::PoisonError::into_inner) = oauth_flows;
1017 *self
1018 .auth_lease
1019 .write()
1020 .unwrap_or_else(std::sync::PoisonError::into_inner) = handle;
1021 }
1022
1023 pub fn set_runtime_auth_lease_handle(
1026 &self,
1027 handle: Arc<crate::handles::RuntimeAuthLeaseHandle>,
1028 ) {
1029 #[cfg(not(target_arch = "wasm32"))]
1030 {
1031 *self
1032 .oauth_flows
1033 .write()
1034 .unwrap_or_else(std::sync::PoisonError::into_inner) =
1035 Arc::new(crate::handles::RuntimeOAuthFlowHandle::new_with_auth_lease(
1036 std::time::Duration::from_secs(10 * 60),
1037 Arc::clone(&handle),
1038 ));
1039 }
1040 let handle: Arc<dyn meerkat_core::handles::AuthLeaseHandle> = handle;
1041 *self
1042 .auth_lease
1043 .write()
1044 .unwrap_or_else(std::sync::PoisonError::into_inner) = handle;
1045 }
1046
1047 #[cfg(not(target_arch = "wasm32"))]
1050 pub fn oauth_flow_authority(
1051 &self,
1052 ) -> Arc<dyn meerkat_auth_core::oauth_flow::OAuthFlowAuthority> {
1053 Arc::clone(
1054 &self
1055 .oauth_flows
1056 .read()
1057 .unwrap_or_else(std::sync::PoisonError::into_inner),
1058 )
1059 }
1060
1061 pub fn session_claim_handle(&self) -> Arc<dyn meerkat_core::handles::SessionClaimHandle> {
1066 Arc::clone(&self.session_claims) as Arc<dyn meerkat_core::handles::SessionClaimHandle>
1067 }
1068
1069 pub fn set_composition_signal_dispatcher(
1072 &self,
1073 dispatcher: composition::MeerkatCompositionSignalDispatcher,
1074 ) {
1075 let mut slot = self
1076 .composition_signal_dispatcher
1077 .write()
1078 .unwrap_or_else(std::sync::PoisonError::into_inner);
1079 *slot = Some(dispatcher);
1080 }
1081
1082 pub async fn apply_routed_meerkat_input(
1093 &self,
1094 session_id: &SessionId,
1095 input: dsl::MeerkatMachineInput,
1096 ) -> Result<(), String> {
1097 Self::reject_raw_fieldless_runtime_internal_dsl_input(&input)?;
1098 let sessions = self.sessions.read().await;
1099 let entry = sessions.get(session_id).ok_or_else(|| {
1100 format!(
1101 "session `{session_id}` is not registered with this MeerkatMachine; \
1102 cannot deliver routed input"
1103 )
1104 })?;
1105 let authority = Arc::clone(&entry.dsl_authority);
1106 drop(sessions);
1107
1108 let (previous_state, effects) = {
1109 let mut guard = authority
1110 .lock()
1111 .unwrap_or_else(std::sync::PoisonError::into_inner);
1112 let previous_state = Box::new(guard.state.clone());
1113 let effects = dsl::MeerkatMachineMutator::apply(&mut *guard, input)
1114 .map(|transition| transition.effects)
1115 .map_err(|err| format!("{err}"))?;
1116 (previous_state, effects)
1117 };
1118 if let Err(error) = self.dispatch_routed_signals_from_effects(&effects).await {
1119 self.restore_session_dsl_state(session_id, previous_state)
1120 .await;
1121 return Err(format!(
1122 "routed MeerkatMachine input committed effect dispatch failed: {error}"
1123 ));
1124 }
1125 Ok(())
1126 }
1127
1128 #[cfg(test)]
1129 pub(crate) async fn debug_shared_ingress_authorities(
1130 &self,
1131 session_id: &SessionId,
1132 ) -> Option<(
1133 Arc<std::sync::Mutex<dsl::MeerkatMachineAuthority>>,
1134 crate::driver::ephemeral::SharedIngressDslAuthority,
1135 )> {
1136 let sessions = self.sessions.read().await;
1137 let entry = sessions.get(session_id)?;
1138 let session_authority = Arc::clone(&entry.dsl_authority);
1139 let driver = entry.driver.lock().await;
1140 Some((session_authority, driver.shared_dsl_authority()))
1141 }
1142
1143 fn make_driver(
1145 &self,
1146 runtime_id: LogicalRuntimeId,
1147 dsl_authority: crate::driver::ephemeral::SharedIngressDslAuthority,
1148 initial_runtime_state: RuntimeState,
1149 ) -> DriverEntry {
1150 let control_projection = Arc::new(StdRwLock::new(
1151 crate::driver::ephemeral::RuntimeControlProjection {
1152 phase: initial_runtime_state,
1153 current_run_id: None,
1154 pre_run_phase: None,
1155 },
1156 ));
1157 match (&self.store, &self.blob_store) {
1158 (Some(store), Some(blob_store)) => {
1159 DriverEntry::Persistent(PersistentRuntimeDriver::new_with_control(
1160 runtime_id,
1161 store.clone(),
1162 blob_store.clone(),
1163 control_projection,
1164 dsl_authority,
1165 ))
1166 }
1167 _ => DriverEntry::Ephemeral(EphemeralRuntimeDriver::new_with_control_and_dsl(
1168 runtime_id,
1169 control_projection,
1170 dsl_authority,
1171 )),
1172 }
1173 }
1174
1175 async fn recover_or_create_ops_state(
1182 &self,
1183 session_id: &SessionId,
1184 runtime_id: &LogicalRuntimeId,
1185 ) -> (
1186 Arc<crate::ops_lifecycle::RuntimeOpsLifecycleRegistry>,
1187 meerkat_core::RuntimeEpochId,
1188 Arc<meerkat_core::EpochCursorState>,
1189 ) {
1190 if let Some(ref store) = self.store {
1191 match store.load_ops_lifecycle(runtime_id).await {
1192 Ok(Some(snapshot)) => {
1193 let recovered_epoch = snapshot.epoch_id.clone();
1194 let recovered_cursors = meerkat_core::EpochCursorState::from_recovered(
1195 snapshot.cursors.agent_applied_cursor,
1196 snapshot.cursors.runtime_observed_seq,
1197 snapshot.cursors.runtime_last_injected_seq,
1198 );
1199 let recovered_ops_count = snapshot.completion_entries.len();
1200 let registry =
1201 crate::ops_lifecycle::RuntimeOpsLifecycleRegistry::from_recovered(snapshot);
1202 tracing::info!(
1203 %session_id,
1204 %runtime_id,
1205 epoch_id = %recovered_epoch,
1206 recovered_ops = recovered_ops_count,
1207 "ops lifecycle recovered from durable store (same epoch)"
1208 );
1209 return (
1210 Arc::new(registry),
1211 recovered_epoch,
1212 Arc::new(recovered_cursors),
1213 );
1214 }
1215 Ok(None) => {}
1216 Err(err) => {
1217 tracing::warn!(
1218 %session_id,
1219 %runtime_id,
1220 error = %err,
1221 "failed to load ops lifecycle; epoch rotated"
1222 );
1223 return (
1224 Arc::new(crate::ops_lifecycle::RuntimeOpsLifecycleRegistry::new()),
1225 meerkat_core::RuntimeEpochId::new(),
1226 Arc::new(meerkat_core::EpochCursorState::new()),
1227 );
1228 }
1229 }
1230 tracing::debug!(%session_id, "no persisted ops lifecycle; fresh epoch");
1231 (
1232 Arc::new(crate::ops_lifecycle::RuntimeOpsLifecycleRegistry::new()),
1233 meerkat_core::RuntimeEpochId::new(),
1234 Arc::new(meerkat_core::EpochCursorState::new()),
1235 )
1236 } else {
1237 (
1238 Arc::new(crate::ops_lifecycle::RuntimeOpsLifecycleRegistry::new()),
1239 meerkat_core::RuntimeEpochId::new(),
1240 Arc::new(meerkat_core::EpochCursorState::new()),
1241 )
1242 }
1243 }
1244
1245 #[allow(clippy::large_futures)]
1246 fn execute_meerkat_machine_command(
1247 &self,
1248 self_handle: Option<Arc<Self>>,
1249 command: MeerkatMachineCommand,
1250 ) -> MeerkatMachineCommandFuture<'_> {
1251 Box::pin(async move {
1252 match command {
1253 MeerkatMachineCommand::EnsureSessionWithExecutor { .. } => {
1254 let self_handle = self_handle.ok_or_else(|| {
1255 MeerkatMachineCommandError::Driver(RuntimeDriverError::Internal(
1256 "EnsureSessionWithExecutor requires Arc<Self> machine handle".into(),
1257 ))
1258 })?;
1259 self_handle
1260 .execute_meerkat_machine_ensure_session_command(command)
1261 .await
1262 .map_err(Into::into)
1263 }
1264 MeerkatMachineCommand::RegisterSession { .. }
1265 | MeerkatMachineCommand::UnregisterSession { .. }
1266 | MeerkatMachineCommand::SetSilentIntents { .. }
1267 | MeerkatMachineCommand::CancelAfterBoundary { .. }
1268 | MeerkatMachineCommand::StopRuntimeExecutor { .. }
1269 | MeerkatMachineCommand::CommitServiceTurnTerminalReceipt { .. }
1270 | MeerkatMachineCommand::ContainsSession { .. }
1271 | MeerkatMachineCommand::SessionHasExecutor { .. }
1272 | MeerkatMachineCommand::SessionHasComms { .. }
1273 | MeerkatMachineCommand::OpsLifecycleRegistry { .. }
1274 | MeerkatMachineCommand::PrepareBindings { .. }
1275 | MeerkatMachineCommand::PrepareLocalSessionBindings { .. }
1276 | MeerkatMachineCommand::InputState { .. }
1277 | MeerkatMachineCommand::ListActiveInputs { .. }
1278 | MeerkatMachineCommand::ReconfigureSessionLlmIdentity { .. }
1279 | MeerkatMachineCommand::StagePersistentFilter { .. }
1280 | MeerkatMachineCommand::RequestDeferredTools { .. }
1281 | MeerkatMachineCommand::PublishCommittedVisibleSet { .. } => self
1282 .execute_meerkat_machine_session_command(command)
1283 .await
1284 .map_err(Into::into),
1285 MeerkatMachineCommand::SetPeerIngressContext { .. }
1286 | MeerkatMachineCommand::NotifyDrainExited { .. } => {
1287 let self_handle = self_handle.ok_or_else(|| {
1288 MeerkatMachineCommandError::Driver(RuntimeDriverError::Internal(
1289 "drain command requires Arc<Self> machine handle".into(),
1290 ))
1291 })?;
1292 self_handle
1293 .execute_meerkat_machine_drain_command(command)
1294 .await
1295 .map_err(Into::into)
1296 }
1297 MeerkatMachineCommand::AbortAll
1298 | MeerkatMachineCommand::Abort { .. }
1299 | MeerkatMachineCommand::Wait { .. } => self
1300 .execute_meerkat_machine_drain_local_command(command)
1301 .await
1302 .map_err(Into::into),
1303 MeerkatMachineCommand::Ingest { .. }
1304 | MeerkatMachineCommand::PublishEvent { .. }
1305 | MeerkatMachineCommand::Retire { .. }
1306 | MeerkatMachineCommand::Recycle { .. }
1307 | MeerkatMachineCommand::Reset { .. }
1308 | MeerkatMachineCommand::Recover { .. }
1309 | MeerkatMachineCommand::Destroy { .. }
1310 | MeerkatMachineCommand::RuntimeState { .. }
1311 | MeerkatMachineCommand::RuntimeRealtimeAttachmentStatus { .. }
1312 | MeerkatMachineCommand::ResolvedSessionLlmCapabilities { .. }
1313 | MeerkatMachineCommand::RuntimeRealtimeChannelStatus { .. }
1314 | MeerkatMachineCommand::ConfigureModelRoutingBaseline { .. }
1315 | MeerkatMachineCommand::SessionModelRoutingStatus { .. }
1316 | MeerkatMachineCommand::RequestSwitchTurn { .. }
1317 | MeerkatMachineCommand::AdmitModelRoutingAssistantTurn { .. }
1318 | MeerkatMachineCommand::BeginImageOperation { .. }
1319 | MeerkatMachineCommand::ActivateImageOperationOverride { .. }
1320 | MeerkatMachineCommand::CompleteImageOperation { .. }
1321 | MeerkatMachineCommand::RestoreImageOperationOverride { .. }
1322 | MeerkatMachineCommand::LoadBoundaryReceipt { .. } => self
1323 .execute_meerkat_machine_control_command(command)
1324 .await
1325 .map_err(Into::into),
1326 MeerkatMachineCommand::AcceptWithCompletion { .. }
1327 | MeerkatMachineCommand::AcceptWithoutWake { .. } => self
1328 .execute_meerkat_machine_ingress_command(command)
1329 .await
1330 .map_err(Into::into),
1331 MeerkatMachineCommand::Prepare { .. }
1332 | MeerkatMachineCommand::Commit { .. }
1333 | MeerkatMachineCommand::Fail { .. } => self
1334 .execute_meerkat_machine_legacy_run_command(command)
1335 .await
1336 .map_err(Into::into),
1337 }
1338 })
1339 }
1340
1341 pub async fn register_session(&self, session_id: SessionId) {
1344 let _ = self
1345 .execute_meerkat_machine_command(
1346 None,
1347 MeerkatMachineCommand::RegisterSession { session_id },
1348 )
1349 .await;
1350 }
1351}
1352
// Test module, compiled only for test builds. Its source is loaded from
// `../meerkat_machine_tests.rs` via `#[path]` instead of the default module
// file location. The clippy allowances permit `expect`, `unwrap` and panics
// inside the test code.
#[cfg(test)]
#[allow(clippy::expect_used, clippy::panic, clippy::unwrap_used)]
#[path = "../meerkat_machine_tests.rs"]
mod tests;