use std::sync::{
Arc, Mutex,
atomic::{AtomicBool, Ordering},
};
use crate::meerkat_machine::dsl as mm_dsl;
use crate::meerkat_machine_types::MeerkatMachineFieldlessRuntimeInternalInput;
use meerkat_core::handles::DslTransitionError;
fn map_kernel_error(
err: mm_dsl::MeerkatMachineTransitionError,
context: &'static str,
) -> DslTransitionError {
let reason = err.to_string();
match err {
mm_dsl::MeerkatMachineTransitionError::GuardRejected { .. } => {
DslTransitionError::guard_rejected(context, reason)
}
mm_dsl::MeerkatMachineTransitionError::NoMatchingTransition { .. } => {
DslTransitionError::no_matching(context, reason)
}
mm_dsl::MeerkatMachineTransitionError::RecoveredStateInvariantRejected { .. } => {
DslTransitionError::recovered_state_invariant_rejected(context, reason)
}
}
}
// Private submodules. Each one contributes a single type that is re-exported
// in the `pub use` list further down.
mod auth_lease;
mod comms_drain;
mod external_tool_surface;
mod interaction_stream;
mod mcp_server_lifecycle;
mod model_routing;
// `oauth_flow` is compiled out when targeting wasm32.
#[cfg(not(target_arch = "wasm32"))]
mod oauth_flow;
mod peer_comms;
mod peer_interaction;
mod session_admission;
mod session_claim;
mod session_context;
mod turn_state;
// Public re-exports: one type per submodule declared above.
pub use auth_lease::RuntimeAuthLeaseHandle;
pub use comms_drain::RuntimeCommsDrainHandle;
pub use external_tool_surface::RuntimeExternalToolSurfaceHandle;
pub use interaction_stream::RuntimeInteractionStreamHandle;
pub use mcp_server_lifecycle::RuntimeMcpServerLifecycleHandle;
pub use model_routing::RuntimeModelRoutingHandle;
// Mirrors the cfg gate on the `oauth_flow` module declaration.
#[cfg(not(target_arch = "wasm32"))]
pub use oauth_flow::RuntimeOAuthFlowHandle;
pub use peer_comms::RuntimePeerCommsHandle;
pub use peer_interaction::RuntimePeerInteractionHandle;
pub use session_admission::RuntimeSessionAdmissionHandle;
pub use session_claim::RuntimeSessionClaimRegistry;
pub use session_context::RuntimeSessionContextHandle;
pub use turn_state::RuntimeTurnStateHandle;
/// Handle onto a mutex-protected `MeerkatMachineAuthority`, paired with a
/// teardown gate.
///
/// The `apply_*` methods consult the gate and return an error without touching
/// the authority once it has been closed. The read-only accessors
/// (`snapshot_state`, `with_state_lock`) do not consult the gate.
pub struct HandleDslAuthority {
/// The authority itself. The `Arc` allows it to be shared with other owners
/// (see `from_shared`).
inner: Arc<Mutex<mm_dsl::MeerkatMachineAuthority>>,
/// Gate checked by every `apply_*` method before it mutates `inner`.
teardown_gate: Arc<HandleTeardownGate>,
}
/// One-way flag that marks a handle authority as torn down.
///
/// A gate starts open. After [`HandleTeardownGate::close`] has been called,
/// `ensure_open` reports an error; nothing in this type reopens a gate.
pub(crate) struct HandleTeardownGate {
    /// `true` once `close` has been called.
    closed: AtomicBool,
}

impl HandleTeardownGate {
    /// Creates a new gate in the open state, already wrapped in an `Arc` so it
    /// can be shared between holders.
    pub(crate) fn open() -> Arc<Self> {
        let gate = Self {
            closed: AtomicBool::new(false),
        };
        Arc::new(gate)
    }

    /// Marks the gate as closed. The store uses `Release` ordering, pairing
    /// with the `Acquire` load in `ensure_open`.
    pub(crate) fn close(&self) {
        self.closed.store(true, Ordering::Release);
    }

