use super::*;
use meerkat_core::AgentExecutionSnapshot;
use meerkat_core::CommsCapabilityError;
use meerkat_core::ExternalToolSurfaceSnapshot;
use meerkat_core::PeerIngressRuntimeSnapshot;
use meerkat_core::PendingSystemContextAppend;
use meerkat_core::Session;
use meerkat_core::ToolScopeSnapshot;
use meerkat_core::lifecycle::core_executor::CoreApplyOutput;
use meerkat_core::lifecycle::run_primitive::RunApplyBoundary;
use meerkat_core::lifecycle::run_receipt::RunBoundaryReceipt;
use meerkat_core::service::{AppendSystemContextRequest, StartTurnRequest};
use meerkat_core::service::{
SessionControlError, SessionError, SessionServiceCommsExt, SessionServiceControlExt,
SessionServiceHistoryExt,
};
use meerkat_core::{InputId, RunId};
use sha2::{Digest, Sha256};
#[cfg(feature = "runtime-adapter")]
use std::collections::HashMap;
#[cfg(feature = "runtime-adapter")]
use std::sync::{Mutex, OnceLock, Weak};
fn build_runtime_receipt(
run_id: RunId,
boundary: RunApplyBoundary,
contributing_input_ids: Vec<InputId>,
session: &Session,
) -> Result<RunBoundaryReceipt, SessionError> {
let encoded_messages = serde_json::to_vec(session.messages()).map_err(|err| {
SessionError::Agent(meerkat_core::error::AgentError::InternalError(format!(
"failed to serialize session for runtime receipt digest: {err}"
)))
})?;
Ok(RunBoundaryReceipt {
run_id,
boundary,
contributing_input_ids,
conversation_digest: Some(format!("{:x}", Sha256::digest(encoded_messages))),
message_count: session.messages().len(),
sequence: 0,
})
}
fn session_control_error_to_session_error(err: SessionControlError) -> SessionError {
match err {
SessionControlError::Session(err) => err,
other => SessionError::Agent(meerkat_core::error::AgentError::InternalError(
other.to_string(),
)),
}
}
#[cfg(feature = "runtime-adapter")]
fn ephemeral_runtime_adapter_cache()
-> &'static Mutex<HashMap<usize, Weak<meerkat_runtime::MeerkatMachine>>> {
static CACHE: OnceLock<Mutex<HashMap<usize, Weak<meerkat_runtime::MeerkatMachine>>>> =
OnceLock::new();
CACHE.get_or_init(|| Mutex::new(HashMap::new()))
}
#[cfg(all(not(target_arch = "wasm32"), feature = "runtime-adapter"))]
fn persistent_runtime_adapter_cache()
-> &'static Mutex<HashMap<usize, Weak<meerkat_runtime::MeerkatMachine>>> {
static CACHE: OnceLock<Mutex<HashMap<usize, Weak<meerkat_runtime::MeerkatMachine>>>> =
OnceLock::new();
CACHE.get_or_init(|| Mutex::new(HashMap::new()))
}
#[cfg(feature = "runtime-adapter")]
fn cached_runtime_adapter(
cache: &'static Mutex<HashMap<usize, Weak<meerkat_runtime::MeerkatMachine>>>,
key: usize,
init: impl FnOnce() -> Arc<meerkat_runtime::MeerkatMachine>,
) -> Arc<meerkat_runtime::MeerkatMachine> {
let mut cache = cache
.lock()
.unwrap_or_else(std::sync::PoisonError::into_inner);
cache.retain(|_, adapter| adapter.strong_count() > 0);
if let Some(existing) = cache.get(&key).and_then(Weak::upgrade) {
return existing;
}
let adapter = init();
cache.insert(key, Arc::downgrade(&adapter));
adapter
}
#[cfg(feature = "runtime-adapter")]
async fn retire_runtime_session_for_archive(
runtime_adapter: &meerkat_runtime::MeerkatMachine,
session_id: &SessionId,
) -> Result<(), SessionError> {
let runtime_id = meerkat_runtime::LogicalRuntimeId::for_session(session_id);
match meerkat_runtime::RuntimeControlPlane::retire(runtime_adapter, &runtime_id).await {
Ok(_) => Ok(()),
Err(meerkat_runtime::RuntimeControlPlaneError::NotFound(_)) => {
runtime_adapter.register_session(session_id.clone()).await;
meerkat_runtime::RuntimeControlPlane::retire(runtime_adapter, &runtime_id)
.await
.map(|_| ())
.map_err(|error| {
SessionError::Agent(meerkat_core::error::AgentError::InternalError(format!(
"machine archive retire failed after registration: {error}"
)))
})
}
Err(error) => Err(SessionError::Agent(
meerkat_core::error::AgentError::InternalError(format!(
"machine archive retire failed: {error}"
)),
)),
}
}
#[cfg_attr(target_arch = "wasm32", async_trait::async_trait(?Send))]
#[cfg_attr(not(target_arch = "wasm32"), async_trait::async_trait)]
pub trait MobSessionService:
SessionServiceCommsExt + SessionServiceControlExt + SessionServiceHistoryExt
{
async fn subscribe_session_events(
&self,
session_id: &SessionId,
) -> Result<EventStream, StreamError> {
<Self as SessionService>::subscribe_session_events(self, session_id).await
}
fn supports_persistent_sessions(&self) -> bool {
false
}
#[cfg(feature = "runtime-adapter")]
fn runtime_adapter(&self) -> Option<Arc<meerkat_runtime::MeerkatMachine>> {
None
}
#[cfg(feature = "runtime-adapter")]
async fn interrupt_with_machine_authority(
&self,
session_id: &SessionId,
_authority: meerkat_runtime::MachineSessionControlAuthority,
) -> Result<(), SessionError> {
Err(SessionError::Unsupported(format!(
"interrupt for runtime-backed mob session {session_id} must be implemented by the machine-owned session service"
)))
}
#[cfg(feature = "runtime-adapter")]
async fn cancel_after_boundary_with_machine_authority(
&self,
session_id: &SessionId,
_authority: meerkat_runtime::MachineSessionControlAuthority,
) -> Result<(), SessionError> {
Err(SessionError::Unsupported(format!(
"cancel_after_boundary for runtime-backed mob session {session_id} must be implemented by the machine-owned session service"
)))
}
async fn execution_snapshot(
&self,
_session_id: &SessionId,
) -> Result<Option<AgentExecutionSnapshot>, SessionError> {
Ok(None)
}
async fn tool_scope_snapshot(
&self,
_session_id: &SessionId,
) -> Result<Option<ToolScopeSnapshot>, SessionError> {
Ok(None)
}
async fn external_tool_surface_snapshot(
&self,
_session_id: &SessionId,
) -> Result<Option<ExternalToolSurfaceSnapshot>, SessionError> {
Ok(None)
}
async fn peer_ingress_runtime_snapshot(
&self,
_session_id: &SessionId,
) -> Result<Option<PeerIngressRuntimeSnapshot>, SessionError> {
Ok(None)
}
async fn session_belongs_to_mob(
&self,
_session_id: &SessionId,
_mob_id: &crate::ids::MobId,
) -> bool {
false
}
async fn load_persisted_session(
&self,
_session_id: &SessionId,
) -> Result<Option<Session>, SessionError> {
Ok(None)
}
async fn archive_with_mob_lifecycle_authority(
&self,
session_id: &SessionId,
) -> Result<(), SessionError> {
#[cfg(feature = "runtime-adapter")]
if self.runtime_adapter().is_some() {
return Err(SessionError::Unsupported(format!(
"archive for runtime-backed mob session {session_id} must be implemented by the machine-owned session service"
)));
}
<Self as SessionService>::archive(self, session_id).await
}
async fn apply_runtime_turn(
&self,
_session_id: &SessionId,
_run_id: RunId,
_req: StartTurnRequest,
_boundary: RunApplyBoundary,
_contributing_input_ids: Vec<InputId>,
) -> Result<CoreApplyOutput, SessionError> {
Err(SessionError::Agent(
meerkat_core::error::AgentError::InternalError(
"runtime-backed apply is unavailable for this session service".into(),
),
))
}
async fn apply_runtime_context_appends(
&self,
session_id: &SessionId,
run_id: RunId,
appends: Vec<PendingSystemContextAppend>,
contributing_input_ids: Vec<InputId>,
) -> Result<CoreApplyOutput, SessionError> {
self.apply_runtime_context_appends_with_boundary(
session_id,
run_id,
appends,
RunApplyBoundary::Immediate,
contributing_input_ids,
)
.await
}
async fn apply_runtime_context_appends_with_boundary(
&self,
session_id: &SessionId,
run_id: RunId,
appends: Vec<PendingSystemContextAppend>,
boundary: RunApplyBoundary,
contributing_input_ids: Vec<InputId>,
) -> Result<CoreApplyOutput, SessionError> {
for append in appends {
self.append_system_context(
session_id,
AppendSystemContextRequest {
text: append.text,
source: append.source,
idempotency_key: append.idempotency_key,
},
)
.await
.map_err(session_control_error_to_session_error)?;
}
Ok(CoreApplyOutput::without_terminal(
RunBoundaryReceipt {
run_id,
boundary,
contributing_input_ids,
conversation_digest: None,
message_count: 0,
sequence: 0,
},
None,
))
}
async fn apply_runtime_system_context_for_turn(
&self,
_session_id: &SessionId,
_appends: Vec<PendingSystemContextAppend>,
) -> Result<(), SessionError> {
Err(SessionError::Agent(
meerkat_core::error::AgentError::InternalError(
"runtime-backed context staging is unavailable for this session service".into(),
),
))
}
async fn checkpoint_committed_runtime_session_snapshot(
&self,
_session_id: &SessionId,
_session_snapshot: &[u8],
) -> Result<(), SessionError> {
Ok(())
}
async fn discard_live_session(&self, _session_id: &SessionId) -> Result<(), SessionError> {
Ok(())
}
async fn cancel_all_checkpointers(&self) {}
async fn rearm_all_checkpointers(&self) {}
}
#[cfg_attr(target_arch = "wasm32", async_trait::async_trait(?Send))]
#[cfg_attr(not(target_arch = "wasm32"), async_trait::async_trait)]
impl<B> MobSessionService for meerkat_session::EphemeralSessionService<B>
where
B: meerkat_session::SessionAgentBuilder + 'static,
{
fn supports_persistent_sessions(&self) -> bool {
false
}
#[cfg(feature = "runtime-adapter")]
fn runtime_adapter(&self) -> Option<Arc<meerkat_runtime::MeerkatMachine>> {
let key = std::ptr::from_ref(self) as usize;
Some(cached_runtime_adapter(
ephemeral_runtime_adapter_cache(),
key,
|| Arc::new(meerkat_runtime::MeerkatMachine::ephemeral()),
))
}
#[cfg(feature = "runtime-adapter")]
async fn interrupt_with_machine_authority(
&self,
session_id: &SessionId,
_authority: meerkat_runtime::MachineSessionControlAuthority,
) -> Result<(), SessionError> {
meerkat_core::service::SessionService::interrupt(self, session_id).await
}
#[cfg(feature = "runtime-adapter")]
async fn cancel_after_boundary_with_machine_authority(
&self,
session_id: &SessionId,
_authority: meerkat_runtime::MachineSessionControlAuthority,
) -> Result<(), SessionError> {
meerkat_core::service::SessionService::cancel_after_boundary(self, session_id).await
}
async fn archive_with_mob_lifecycle_authority(
&self,
session_id: &SessionId,
) -> Result<(), SessionError> {
<Self as SessionService>::read(self, session_id).await?;
#[cfg(feature = "runtime-adapter")]
if let Some(runtime_adapter) = self.runtime_adapter() {
retire_runtime_session_for_archive(runtime_adapter.as_ref(), session_id).await?;
}
<Self as SessionService>::archive(self, session_id).await
}
async fn execution_snapshot(
&self,
session_id: &SessionId,
) -> Result<Option<AgentExecutionSnapshot>, SessionError> {
meerkat_session::EphemeralSessionService::<B>::execution_snapshot(self, session_id).await
}
async fn tool_scope_snapshot(
&self,
session_id: &SessionId,
) -> Result<Option<ToolScopeSnapshot>, SessionError> {
meerkat_session::EphemeralSessionService::<B>::tool_scope_snapshot(self, session_id).await
}
async fn external_tool_surface_snapshot(
&self,
session_id: &SessionId,
) -> Result<Option<ExternalToolSurfaceSnapshot>, SessionError> {
meerkat_session::EphemeralSessionService::<B>::external_tool_surface_snapshot(
self, session_id,
)
.await
}
async fn peer_ingress_runtime_snapshot(
&self,
session_id: &SessionId,
) -> Result<Option<PeerIngressRuntimeSnapshot>, SessionError> {
let Some(runtime) = self.comms_runtime(session_id).await else {
return Ok(None);
};
match runtime.peer_ingress_runtime_snapshot().await {
Ok(snapshot) => Ok(Some(snapshot)),
Err(CommsCapabilityError::Unsupported(_)) => Ok(None),
}
}
async fn subscribe_session_events(
&self,
session_id: &SessionId,
) -> Result<EventStream, StreamError> {
meerkat_session::EphemeralSessionService::<B>::subscribe_session_events(self, session_id)
.await
}
async fn discard_live_session(&self, session_id: &SessionId) -> Result<(), SessionError> {
meerkat_session::EphemeralSessionService::<B>::discard_live_session(self, session_id).await
}
async fn apply_runtime_turn(
&self,
session_id: &SessionId,
run_id: RunId,
req: StartTurnRequest,
boundary: RunApplyBoundary,
contributing_input_ids: Vec<InputId>,
) -> Result<CoreApplyOutput, SessionError> {
let run_result =
meerkat_session::EphemeralSessionService::<B>::start_turn(self, session_id, req)
.await?;
let session =
meerkat_session::EphemeralSessionService::<B>::export_session(self, session_id).await?;
let receipt = build_runtime_receipt(run_id, boundary, contributing_input_ids, &session)?;
let session_snapshot = serde_json::to_vec(&session).map_err(|err| {
SessionError::Agent(meerkat_core::error::AgentError::InternalError(format!(
"failed to serialize session snapshot for runtime commit: {err}"
)))
})?;
Ok(CoreApplyOutput::with_run_result(
receipt,
Some(session_snapshot),
run_result,
))
}
async fn apply_runtime_context_appends(
&self,
session_id: &SessionId,
run_id: RunId,
appends: Vec<PendingSystemContextAppend>,
contributing_input_ids: Vec<InputId>,
) -> Result<CoreApplyOutput, SessionError> {
meerkat_session::EphemeralSessionService::<B>::apply_runtime_context_appends(
self,
session_id,
run_id,
appends,
contributing_input_ids,
)
.await
}
async fn apply_runtime_context_appends_with_boundary(
&self,
session_id: &SessionId,
run_id: RunId,
appends: Vec<PendingSystemContextAppend>,
boundary: RunApplyBoundary,
contributing_input_ids: Vec<InputId>,
) -> Result<CoreApplyOutput, SessionError> {
meerkat_session::EphemeralSessionService::<B>::apply_runtime_context_appends_with_boundary(
self,
session_id,
run_id,
appends,
boundary,
contributing_input_ids,
)
.await
}
async fn apply_runtime_system_context_for_turn(
&self,
session_id: &SessionId,
appends: Vec<PendingSystemContextAppend>,
) -> Result<(), SessionError> {
meerkat_session::EphemeralSessionService::<B>::apply_runtime_system_context(
self, session_id, appends,
)
.await
}
}
#[cfg(not(target_arch = "wasm32"))]
#[async_trait::async_trait]
impl<B> MobSessionService for meerkat_session::PersistentSessionService<B>
where
B: meerkat_session::SessionAgentBuilder + 'static,
{
fn supports_persistent_sessions(&self) -> bool {
true
}
#[cfg(feature = "runtime-adapter")]
fn runtime_adapter(&self) -> Option<Arc<meerkat_runtime::MeerkatMachine>> {
#[cfg(target_arch = "wasm32")]
{
None
}
#[cfg(not(target_arch = "wasm32"))]
{
let key = std::ptr::from_ref(self) as usize;
self.runtime_store().map(|store| {
cached_runtime_adapter(persistent_runtime_adapter_cache(), key, || {
Arc::new(meerkat_runtime::MeerkatMachine::persistent(
store,
self.blob_store(),
))
})
})
}
}
async fn load_persisted_session(
&self,
session_id: &SessionId,
) -> Result<Option<Session>, SessionError> {
let Some(session) = self.load_authoritative_session(session_id).await? else {
return Ok(None);
};
if self
.session_archived_by_authority(session_id, &session)
.await?
{
return Ok(None);
}
Ok(Some(session))
}
async fn archive_with_mob_lifecycle_authority(
&self,
session_id: &SessionId,
) -> Result<(), SessionError> {
#[cfg(feature = "runtime-adapter")]
if let Some(runtime_adapter) = self.runtime_adapter() {
return meerkat_session::PersistentSessionService::<B>::archive_with_machine_protocol(
self,
session_id,
meerkat_session::MachineSessionArchiveProtocol::from_machine(
runtime_adapter.as_ref(),
),
)
.await;
}
<Self as SessionService>::archive(self, session_id).await
}
#[cfg(feature = "runtime-adapter")]
async fn interrupt_with_machine_authority(
&self,
session_id: &SessionId,
authority: meerkat_runtime::MachineSessionControlAuthority,
) -> Result<(), SessionError> {
meerkat_session::PersistentSessionService::<B>::interrupt_with_machine_authority(
self, session_id, authority,
)
.await
}
#[cfg(feature = "runtime-adapter")]
async fn cancel_after_boundary_with_machine_authority(
&self,
session_id: &SessionId,
authority: meerkat_runtime::MachineSessionControlAuthority,
) -> Result<(), SessionError> {
meerkat_session::PersistentSessionService::<B>::cancel_after_boundary_with_machine_authority(
self, session_id, authority,
)
.await
}
async fn execution_snapshot(
&self,
session_id: &SessionId,
) -> Result<Option<AgentExecutionSnapshot>, SessionError> {
meerkat_session::PersistentSessionService::<B>::execution_snapshot(self, session_id).await
}
async fn tool_scope_snapshot(
&self,
session_id: &SessionId,
) -> Result<Option<ToolScopeSnapshot>, SessionError> {
meerkat_session::PersistentSessionService::<B>::tool_scope_snapshot(self, session_id).await
}
async fn external_tool_surface_snapshot(
&self,
session_id: &SessionId,
) -> Result<Option<ExternalToolSurfaceSnapshot>, SessionError> {
meerkat_session::PersistentSessionService::<B>::external_tool_surface_snapshot(
self, session_id,
)
.await
}
async fn peer_ingress_runtime_snapshot(
&self,
session_id: &SessionId,
) -> Result<Option<PeerIngressRuntimeSnapshot>, SessionError> {
let Some(runtime) = self.comms_runtime(session_id).await else {
return Ok(None);
};
match runtime.peer_ingress_runtime_snapshot().await {
Ok(snapshot) => Ok(Some(snapshot)),
Err(CommsCapabilityError::Unsupported(_)) => Ok(None),
}
}
async fn subscribe_session_events(
&self,
session_id: &SessionId,
) -> Result<EventStream, StreamError> {
meerkat_session::PersistentSessionService::<B>::subscribe_session_events(self, session_id)
.await
}
async fn discard_live_session(&self, session_id: &SessionId) -> Result<(), SessionError> {
meerkat_session::PersistentSessionService::<B>::discard_live_session(self, session_id).await
}
async fn apply_runtime_turn(
&self,
session_id: &SessionId,
run_id: RunId,
req: StartTurnRequest,
boundary: RunApplyBoundary,
contributing_input_ids: Vec<InputId>,
) -> Result<CoreApplyOutput, SessionError> {
meerkat_session::PersistentSessionService::<B>::apply_runtime_turn(
self,
session_id,
run_id,
req,
boundary,
contributing_input_ids,
)
.await
}
async fn apply_runtime_context_appends(
&self,
session_id: &SessionId,
run_id: RunId,
appends: Vec<PendingSystemContextAppend>,
contributing_input_ids: Vec<InputId>,
) -> Result<CoreApplyOutput, SessionError> {
meerkat_session::PersistentSessionService::<B>::apply_runtime_context_appends(
self,
session_id,
run_id,
appends,
contributing_input_ids,
)
.await
}
async fn apply_runtime_context_appends_with_boundary(
&self,
session_id: &SessionId,
run_id: RunId,
appends: Vec<PendingSystemContextAppend>,
boundary: RunApplyBoundary,
contributing_input_ids: Vec<InputId>,
) -> Result<CoreApplyOutput, SessionError> {
meerkat_session::PersistentSessionService::<B>::apply_runtime_context_appends_with_boundary(
self,
session_id,
run_id,
appends,
boundary,
contributing_input_ids,
)
.await
}
async fn apply_runtime_system_context_for_turn(
&self,
session_id: &SessionId,
appends: Vec<PendingSystemContextAppend>,
) -> Result<(), SessionError> {
meerkat_session::PersistentSessionService::<B>::apply_runtime_system_context_for_turn(
self, session_id, appends,
)
.await
}
async fn checkpoint_committed_runtime_session_snapshot(
&self,
session_id: &SessionId,
session_snapshot: &[u8],
) -> Result<(), SessionError> {
meerkat_session::PersistentSessionService::<B>::checkpoint_committed_runtime_session_snapshot(
self,
session_id,
session_snapshot,
)
.await
}
async fn cancel_all_checkpointers(&self) {
meerkat_session::PersistentSessionService::<B>::cancel_all_checkpointers(self).await;
}
async fn rearm_all_checkpointers(&self) {
meerkat_session::PersistentSessionService::<B>::rearm_all_checkpointers(self).await;
}
}