    /// Returns `Ok(())` while the gate is open.
    ///
    /// # Errors
    ///
    /// Once the gate is closed, returns a `DslTransitionError::no_matching`
    /// error tagged with `context`.
    fn ensure_open(&self, context: &'static str) -> Result<(), DslTransitionError> {
        let is_closed = self.closed.load(Ordering::Acquire);
        if !is_closed {
            return Ok(());
        }
        Err(DslTransitionError::no_matching(
            context,
            "session-owned runtime handle authority is closed by teardown",
        ))
    }
}
impl HandleDslAuthority {
pub fn from_shared(inner: Arc<Mutex<mm_dsl::MeerkatMachineAuthority>>) -> Self {
Self {
inner,
teardown_gate: HandleTeardownGate::open(),
}
}
pub(crate) fn from_shared_with_teardown_gate(
inner: Arc<Mutex<mm_dsl::MeerkatMachineAuthority>>,
teardown_gate: Arc<HandleTeardownGate>,
) -> Self {
Self {
inner,
teardown_gate,
}
}
pub fn ephemeral() -> Self {
Self {
inner: Arc::new(Mutex::new(mm_dsl::MeerkatMachineAuthority::new())),
teardown_gate: HandleTeardownGate::open(),
}
}
pub fn apply_input(
&self,
input: mm_dsl::MeerkatMachineInput,
context: &'static str,
) -> Result<(), DslTransitionError> {
self.apply_input_with_effects(input, context).map(|_| ())
}
pub fn apply_input_with_effects(
&self,
input: mm_dsl::MeerkatMachineInput,
context: &'static str,
) -> Result<Vec<mm_dsl::MeerkatMachineEffect>, DslTransitionError> {
self.apply_input_with_transition(input, context)
.map(|transition| transition.into_effects())
}
pub fn apply_input_with_transition(
&self,
input: mm_dsl::MeerkatMachineInput,
context: &'static str,
) -> Result<mm_dsl::MeerkatMachineTransition, DslTransitionError> {
MeerkatMachineFieldlessRuntimeInternalInput::reject_raw_dsl_input(&input)
.map_err(|reason| DslTransitionError::no_matching(context, reason))?;
self.teardown_gate.ensure_open(context)?;
let mut guard = self
.inner
.lock()
.unwrap_or_else(std::sync::PoisonError::into_inner);
self.teardown_gate.ensure_open(context)?;
mm_dsl::MeerkatMachineMutator::apply(&mut *guard, input)
.map_err(|err| map_kernel_error(err, context))
}
pub fn apply_input_with_effects_and_sample<S>(
&self,
input: mm_dsl::MeerkatMachineInput,
context: &'static str,
sample: impl FnOnce(&[mm_dsl::MeerkatMachineEffect]) -> S,
) -> Result<S, DslTransitionError> {
MeerkatMachineFieldlessRuntimeInternalInput::reject_raw_dsl_input(&input)
.map_err(|reason| DslTransitionError::no_matching(context, reason))?;
self.teardown_gate.ensure_open(context)?;
let mut guard = self
.inner
.lock()
.unwrap_or_else(std::sync::PoisonError::into_inner);
self.teardown_gate.ensure_open(context)?;
let effects = mm_dsl::MeerkatMachineMutator::apply(&mut *guard, input)
.map(|transition| transition.into_effects())
.map_err(|err| map_kernel_error(err, context))?;
Ok(sample(&effects))
}
pub fn apply_signal(
&self,
signal: mm_dsl::MeerkatMachineSignal,
context: &'static str,
) -> Result<(), DslTransitionError> {
self.apply_signal_with_effects(signal, context).map(|_| ())
}
pub fn apply_signal_with_effects(
&self,
signal: mm_dsl::MeerkatMachineSignal,
context: &'static str,
) -> Result<Vec<mm_dsl::MeerkatMachineEffect>, DslTransitionError> {
self.teardown_gate.ensure_open(context)?;
let mut guard = self
.inner
.lock()
.unwrap_or_else(std::sync::PoisonError::into_inner);
self.teardown_gate.ensure_open(context)?;
guard
.apply_signal(signal)
.map(|transition| transition.into_effects())
.map_err(|err| map_kernel_error(err, context))
}
pub fn apply_signal_and_sample<S>(
&self,
signal: mm_dsl::MeerkatMachineSignal,
context: &'static str,
sample: impl FnOnce(&mm_dsl::MeerkatMachineState) -> S,
) -> Result<S, DslTransitionError> {
self.teardown_gate.ensure_open(context)?;
let mut guard = self
.inner
.lock()
.unwrap_or_else(std::sync::PoisonError::into_inner);
self.teardown_gate.ensure_open(context)?;
guard
.apply_signal(signal)
.map_err(|err| map_kernel_error(err, context))?;
Ok(sample(guard.state()))
}
pub fn snapshot_state(&self) -> mm_dsl::MeerkatMachineState {
let guard = self
.inner
.lock()
.unwrap_or_else(std::sync::PoisonError::into_inner);
guard.state().clone()
}
pub fn peer_projection_freshness_authority(
&self,
) -> crate::protocol_comms_trust_reconcile::PeerProjectionFreshnessAuthority {
crate::protocol_comms_trust_reconcile::PeerProjectionFreshnessAuthority::from_authority(
Arc::clone(&self.inner),
)
}
pub(crate) fn generated_authority_owner_token(&self) -> Arc<dyn std::any::Any + Send + Sync> {
let guard = self
.inner
.lock()
.unwrap_or_else(std::sync::PoisonError::into_inner);
guard.generated_authority_owner_token()
}
pub fn with_state_lock<R>(&self, body: impl FnOnce(&mm_dsl::MeerkatMachineState) -> R) -> R {
let guard = self
.inner
.lock()
.unwrap_or_else(std::sync::PoisonError::into_inner);
body(guard.state())
}
}
/// Opaque `Debug` rendering: prints the type name with no fields listed, so
/// formatting a handle never locks the authority.
impl std::fmt::Debug for HandleDslAuthority {
    fn fmt(&self, formatter: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let mut repr = formatter.debug_struct("HandleDslAuthority");
        repr.finish_non_exhaustive()
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use meerkat_core::handles::DslRejectionKind;

    /// A handle whose teardown gate has been closed must refuse an input and
    /// leave the shared authority's state untouched.
    #[test]
    fn teardown_gate_rejects_stale_handle_input_before_mutation() {
        let teardown_gate = HandleTeardownGate::open();
        let shared_authority = Arc::new(Mutex::new(mm_dsl::MeerkatMachineAuthority::new()));
        let stale_handle = HandleDslAuthority::from_shared_with_teardown_gate(
            Arc::clone(&shared_authority),
            Arc::clone(&teardown_gate),
        );

        // Close the gate before the handle is ever used.
        teardown_gate.close();

        let register = mm_dsl::MeerkatMachineInput::RegisterSession {
            session_id: mm_dsl::SessionId::from("closed-session"),
        };
        let rejection = stale_handle
            .apply_input(register, "test::stale_handle")
            .expect_err("closed handle must reject writes");
        assert_eq!(rejection.kind, DslRejectionKind::NoMatchingTransition);
        assert!(rejection.reason.contains("closed by teardown"));

        // Inspect the authority directly: the rejected input must not have
        // registered a session.
        let authority = shared_authority
            .lock()
            .unwrap_or_else(std::sync::PoisonError::into_inner);
        assert_eq!(authority.state().session_id, None);
    }
}