use std::collections::{BTreeMap, BTreeSet};
use std::path::{Path, PathBuf};
use std::sync::Arc;
use async_trait::async_trait;
use futures::StreamExt;
use meerkat::{AgentFactory, Config, FactoryAgentBuilder, SessionStore};
use meerkat_client::types::LlmStream;
use meerkat_client::{LlmClient, LlmRequest};
use meerkat_core::agent::CommsRuntime;
use meerkat_core::service::{
CreateSessionRequest, SessionError, SessionHistoryPage, SessionHistoryQuery,
SessionServiceHistoryExt,
};
use meerkat_core::{AgentSessionStore, AssistantBlock, Message, Provider};
use meerkat_mob::{
MobBuilder, MobDefinition, MobError, MobHandle, MobSessionService, MobStorage, Profile,
ProfileName, SpawnMemberSpec,
};
use meerkat_runtime::input_state::StoredInputState;
use meerkat_runtime::store::MachineLifecycleCommit;
use meerkat_store::StoreAdapter;
use serde_json::Value;
use crate::blob_store::{
Base64BlobStoreAdapter, BinaryBlobStore, BinaryBlobStoreAdapter, ObjectStoreBlobStore,
};
pub(crate) const DELEGATE_IDLE_RETIRE_SECS_LABEL: &str = "implicit_delegate_idle_retire_secs";
pub(crate) const DELEGATE_IDLE_RETIRE_DISABLED_LABEL: &str = "disabled";
pub(crate) fn is_previous_member_cleanup_ambiguous_error(error: &str) -> bool {
error.contains("previous member cleanup ambiguous for member ")
}
pub(crate) fn is_recoverable_lifecycle_cleanup_error(error: &str) -> bool {
is_previous_member_cleanup_ambiguous_error(error)
|| (error.contains("disposal completed but ArchiveSession failed")
&& error.contains("cancel-before-retire failed")
&& error.contains("Runtime not ready: running"))
}
#[cfg(test)]
use std::sync::{Mutex, OnceLock};
pub const MEMBER_STATE_ACTIVE: &str = "active";
pub const MEMBER_STATE_RETIRING: &str = "retiring";
#[derive(Clone, Default)]
pub struct MobBootstrapOptions {
pub allow_ephemeral_sessions: bool,
pub notify_orchestrator_on_resume: bool,
pub default_llm_client: Option<Arc<dyn LlmClient>>,
}
pub struct ReplaySanitizingLlmClient {
inner: Arc<dyn LlmClient>,
}
type SharedDefaultLlmClientSlot = Arc<std::sync::RwLock<Option<Arc<dyn LlmClient>>>>;
impl ReplaySanitizingLlmClient {
pub fn new(inner: Arc<dyn LlmClient>) -> Self {
Self { inner }
}
pub fn wrap(inner: Arc<dyn LlmClient>) -> Arc<dyn LlmClient> {
Arc::new(Self::new(inner))
}
}
pub struct ReplaySanitizingAgentLlmClient {
inner: Arc<dyn meerkat_core::AgentLlmClient>,
}
impl ReplaySanitizingAgentLlmClient {
pub fn new(inner: Arc<dyn meerkat_core::AgentLlmClient>) -> Self {
Self { inner }
}
pub fn wrap(
inner: Arc<dyn meerkat_core::AgentLlmClient>,
) -> Arc<dyn meerkat_core::AgentLlmClient> {
Arc::new(Self::new(inner))
}
}
#[async_trait]
impl meerkat_core::AgentLlmClient for ReplaySanitizingAgentLlmClient {
async fn stream_response(
&self,
messages: &[Message],
tools: &[Arc<meerkat_core::ToolDef>],
max_tokens: u32,
temperature: Option<f32>,
provider_params: Option<&meerkat_core::lifecycle::run_primitive::ProviderParamsOverride>,
) -> Result<meerkat_core::agent::LlmStreamResult, meerkat_core::AgentError> {
let sanitized: Vec<Message> = messages
.iter()
.cloned()
.map(sanitize_message_for_stateless_replay)
.collect();
self.inner
.stream_response(&sanitized, tools, max_tokens, temperature, provider_params)
.await
}
fn provider(&self) -> &'static str {
self.inner.provider()
}
fn model(&self) -> &str {
self.inner.model()
}
fn compile_schema(
&self,
output_schema: &meerkat_core::OutputSchema,
) -> Result<meerkat_core::schema::CompiledSchema, meerkat_core::schema::SchemaError> {
self.inner.compile_schema(output_schema)
}
}
#[async_trait]
impl LlmClient for ReplaySanitizingLlmClient {
fn project_replay_messages(
&self,
messages: &[Message],
) -> Result<Vec<Message>, meerkat_client::LlmError> {
let sanitized: Vec<Message> = messages
.iter()
.cloned()
.map(sanitize_message_for_stateless_replay)
.collect();
self.inner.project_replay_messages(&sanitized)
}
fn stream<'a>(&'a self, request: &'a LlmRequest) -> LlmStream<'a> {
let inner = Arc::clone(&self.inner);
let sanitized = sanitize_llm_request_for_stateless_replay(request);
Box::pin(async_stream::stream! {
let mut stream = inner.stream(&sanitized);
while let Some(event) = stream.next().await {
if runtime_turn_diagnostics_enabled()
&& let Err(error) = &event
{
tracing::error!(
error = %error,
error_debug = ?error,
"mobkit llm client stream error"
);
}
yield event;
}
})
}
fn provider(&self) -> &'static str {
self.inner.provider()
}
async fn health_check(&self) -> Result<(), meerkat_client::LlmError> {
self.inner.health_check().await
}
fn compile_schema(
&self,
output_schema: &meerkat_core::OutputSchema,
) -> Result<meerkat_core::schema::CompiledSchema, meerkat_core::schema::SchemaError> {
self.inner.compile_schema(output_schema)
}
}
pub(crate) type PreBuildHook = Arc<
dyn Fn(
&mut CreateSessionRequest,
) -> std::pin::Pin<
Box<dyn std::future::Future<Output = Result<(), SessionError>> + Send + '_>,
> + Send
+ Sync,
>;
pub type AfterCreateHook = Arc<
dyn Fn(
meerkat_core::types::SessionId,
SessionCreatedContext,
) -> std::pin::Pin<Box<dyn std::future::Future<Output = ()> + Send>>
+ Send
+ Sync,
>;
struct PreBuildMobSessionService {
inner: Arc<dyn MobSessionService>,
hook: PreBuildHook,
after_create_hook: Option<AfterCreateHook>,
runtime_adapter_override: Option<Arc<meerkat_runtime::MeerkatMachine>>,
}
fn no_op_pre_build_hook() -> PreBuildHook {
Arc::new(|_req: &mut CreateSessionRequest| Box::pin(async { Ok(()) }))
}
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub(crate) enum DelegateIdleRetireOverride {
Disabled,
Seconds(u64),
}
#[derive(Clone, Default)]
pub(crate) struct ImplicitDelegateRetirementOverrides {
inner: Arc<tokio::sync::RwLock<BTreeMap<(String, String), DelegateIdleRetireOverride>>>,
}
impl ImplicitDelegateRetirementOverrides {
pub(crate) async fn set(
&self,
mob_id: impl Into<String>,
member_id: impl Into<String>,
override_policy: DelegateIdleRetireOverride,
) {
self.inner
.write()
.await
.insert((mob_id.into(), member_id.into()), override_policy);
}
pub(crate) async fn get(
&self,
mob_id: &str,
member_id: &str,
) -> Option<DelegateIdleRetireOverride> {
self.inner
.read()
.await
.get(&(mob_id.to_string(), member_id.to_string()))
.copied()
}
}
struct AutoWireParentMobToolsFactory {
inner: Arc<dyn meerkat_core::service::MobToolsFactory>,
implicit_delegate_retirement_overrides: ImplicitDelegateRetirementOverrides,
}
#[cfg_attr(target_arch = "wasm32", async_trait(?Send))]
#[cfg_attr(not(target_arch = "wasm32"), async_trait)]
impl meerkat_core::service::MobToolsFactory for AutoWireParentMobToolsFactory {
async fn build_mob_tools(
&self,
args: meerkat_core::service::MobToolsBuildArgs,
) -> Result<Arc<dyn meerkat_core::AgentToolDispatcher>, Box<dyn std::error::Error + Send + Sync>>
{
let inner = self.inner.build_mob_tools(args).await?;
Ok(Arc::new(AutoWireParentMobToolDispatcher {
inner,
implicit_delegate_retirement_overrides: self
.implicit_delegate_retirement_overrides
.clone(),
}))
}
}
struct AutoWireParentMobToolDispatcher {
inner: Arc<dyn meerkat_core::AgentToolDispatcher>,
implicit_delegate_retirement_overrides: ImplicitDelegateRetirementOverrides,
}
#[cfg_attr(target_arch = "wasm32", async_trait(?Send))]
#[cfg_attr(not(target_arch = "wasm32"), async_trait)]
impl meerkat_core::AgentToolDispatcher for AutoWireParentMobToolDispatcher {
fn tools(&self) -> Arc<[Arc<meerkat_core::types::ToolDef>]> {
self.inner
.tools()
.iter()
.map(|tool| {
if tool.name == "delegate" {
Arc::new(delegate_tool_def_with_idle_retire_secs(tool))
} else if tool.name == "mob_spawn_member" {
Arc::new(mob_spawn_tool_def_with_idle_retire_secs(tool))
} else {
Arc::clone(tool)
}
})
.collect::<Vec<_>>()
.into()
}
async fn dispatch(
&self,
call: meerkat_core::types::ToolCallView<'_>,
) -> Result<meerkat_core::ToolDispatchOutcome, meerkat_core::ToolError> {
if call.name == "delegate" {
return self.dispatch_delegate(call).await;
}
if call.name != "mob_spawn_member" {
return self.inner.dispatch(call).await;
}
self.dispatch_mob_spawn_member(call).await
}
fn capabilities(&self) -> meerkat_core::agent::DispatcherCapabilities {
self.inner.capabilities()
}
fn bind_ops_lifecycle(
self: Arc<Self>,
registry: Arc<dyn meerkat_core::ops_lifecycle::OpsLifecycleRegistry>,
owner_bridge_session_id: meerkat_core::types::SessionId,
) -> Result<meerkat_core::agent::BindOutcome, meerkat_core::agent::OpsLifecycleBindError> {
let owned = Arc::try_unwrap(self)
.map_err(|_| meerkat_core::agent::OpsLifecycleBindError::SharedOwnership)?;
let outcome = owned
.inner
.bind_ops_lifecycle(registry, owner_bridge_session_id)?;
let was_bound = outcome.was_bound();
let dispatcher = Arc::new(Self {
inner: outcome.into_dispatcher(),
implicit_delegate_retirement_overrides: owned.implicit_delegate_retirement_overrides,
});
Ok(if was_bound {
meerkat_core::agent::BindOutcome::Bound(dispatcher)
} else {
meerkat_core::agent::BindOutcome::Skipped(dispatcher)
})
}
}
impl AutoWireParentMobToolDispatcher {
async fn dispatch_mob_spawn_member(
&self,
call: meerkat_core::types::ToolCallView<'_>,
) -> Result<meerkat_core::ToolDispatchOutcome, meerkat_core::ToolError> {
let mut args = serde_json::from_str::<Value>(call.args.get()).map_err(|error| {
meerkat_core::ToolError::invalid_arguments(call.name, error.to_string())
})?;
let idle_retire_override = delegate_idle_retire_override_from_args(call.name, &mut args)?;
let idle_retire_targets = idle_retire_targets_from_spawn_args(&args);
if let Some(object) = args.as_object_mut() {
object
.entry("auto_wire_parent".to_string())
.or_insert(Value::Bool(true));
}
let args = serde_json::value::RawValue::from_string(args.to_string()).map_err(|error| {
meerkat_core::ToolError::invalid_arguments(call.name, error.to_string())
})?;
let call = meerkat_core::types::ToolCallView {
id: call.id,
name: call.name,
args: &args,
};
let outcome = self.inner.dispatch(call).await?;
self.register_idle_retire_override_from_outcome(
&outcome,
idle_retire_override,
&idle_retire_targets,
)
.await;
Ok(outcome)
}
async fn dispatch_delegate(
&self,
call: meerkat_core::types::ToolCallView<'_>,
) -> Result<meerkat_core::ToolDispatchOutcome, meerkat_core::ToolError> {
let mut args = serde_json::from_str::<Value>(call.args.get()).map_err(|error| {
meerkat_core::ToolError::invalid_arguments(call.name, error.to_string())
})?;
let idle_retire_override = delegate_idle_retire_override_from_args(call.name, &mut args)?;
let args = serde_json::value::RawValue::from_string(args.to_string()).map_err(|error| {
meerkat_core::ToolError::invalid_arguments(call.name, error.to_string())
})?;
let call = meerkat_core::types::ToolCallView {
id: call.id,
name: call.name,
args: &args,
};
let outcome = self.inner.dispatch(call).await?;
self.register_idle_retire_override_from_outcome(&outcome, idle_retire_override, &[])
.await;
Ok(outcome)
}
async fn register_idle_retire_override_from_outcome(
&self,
outcome: &meerkat_core::ToolDispatchOutcome,
idle_retire_override: Option<DelegateIdleRetireOverride>,
fallback_targets: &[IdleRetireTarget],
) {
if outcome.result.is_error {
return;
}
let Some(override_policy) = idle_retire_override else {
return;
};
for target in
idle_retire_targets_from_outcome_text(&outcome.result.text_content(), fallback_targets)
{
self.implicit_delegate_retirement_overrides
.set(&target.mob_id, &target.member_id, override_policy)
.await;
}
}
}
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord)]
struct IdleRetireTarget {
mob_id: String,
member_id: String,
}
fn text_field<'a>(value: &'a Value, key: &str) -> Option<&'a str> {
value
.get(key)
.and_then(Value::as_str)
.filter(|text| !text.is_empty())
}
fn member_identity_field(value: &Value) -> Option<&str> {
text_field(value, "agent_identity")
.or_else(|| text_field(value, "member_id"))
.or_else(|| text_field(value, "identity"))
}
fn target_from_value(value: &Value, default_mob_id: Option<&str>) -> Option<IdleRetireTarget> {
let mob_id = text_field(value, "mob_id").or(default_mob_id)?;
let member_id = member_identity_field(value)?;
Some(IdleRetireTarget {
mob_id: mob_id.to_string(),
member_id: member_id.to_string(),
})
}
fn idle_retire_targets_from_spawn_args(args: &Value) -> Vec<IdleRetireTarget> {
let default_mob_id = text_field(args, "mob_id");
let mut targets = BTreeSet::new();
if let Some(target) = target_from_value(args, default_mob_id) {
targets.insert(target);
}
for key in ["specs", "members"] {
let Some(values) = args.get(key).and_then(Value::as_array) else {
continue;
};
for value in values {
if let Some(target) = target_from_value(value, default_mob_id) {
targets.insert(target);
}
}
}
targets.into_iter().collect()
}
fn target_from_result_value(
value: &Value,
fallback_targets: &[IdleRetireTarget],
) -> Option<IdleRetireTarget> {
if let Some(target) = target_from_value(value, None) {
return Some(target);
}
let member_id = member_identity_field(value)?;
let mut matches = fallback_targets
.iter()
.filter(|target| target.member_id == member_id);
let target = matches.next()?;
if matches.next().is_some() {
return None;
}
Some(target.clone())
}
fn collect_idle_retire_result_targets(
value: &Value,
fallback_targets: &[IdleRetireTarget],
targets: &mut BTreeSet<IdleRetireTarget>,
) {
if let Some(target) = target_from_result_value(value, fallback_targets) {
targets.insert(target);
}
for key in ["members", "specs", "spawned", "results"] {
let Some(values) = value.get(key).and_then(Value::as_array) else {
continue;
};
for value in values {
collect_idle_retire_result_targets(value, fallback_targets, targets);
}
}
}
fn idle_retire_targets_from_outcome_text(
text: &str,
fallback_targets: &[IdleRetireTarget],
) -> Vec<IdleRetireTarget> {
let Ok(payload) = serde_json::from_str::<Value>(text) else {
return fallback_targets.to_vec();
};
let mut targets = BTreeSet::new();
collect_idle_retire_result_targets(&payload, fallback_targets, &mut targets);
if targets.is_empty() {
targets.extend(fallback_targets.iter().cloned());
}
targets.into_iter().collect()
}
struct DefinitionSeededRealmProfileStore {
inner: Arc<dyn meerkat_mob::RealmProfileStore>,
profiles: BTreeMap<String, Profile>,
}
impl DefinitionSeededRealmProfileStore {
fn new(
definition: &MobDefinition,
inner: Arc<dyn meerkat_mob::RealmProfileStore>,
) -> Option<Self> {
let profiles = definition
.profiles
.iter()
.filter_map(|(name, binding)| {
binding
.as_inline()
.cloned()
.map(|profile| (name.to_string(), profile))
})
.collect::<BTreeMap<_, _>>();
(!profiles.is_empty()).then_some(Self { inner, profiles })
}
fn stored(&self, name: &str, profile: &Profile) -> meerkat_mob::StoredRealmProfile {
let now = chrono::Utc::now();
meerkat_mob::StoredRealmProfile {
name: name.to_string(),
profile: profile.clone(),
revision: 0,
created_at: now,
updated_at: now,
}
}
}
#[cfg_attr(target_arch = "wasm32", async_trait(?Send))]
#[cfg_attr(not(target_arch = "wasm32"), async_trait)]
impl meerkat_mob::RealmProfileStore for DefinitionSeededRealmProfileStore {
async fn create(
&self,
name: &str,
profile: &Profile,
) -> Result<meerkat_mob::StoredRealmProfile, meerkat_mob::MobStoreError> {
if self.profiles.contains_key(name) {
return Err(meerkat_mob::MobStoreError::CasConflict(format!(
"realm profile already exists: {name}"
)));
}
self.inner.create(name, profile).await
}
async fn get(
&self,
name: &str,
) -> Result<Option<meerkat_mob::StoredRealmProfile>, meerkat_mob::MobStoreError> {
if let Some(profile) = self.profiles.get(name) {
return Ok(Some(self.stored(name, profile)));
}
self.inner.get(name).await
}
async fn list(
&self,
) -> Result<Vec<meerkat_mob::StoredRealmProfile>, meerkat_mob::MobStoreError> {
let mut merged = self.inner.list().await?;
merged.retain(|profile| !self.profiles.contains_key(profile.name.as_str()));
merged.extend(
self.profiles
.iter()
.map(|(name, profile)| self.stored(name, profile)),
);
merged.sort_by(|a, b| a.name.cmp(&b.name));
Ok(merged)
}
async fn update(
&self,
name: &str,
profile: &Profile,
expected_revision: u64,
) -> Result<meerkat_mob::StoredRealmProfile, meerkat_mob::MobStoreError> {
if self.profiles.contains_key(name) {
return Err(meerkat_mob::MobStoreError::CasConflict(format!(
"realm profile '{name}' is provided by the mob definition"
)));
}
self.inner.update(name, profile, expected_revision).await
}
async fn delete(
&self,
name: &str,
expected_revision: u64,
) -> Result<meerkat_mob::StoredRealmProfile, meerkat_mob::MobStoreError> {
if self.profiles.contains_key(name) {
return Err(meerkat_mob::MobStoreError::CasConflict(format!(
"realm profile '{name}' is provided by the mob definition"
)));
}
self.inner.delete(name, expected_revision).await
}
}
fn delegate_idle_retire_override_from_args(
tool_name: &str,
args: &mut Value,
) -> Result<Option<DelegateIdleRetireOverride>, meerkat_core::ToolError> {
let Some(object) = args.as_object_mut() else {
return Ok(None);
};
let Some(value) = object.remove("idle_retire_secs") else {
return Ok(None);
};
if value.is_null() {
return Ok(Some(DelegateIdleRetireOverride::Disabled));
}
value
.as_u64()
.map(DelegateIdleRetireOverride::Seconds)
.map(Some)
.ok_or_else(|| {
meerkat_core::ToolError::invalid_arguments(
tool_name,
"idle_retire_secs must be a non-negative integer or null",
)
})
}
fn delegate_tool_def_with_idle_retire_secs(
tool: &meerkat_core::types::ToolDef,
) -> meerkat_core::types::ToolDef {
let mut patched = tool.clone();
if !patched.description.contains("IDLE RETIREMENT:") {
patched.description.push_str(
"\n\nIDLE RETIREMENT:\n\
Omit idle_retire_secs to use the runtime default. Pass an integer \
number of seconds to override idle auto-retirement for this helper. \
Pass null to disable auto-retirement for this helper.",
);
}
if let Some(properties) = patched
.input_schema
.get_mut("properties")
.and_then(Value::as_object_mut)
{
properties
.entry("idle_retire_secs".to_string())
.or_insert_with(|| {
serde_json::json!({
"description": "Override idle auto-retirement for this helper. Omit to use the runtime default, use an integer number of seconds to override, or null to disable auto-retirement for this helper.",
"anyOf": [
{"type": "integer", "minimum": 0},
{"type": "null"}
]
})
});
}
patched
}
fn mob_spawn_tool_def_with_idle_retire_secs(
tool: &meerkat_core::types::ToolDef,
) -> meerkat_core::types::ToolDef {
let mut patched = tool.clone();
if !patched.description.contains("IDLE RETIREMENT:") {
patched.description.push_str(
"\n\nIDLE RETIREMENT:\n\
Omit idle_retire_secs to leave this spawned member out of auto-retirement. \
Pass an integer number of seconds to retire the member after it has been \
idle for that long. Pass null to explicitly disable auto-retirement.",
);
}
if let Some(properties) = patched
.input_schema
.get_mut("properties")
.and_then(Value::as_object_mut)
{
properties
.entry("idle_retire_secs".to_string())
.or_insert_with(|| {
serde_json::json!({
"description": "Opt this spawned member into idle auto-retirement. Omit to keep the member indefinitely, use an integer number of seconds to retire after that much idle time, or null to explicitly disable auto-retirement.",
"anyOf": [
{"type": "integer", "minimum": 0},
{"type": "null"}
]
})
});
}
patched
}
fn install_agent_mob_tools(
definition: &MobDefinition,
slot: Arc<std::sync::RwLock<Option<Arc<dyn meerkat_core::service::MobToolsFactory>>>>,
session_service: Arc<dyn MobSessionService>,
) -> (
Arc<meerkat_mob_mcp::MobMcpState>,
ImplicitDelegateRetirementOverrides,
SharedDefaultLlmClientSlot,
) {
let default_llm_client_slot = Arc::new(std::sync::RwLock::new(None::<Arc<dyn LlmClient>>));
let default_llm_client_provider_slot = Arc::clone(&default_llm_client_slot);
let mut state = meerkat_mob_mcp::MobMcpState::new(session_service);
if let Some(base_store) = state.realm_profile_store().cloned()
&& let Some(store) = DefinitionSeededRealmProfileStore::new(definition, base_store)
{
state = state.with_realm_profile_store(Some(Arc::new(store)));
}
state = state
.with_realm_skill_sources(definition.skills.clone())
.with_default_llm_client_provider(Some(Arc::new(move || {
default_llm_client_provider_slot
.read()
.unwrap_or_else(std::sync::PoisonError::into_inner)
.clone()
})));
let state = Arc::new(state);
let implicit_delegate_retirement_overrides = ImplicitDelegateRetirementOverrides::default();
let inner = Arc::new(meerkat_mob_mcp::AgentMobToolSurfaceFactory::new(
Arc::clone(&state),
));
let factory = Arc::new(AutoWireParentMobToolsFactory {
inner,
implicit_delegate_retirement_overrides: implicit_delegate_retirement_overrides.clone(),
});
*slot
.write()
.unwrap_or_else(std::sync::PoisonError::into_inner) = Some(factory);
(
state,
implicit_delegate_retirement_overrides,
default_llm_client_slot,
)
}
#[cfg(test)]
#[allow(dead_code)]
#[derive(Debug, Clone)]
pub(crate) struct RuntimeTurnTrace {
pub(crate) session_id: String,
pub(crate) boundary: String,
pub(crate) contributing_input_count: usize,
pub(crate) outcome: String,
}
fn is_replay_unsafe_server_tool_content(name: &str, content: &Value) -> bool {
name == "web_search_annotations"
|| content
.get("type")
.and_then(Value::as_str)
.is_some_and(|kind| kind.starts_with("response."))
}
fn sanitize_llm_request_for_stateless_replay(request: &LlmRequest) -> LlmRequest {
let mut sanitized = request.clone();
sanitized.messages = request
.messages
.iter()
.cloned()
.map(sanitize_message_for_stateless_replay)
.collect();
sanitized
}
fn sanitize_create_session_request_llm_override(req: &mut CreateSessionRequest) {
let Some(build) = req.build.as_mut() else {
return;
};
let Some(client) = build
.llm_client_override
.as_ref()
.and_then(meerkat::decode_llm_client_override_from_service)
else {
return;
};
build.llm_client_override = Some(meerkat::encode_llm_client_override_for_service(
ReplaySanitizingLlmClient::wrap(client),
));
}
fn sanitize_message_for_stateless_replay(message: Message) -> Message {
match message {
Message::BlockAssistant(mut assistant) => {
assistant.blocks = assistant
.blocks
.into_iter()
.filter_map(|block| match block {
AssistantBlock::ServerToolContent { name, content, .. }
if is_replay_unsafe_server_tool_content(&name, &content) =>
{
None
}
other => Some(other),
})
.collect();
Message::BlockAssistant(assistant)
}
other => other,
}
}
fn build_persistent_runtime_store(store_path: &Path) -> Arc<dyn meerkat_runtime::RuntimeStore> {
let runtime_db = store_path.join("runtime.sqlite");
match meerkat_runtime::store::SqliteRuntimeStore::new(&runtime_db) {
Ok(store) => Arc::new(store),
Err(err) => {
tracing::warn!(
path = %runtime_db.display(),
error = %err,
"failed to open SqliteRuntimeStore; falling back to InMemoryRuntimeStore. \
Sessions will not survive process restart and archive operations may fail.",
);
Arc::new(meerkat_runtime::InMemoryRuntimeStore::new())
}
}
}
struct SessionStoreBackedRuntimeStore {
inner: Arc<dyn meerkat_runtime::RuntimeStore>,
}
impl SessionStoreBackedRuntimeStore {
fn new(
inner: Arc<dyn meerkat_runtime::RuntimeStore>,
_session_store: Arc<dyn SessionStore>,
) -> Self {
Self { inner }
}
}
#[async_trait]
impl meerkat_runtime::RuntimeStore for SessionStoreBackedRuntimeStore {
fn auth_authority_key(&self) -> Option<String> {
self.inner.auth_authority_key()
}
fn persist_auth_oauth_flow_snapshot(
&self,
snapshot_json: &[u8],
) -> Result<(), meerkat_runtime::store::RuntimeStoreError> {
self.inner.persist_auth_oauth_flow_snapshot(snapshot_json)
}
fn load_auth_oauth_flow_snapshot(
&self,
) -> Result<Option<Vec<u8>>, meerkat_runtime::store::RuntimeStoreError> {
self.inner.load_auth_oauth_flow_snapshot()
}
fn update_auth_oauth_flow_snapshot(
&self,
update: &mut meerkat_runtime::store::AuthOAuthFlowSnapshotUpdate<'_>,
) -> Result<(), meerkat_runtime::store::RuntimeStoreError> {
self.inner.update_auth_oauth_flow_snapshot(update)
}
async fn commit_session_snapshot(
&self,
runtime_id: &meerkat_runtime::LogicalRuntimeId,
session_delta: meerkat_runtime::store::SessionDelta,
) -> Result<(), meerkat_runtime::store::RuntimeStoreError> {
self.inner
.commit_session_snapshot(runtime_id, session_delta)
.await
}
async fn commit_session_transcript_rewrite_snapshot(
&self,
runtime_id: &meerkat_runtime::LogicalRuntimeId,
session_delta: meerkat_runtime::store::SessionDelta,
commit: &meerkat_core::TranscriptRewriteCommit,
) -> Result<(), meerkat_runtime::store::RuntimeStoreError> {
self.inner
.commit_session_transcript_rewrite_snapshot(runtime_id, session_delta, commit)
.await
}
async fn atomic_apply(
&self,
runtime_id: &meerkat_runtime::LogicalRuntimeId,
session_delta: Option<meerkat_runtime::store::SessionDelta>,
receipt: meerkat_core::lifecycle::RunBoundaryReceipt,
input_updates: Vec<StoredInputState>,
session_store_key: Option<meerkat_core::types::SessionId>,
) -> Result<(), meerkat_runtime::store::RuntimeStoreError> {
self.inner
.atomic_apply(
runtime_id,
session_delta,
receipt,
input_updates,
session_store_key,
)
.await
}
async fn load_input_states(
&self,
runtime_id: &meerkat_runtime::LogicalRuntimeId,
) -> Result<Vec<StoredInputState>, meerkat_runtime::store::RuntimeStoreError> {
self.inner.load_input_states(runtime_id).await
}
async fn load_boundary_receipt(
&self,
runtime_id: &meerkat_runtime::LogicalRuntimeId,
run_id: &meerkat_core::lifecycle::RunId,
sequence: u64,
) -> Result<
Option<meerkat_core::lifecycle::RunBoundaryReceipt>,
meerkat_runtime::store::RuntimeStoreError,
> {
self.inner
.load_boundary_receipt(runtime_id, run_id, sequence)
.await
}
async fn load_session_snapshot(
&self,
runtime_id: &meerkat_runtime::LogicalRuntimeId,
) -> Result<Option<Vec<u8>>, meerkat_runtime::store::RuntimeStoreError> {
self.inner.load_session_snapshot(runtime_id).await
}
async fn clear_session_snapshot(
&self,
runtime_id: &meerkat_runtime::LogicalRuntimeId,
) -> Result<(), meerkat_runtime::store::RuntimeStoreError> {
self.inner.clear_session_snapshot(runtime_id).await
}
async fn replace_session_snapshot_if_current(
&self,
runtime_id: &meerkat_runtime::LogicalRuntimeId,
expected_current: &[u8],
replacement: Vec<u8>,
) -> Result<bool, meerkat_runtime::store::RuntimeStoreError> {
self.inner
.replace_session_snapshot_if_current(runtime_id, expected_current, replacement)
.await
}
async fn clear_session_snapshot_if_current(
&self,
runtime_id: &meerkat_runtime::LogicalRuntimeId,
expected_current: &[u8],
) -> Result<bool, meerkat_runtime::store::RuntimeStoreError> {
self.inner
.clear_session_snapshot_if_current(runtime_id, expected_current)
.await
}
async fn persist_input_state(
&self,
runtime_id: &meerkat_runtime::LogicalRuntimeId,
state: &StoredInputState,
) -> Result<(), meerkat_runtime::store::RuntimeStoreError> {
self.inner.persist_input_state(runtime_id, state).await
}
async fn load_input_state(
&self,
runtime_id: &meerkat_runtime::LogicalRuntimeId,
input_id: &meerkat_core::lifecycle::InputId,
) -> Result<Option<StoredInputState>, meerkat_runtime::store::RuntimeStoreError> {
self.inner.load_input_state(runtime_id, input_id).await
}
async fn load_runtime_state(
&self,
runtime_id: &meerkat_runtime::LogicalRuntimeId,
) -> Result<Option<meerkat_runtime::RuntimeState>, meerkat_runtime::store::RuntimeStoreError>
{
self.inner.load_runtime_state(runtime_id).await
}
async fn commit_machine_lifecycle(
&self,
runtime_id: &meerkat_runtime::LogicalRuntimeId,
commit: MachineLifecycleCommit,
input_states: &[StoredInputState],
) -> Result<(), meerkat_runtime::store::RuntimeStoreError> {
self.inner
.commit_machine_lifecycle(runtime_id, commit, input_states)
.await
}
async fn persist_ops_lifecycle(
&self,
runtime_id: &meerkat_runtime::LogicalRuntimeId,
snapshot: &meerkat_runtime::ops_lifecycle::PersistedOpsSnapshot,
) -> Result<(), meerkat_runtime::store::RuntimeStoreError> {
self.inner.persist_ops_lifecycle(runtime_id, snapshot).await
}
async fn load_ops_lifecycle(
&self,
runtime_id: &meerkat_runtime::LogicalRuntimeId,
) -> Result<
Option<meerkat_runtime::ops_lifecycle::PersistedOpsSnapshot>,
meerkat_runtime::store::RuntimeStoreError,
> {
self.inner.load_ops_lifecycle(runtime_id).await
}
}
#[cfg(test)]
static RUNTIME_TURN_TRACES: OnceLock<Mutex<Vec<RuntimeTurnTrace>>> = OnceLock::new();
#[cfg(test)]
fn runtime_turn_traces() -> &'static Mutex<Vec<RuntimeTurnTrace>> {
RUNTIME_TURN_TRACES.get_or_init(|| Mutex::new(Vec::new()))
}
#[cfg(test)]
#[allow(clippy::expect_used)]
fn record_runtime_turn_trace(trace: RuntimeTurnTrace) {
runtime_turn_traces()
.lock()
.expect("runtime turn traces mutex")
.push(trace);
}
#[cfg(test)]
#[allow(dead_code)]
#[allow(clippy::expect_used)]
pub(crate) fn take_runtime_turn_traces() -> Vec<RuntimeTurnTrace> {
std::mem::take(
&mut *runtime_turn_traces()
.lock()
.expect("runtime turn traces mutex"),
)
}
#[cfg(not(test))]
#[allow(dead_code)]
fn record_runtime_turn_trace(_trace: ()) {}
fn runtime_turn_diagnostics_enabled() -> bool {
std::env::var_os("MOBKIT_TRACE_RUNTIME_TURNS").is_some()
}
fn summarize_runtime_prompt(prompt: &meerkat_core::ContentInput) -> String {
match prompt {
meerkat_core::ContentInput::Text(text) => {
text.lines().take(6).collect::<Vec<_>>().join(" ")
}
meerkat_core::ContentInput::Blocks(blocks) => blocks
.iter()
.map(|block| block.text_projection().to_string())
.collect::<Vec<_>>()
.join(" ")
.lines()
.take(6)
.collect::<Vec<_>>()
.join(" "),
}
}
pub fn mob_definition_may_use_image_generation(definition: &MobDefinition) -> bool {
definition.profiles.values().any(|binding| {
binding
.as_inline()
.is_none_or(|profile| profile.tools.image_generation)
})
}
fn normalize_runtime_turn_request(
mut req: meerkat_core::service::StartTurnRequest,
) -> meerkat_core::service::StartTurnRequest {
req.runtime.handling_mode = meerkat_core::types::HandlingMode::Queue;
req.runtime.render_metadata = None;
req
}
macro_rules! delegate_mob_session_service {
($wrapper:ty) => {
#[async_trait]
impl meerkat_core::service::SessionService for $wrapper {
async fn create_session(
&self,
mut req: CreateSessionRequest,
) -> Result<meerkat_core::types::RunResult, SessionError> {
(self.hook)(&mut req).await?;
sanitize_create_session_request_llm_override(&mut req);
let ctx = SessionCreatedContext {
model: req.model.clone(),
labels: req.labels.clone().unwrap_or_default(),
system_prompt: req.system_prompt.clone(),
};
let result = self.inner.create_session(req).await?;
if let Some(ref after_hook) = self.after_create_hook {
after_hook(result.session_id.clone(), ctx).await;
}
Ok(result)
}
async fn start_turn(
&self,
id: &meerkat_core::types::SessionId,
req: meerkat_core::service::StartTurnRequest,
) -> Result<meerkat_core::types::RunResult, SessionError> {
self.inner.start_turn(id, req).await
}
async fn interrupt(
&self,
id: &meerkat_core::types::SessionId,
) -> Result<(), SessionError> {
self.inner.interrupt(id).await
}
async fn cancel_after_boundary(
&self,
id: &meerkat_core::types::SessionId,
) -> Result<(), SessionError> {
self.inner.cancel_after_boundary(id).await
}
async fn set_session_client(
&self,
id: &meerkat_core::types::SessionId,
client: Arc<dyn meerkat_core::AgentLlmClient>,
) -> Result<(), SessionError> {
self.inner
.set_session_client(id, ReplaySanitizingAgentLlmClient::wrap(client))
.await
}
async fn hot_swap_session_llm_identity(
&self,
id: &meerkat_core::types::SessionId,
client: Arc<dyn meerkat_core::AgentLlmClient>,
identity: meerkat_core::session::SessionLlmIdentity,
request_policy: meerkat_core::SessionLlmRequestPolicy,
) -> Result<(), SessionError> {
self.inner
.hot_swap_session_llm_identity(
id,
ReplaySanitizingAgentLlmClient::wrap(client),
identity,
request_policy,
)
.await
}
async fn update_session_keep_alive(
&self,
id: &meerkat_core::types::SessionId,
keep_alive: bool,
) -> Result<(), SessionError> {
self.inner.update_session_keep_alive(id, keep_alive).await
}
async fn update_session_mob_authority_context(
&self,
id: &meerkat_core::types::SessionId,
authority_context: Option<meerkat_core::service::MobToolAuthorityContext>,
) -> Result<(), SessionError> {
self.inner
.update_session_mob_authority_context(id, authority_context)
.await
}
async fn has_live_session(
&self,
id: &meerkat_core::types::SessionId,
) -> Result<bool, SessionError> {
self.inner.has_live_session(id).await
}
async fn set_session_tool_visibility_state(
&self,
id: &meerkat_core::types::SessionId,
state: Option<meerkat_core::SessionToolVisibilityState>,
) -> Result<(), SessionError> {
self.inner
.set_session_tool_visibility_state(id, state)
.await
}
async fn set_session_tool_filter(
&self,
id: &meerkat_core::types::SessionId,
filter: meerkat_core::ToolFilter,
) -> Result<(), SessionError> {
self.inner.set_session_tool_filter(id, filter).await
}
async fn read(
&self,
id: &meerkat_core::types::SessionId,
) -> Result<meerkat_core::service::SessionView, SessionError> {
self.inner.read(id).await
}
async fn list(
&self,
query: meerkat_core::service::SessionQuery,
) -> Result<Vec<meerkat_core::service::SessionSummary>, SessionError> {
self.inner.list(query).await
}
async fn archive(
&self,
id: &meerkat_core::types::SessionId,
) -> Result<(), SessionError> {
self.inner.archive(id).await
}
async fn subscribe_session_events(
&self,
id: &meerkat_core::types::SessionId,
) -> Result<meerkat_core::comms::EventStream, meerkat_core::comms::StreamError> {
meerkat_core::service::SessionService::subscribe_session_events(
self.inner.as_ref(),
id,
)
.await
}
}
#[async_trait]
impl meerkat_core::service::SessionServiceCommsExt for $wrapper {
async fn comms_runtime(
&self,
id: &meerkat_core::types::SessionId,
) -> Option<Arc<dyn meerkat_core::agent::CommsRuntime>> {
self.inner.comms_runtime(id).await
}
async fn event_injector(
&self,
id: &meerkat_core::types::SessionId,
) -> Option<Arc<dyn meerkat_core::EventInjector>> {
self.inner.event_injector(id).await
}
async fn interaction_event_injector(
&self,
id: &meerkat_core::types::SessionId,
) -> Option<Arc<dyn meerkat_core::event_injector::SubscribableInjector>> {
self.inner.interaction_event_injector(id).await
}
}
#[async_trait]
impl meerkat_core::service::SessionServiceControlExt for $wrapper {
async fn append_system_context(
&self,
id: &meerkat_core::types::SessionId,
req: meerkat_core::service::AppendSystemContextRequest,
) -> Result<
meerkat_core::service::AppendSystemContextResult,
meerkat_core::service::SessionControlError,
> {
self.inner.append_system_context(id, req).await
}
async fn stage_tool_results(
&self,
id: &meerkat_core::types::SessionId,
req: meerkat_core::service::StageToolResultsRequest,
) -> Result<meerkat_core::service::StageToolResultsResult, SessionError> {
self.inner.stage_tool_results(id, req).await
}
}
#[async_trait]
impl meerkat_core::service::SessionServiceHistoryExt for $wrapper {
async fn read_history(
&self,
id: &meerkat_core::types::SessionId,
query: meerkat_core::service::SessionHistoryQuery,
) -> Result<meerkat_core::service::SessionHistoryPage, SessionError> {
self.inner.read_history(id, query).await
}
}
#[async_trait]
impl MobSessionService for $wrapper {
fn supports_persistent_sessions(&self) -> bool {
self.inner.supports_persistent_sessions()
}
fn runtime_adapter(&self) -> Option<Arc<meerkat_runtime::MeerkatMachine>> {
self.runtime_adapter_override
.clone()
.or_else(|| self.inner.runtime_adapter())
}
async fn interrupt_with_machine_authority(
&self,
session_id: &meerkat_core::types::SessionId,
authority: meerkat_runtime::MachineSessionControlAuthority,
) -> Result<(), SessionError> {
self.inner
.interrupt_with_machine_authority(session_id, authority)
.await
}
async fn cancel_after_boundary_with_machine_authority(
&self,
session_id: &meerkat_core::types::SessionId,
authority: meerkat_runtime::MachineSessionControlAuthority,
) -> Result<(), SessionError> {
self.inner
.cancel_after_boundary_with_machine_authority(session_id, authority)
.await
}
async fn session_belongs_to_mob(
&self,
session_id: &meerkat_core::types::SessionId,
mob_id: &meerkat_mob::MobId,
) -> bool {
self.inner.session_belongs_to_mob(session_id, mob_id).await
}
async fn load_persisted_session(
&self,
session_id: &meerkat_core::types::SessionId,
) -> Result<Option<meerkat_core::session::Session>, SessionError> {
self.inner.load_persisted_session(session_id).await
}
async fn subscribe_session_events(
&self,
session_id: &meerkat_core::types::SessionId,
) -> Result<meerkat_core::comms::EventStream, meerkat_core::comms::StreamError> {
meerkat_mob::MobSessionService::subscribe_session_events(
self.inner.as_ref(),
session_id,
)
.await
}
async fn archive_with_mob_lifecycle_authority(
&self,
session_id: &meerkat_core::types::SessionId,
) -> Result<(), SessionError> {
self.inner
.archive_with_mob_lifecycle_authority(session_id)
.await
}
async fn execution_snapshot(
&self,
session_id: &meerkat_core::types::SessionId,
) -> Result<Option<meerkat_core::agent::AgentExecutionSnapshot>, SessionError> {
self.inner.execution_snapshot(session_id).await
}
async fn tool_scope_snapshot(
&self,
session_id: &meerkat_core::types::SessionId,
) -> Result<Option<meerkat_core::ToolScopeSnapshot>, SessionError> {
self.inner.tool_scope_snapshot(session_id).await
}
async fn external_tool_surface_snapshot(
&self,
session_id: &meerkat_core::types::SessionId,
) -> Result<Option<meerkat_core::ExternalToolSurfaceSnapshot>, SessionError> {
self.inner.external_tool_surface_snapshot(session_id).await
}
async fn peer_ingress_runtime_snapshot(
&self,
session_id: &meerkat_core::types::SessionId,
) -> Result<Option<meerkat_core::PeerIngressRuntimeSnapshot>, SessionError> {
self.inner.peer_ingress_runtime_snapshot(session_id).await
}
async fn apply_runtime_turn(
&self,
session_id: &meerkat_core::types::SessionId,
run_id: meerkat_core::lifecycle::RunId,
req: meerkat_core::service::StartTurnRequest,
boundary: meerkat_core::lifecycle::run_primitive::RunApplyBoundary,
contributing_input_ids: Vec<meerkat_core::lifecycle::InputId>,
) -> Result<meerkat_core::lifecycle::core_executor::CoreApplyOutput, SessionError> {
#[cfg(test)]
let boundary_name = format!("{boundary:?}");
#[cfg(test)]
let contributing_count = contributing_input_ids.len();
let run_id_for_log = run_id.to_string();
let prompt_summary = if runtime_turn_diagnostics_enabled() {
Some(summarize_runtime_prompt(&req.prompt))
} else {
None
};
if let Some(summary) = prompt_summary.as_ref() {
tracing::warn!(
session_id = %session_id,
run_id = %run_id_for_log,
boundary = ?boundary,
contributing_inputs = contributing_input_ids.len(),
prompt = %summary,
runtime = ?req.runtime,
"mobkit runtime turn start"
);
}
let result = self
.inner
.apply_runtime_turn(
session_id,
run_id,
normalize_runtime_turn_request(req),
boundary,
contributing_input_ids,
)
.await;
#[cfg(test)]
record_runtime_turn_trace(RuntimeTurnTrace {
session_id: session_id.to_string(),
boundary: boundary_name,
contributing_input_count: contributing_count,
outcome: match &result {
Ok(_) => "ok".to_string(),
Err(error) => format!("err:{error}"),
},
});
if runtime_turn_diagnostics_enabled() {
match &result {
Ok(_) => tracing::warn!(
session_id = %session_id,
run_id = %run_id_for_log,
"mobkit runtime turn ok"
),
Err(error) => tracing::error!(
session_id = %session_id,
run_id = %run_id_for_log,
error = %error,
error_debug = ?error,
"mobkit runtime turn error"
),
}
}
result
}
async fn apply_runtime_context_appends(
&self,
session_id: &meerkat_core::types::SessionId,
run_id: meerkat_core::lifecycle::RunId,
appends: Vec<meerkat_core::session::PendingSystemContextAppend>,
contributing_input_ids: Vec<meerkat_core::lifecycle::InputId>,
) -> Result<meerkat_core::lifecycle::core_executor::CoreApplyOutput, SessionError> {
self.inner
.apply_runtime_context_appends(
session_id,
run_id,
appends,
contributing_input_ids,
)
.await
}
async fn apply_runtime_context_appends_with_boundary(
&self,
session_id: &meerkat_core::types::SessionId,
run_id: meerkat_core::lifecycle::RunId,
appends: Vec<meerkat_core::session::PendingSystemContextAppend>,
boundary: meerkat_core::lifecycle::run_primitive::RunApplyBoundary,
contributing_input_ids: Vec<meerkat_core::lifecycle::InputId>,
) -> Result<meerkat_core::lifecycle::core_executor::CoreApplyOutput, SessionError> {
self.inner
.apply_runtime_context_appends_with_boundary(
session_id,
run_id,
appends,
boundary,
contributing_input_ids,
)
.await
}
async fn apply_runtime_system_context_for_turn(
&self,
session_id: &meerkat_core::types::SessionId,
appends: Vec<meerkat_core::session::PendingSystemContextAppend>,
) -> Result<(), SessionError> {
self.inner
.apply_runtime_system_context_for_turn(session_id, appends)
.await
}
async fn stage_runtime_system_context_for_active_turn(
&self,
session_id: &meerkat_core::types::SessionId,
expected_run_id: &meerkat_core::lifecycle::RunId,
appends: Vec<meerkat_core::session::PendingSystemContextAppend>,
) -> Result<Option<Vec<u8>>, SessionError> {
self.inner
.stage_runtime_system_context_for_active_turn(
session_id,
expected_run_id,
appends,
)
.await
}
async fn discard_runtime_system_context_for_active_turn(
&self,
session_id: &meerkat_core::types::SessionId,
expected_run_id: &meerkat_core::lifecycle::RunId,
idempotency_keys: Vec<String>,
) -> Result<(), SessionError> {
self.inner
.discard_runtime_system_context_for_active_turn(
session_id,
expected_run_id,
idempotency_keys,
)
.await
}
async fn active_turn_system_context_boundary_available(
&self,
session_id: &meerkat_core::types::SessionId,
) -> Result<Option<bool>, SessionError> {
self.inner
.active_turn_system_context_boundary_available(session_id)
.await
}
async fn discard_live_session(
&self,
session_id: &meerkat_core::types::SessionId,
) -> Result<(), SessionError> {
self.inner.discard_live_session(session_id).await
}
async fn checkpoint_committed_runtime_session_snapshot(
&self,
session_id: &meerkat_core::types::SessionId,
session_snapshot: &[u8],
) -> Result<(), SessionError> {
self.inner
.checkpoint_committed_runtime_session_snapshot(session_id, session_snapshot)
.await
}
async fn cancel_all_checkpointers(&self) {
self.inner.cancel_all_checkpointers().await;
}
async fn rearm_all_checkpointers(&self) {
self.inner.rearm_all_checkpointers().await;
}
}
};
}
delegate_mob_session_service!(PreBuildMobSessionService);
struct AfterCreateMobSessionService {
inner: Arc<dyn MobSessionService>,
after_hook: AfterCreateHook,
}
#[async_trait]
impl meerkat_core::service::SessionService for AfterCreateMobSessionService {
async fn create_session(
&self,
mut req: CreateSessionRequest,
) -> Result<meerkat_core::types::RunResult, SessionError> {
sanitize_create_session_request_llm_override(&mut req);
let ctx = SessionCreatedContext {
model: req.model.clone(),
labels: req.labels.clone().unwrap_or_default(),
system_prompt: req.system_prompt.clone(),
};
let result = self.inner.create_session(req).await?;
(self.after_hook)(result.session_id.clone(), ctx).await;
Ok(result)
}
async fn start_turn(
&self,
id: &meerkat_core::types::SessionId,
req: meerkat_core::service::StartTurnRequest,
) -> Result<meerkat_core::types::RunResult, SessionError> {
self.inner.start_turn(id, req).await
}
async fn interrupt(&self, id: &meerkat_core::types::SessionId) -> Result<(), SessionError> {
self.inner.interrupt(id).await
}
async fn cancel_after_boundary(
&self,
id: &meerkat_core::types::SessionId,
) -> Result<(), SessionError> {
self.inner.cancel_after_boundary(id).await
}
async fn set_session_client(
&self,
id: &meerkat_core::types::SessionId,
client: Arc<dyn meerkat_core::AgentLlmClient>,
) -> Result<(), SessionError> {
self.inner
.set_session_client(id, ReplaySanitizingAgentLlmClient::wrap(client))
.await
}
async fn hot_swap_session_llm_identity(
&self,
id: &meerkat_core::types::SessionId,
client: Arc<dyn meerkat_core::AgentLlmClient>,
identity: meerkat_core::session::SessionLlmIdentity,
request_policy: meerkat_core::SessionLlmRequestPolicy,
) -> Result<(), SessionError> {
self.inner
.hot_swap_session_llm_identity(
id,
ReplaySanitizingAgentLlmClient::wrap(client),
identity,
request_policy,
)
.await
}
async fn update_session_keep_alive(
&self,
id: &meerkat_core::types::SessionId,
keep_alive: bool,
) -> Result<(), SessionError> {
self.inner.update_session_keep_alive(id, keep_alive).await
}
async fn update_session_mob_authority_context(
&self,
id: &meerkat_core::types::SessionId,
authority_context: Option<meerkat_core::service::MobToolAuthorityContext>,
) -> Result<(), SessionError> {
self.inner
.update_session_mob_authority_context(id, authority_context)
.await
}
async fn has_live_session(
&self,
id: &meerkat_core::types::SessionId,
) -> Result<bool, SessionError> {
self.inner.has_live_session(id).await
}
async fn set_session_tool_visibility_state(
&self,
id: &meerkat_core::types::SessionId,
state: Option<meerkat_core::SessionToolVisibilityState>,
) -> Result<(), SessionError> {
self.inner
.set_session_tool_visibility_state(id, state)
.await
}
async fn set_session_tool_filter(
&self,
id: &meerkat_core::types::SessionId,
filter: meerkat_core::ToolFilter,
) -> Result<(), SessionError> {
self.inner.set_session_tool_filter(id, filter).await
}
async fn read(
&self,
id: &meerkat_core::types::SessionId,
) -> Result<meerkat_core::service::SessionView, SessionError> {
self.inner.read(id).await
}
async fn list(
&self,
query: meerkat_core::service::SessionQuery,
) -> Result<Vec<meerkat_core::service::SessionSummary>, SessionError> {
self.inner.list(query).await
}
async fn archive(&self, id: &meerkat_core::types::SessionId) -> Result<(), SessionError> {
self.inner.archive(id).await
}
async fn subscribe_session_events(
&self,
id: &meerkat_core::types::SessionId,
) -> Result<meerkat_core::comms::EventStream, meerkat_core::comms::StreamError> {
meerkat_core::service::SessionService::subscribe_session_events(self.inner.as_ref(), id)
.await
}
}
#[async_trait]
impl meerkat_core::service::SessionServiceCommsExt for AfterCreateMobSessionService {
async fn comms_runtime(
&self,
id: &meerkat_core::types::SessionId,
) -> Option<Arc<dyn meerkat_core::agent::CommsRuntime>> {
self.inner.comms_runtime(id).await
}
async fn event_injector(
&self,
id: &meerkat_core::types::SessionId,
) -> Option<Arc<dyn meerkat_core::EventInjector>> {
self.inner.event_injector(id).await
}
async fn interaction_event_injector(
&self,
id: &meerkat_core::types::SessionId,
) -> Option<Arc<dyn meerkat_core::event_injector::SubscribableInjector>> {
self.inner.interaction_event_injector(id).await
}
}
#[async_trait]
impl meerkat_core::service::SessionServiceControlExt for AfterCreateMobSessionService {
async fn append_system_context(
&self,
id: &meerkat_core::types::SessionId,
req: meerkat_core::service::AppendSystemContextRequest,
) -> Result<
meerkat_core::service::AppendSystemContextResult,
meerkat_core::service::SessionControlError,
> {
self.inner.append_system_context(id, req).await
}
async fn stage_tool_results(
&self,
id: &meerkat_core::types::SessionId,
req: meerkat_core::service::StageToolResultsRequest,
) -> Result<meerkat_core::service::StageToolResultsResult, SessionError> {
self.inner.stage_tool_results(id, req).await
}
}
#[async_trait]
impl meerkat_core::service::SessionServiceHistoryExt for AfterCreateMobSessionService {
async fn read_history(
&self,
id: &meerkat_core::types::SessionId,
query: meerkat_core::service::SessionHistoryQuery,
) -> Result<meerkat_core::service::SessionHistoryPage, SessionError> {
self.inner.read_history(id, query).await
}
}
#[async_trait]
impl MobSessionService for AfterCreateMobSessionService {
fn supports_persistent_sessions(&self) -> bool {
self.inner.supports_persistent_sessions()
}
fn runtime_adapter(&self) -> Option<Arc<meerkat_runtime::MeerkatMachine>> {
self.inner.runtime_adapter()
}
async fn interrupt_with_machine_authority(
&self,
session_id: &meerkat_core::types::SessionId,
authority: meerkat_runtime::MachineSessionControlAuthority,
) -> Result<(), SessionError> {
self.inner
.interrupt_with_machine_authority(session_id, authority)
.await
}
async fn cancel_after_boundary_with_machine_authority(
&self,
session_id: &meerkat_core::types::SessionId,
authority: meerkat_runtime::MachineSessionControlAuthority,
) -> Result<(), SessionError> {
self.inner
.cancel_after_boundary_with_machine_authority(session_id, authority)
.await
}
async fn session_belongs_to_mob(
&self,
session_id: &meerkat_core::types::SessionId,
mob_id: &meerkat_mob::MobId,
) -> bool {
self.inner.session_belongs_to_mob(session_id, mob_id).await
}
async fn load_persisted_session(
&self,
session_id: &meerkat_core::types::SessionId,
) -> Result<Option<meerkat_core::session::Session>, SessionError> {
self.inner.load_persisted_session(session_id).await
}
async fn subscribe_session_events(
&self,
session_id: &meerkat_core::types::SessionId,
) -> Result<meerkat_core::comms::EventStream, meerkat_core::comms::StreamError> {
meerkat_mob::MobSessionService::subscribe_session_events(self.inner.as_ref(), session_id)
.await
}
async fn archive_with_mob_lifecycle_authority(
&self,
session_id: &meerkat_core::types::SessionId,
) -> Result<(), SessionError> {
self.inner
.archive_with_mob_lifecycle_authority(session_id)
.await
}
async fn execution_snapshot(
&self,
session_id: &meerkat_core::types::SessionId,
) -> Result<Option<meerkat_core::agent::AgentExecutionSnapshot>, SessionError> {
self.inner.execution_snapshot(session_id).await
}
async fn tool_scope_snapshot(
&self,
session_id: &meerkat_core::types::SessionId,
) -> Result<Option<meerkat_core::ToolScopeSnapshot>, SessionError> {
self.inner.tool_scope_snapshot(session_id).await
}
async fn external_tool_surface_snapshot(
&self,
session_id: &meerkat_core::types::SessionId,
) -> Result<Option<meerkat_core::ExternalToolSurfaceSnapshot>, SessionError> {
self.inner.external_tool_surface_snapshot(session_id).await
}
async fn peer_ingress_runtime_snapshot(
&self,
session_id: &meerkat_core::types::SessionId,
) -> Result<Option<meerkat_core::PeerIngressRuntimeSnapshot>, SessionError> {
self.inner.peer_ingress_runtime_snapshot(session_id).await
}
async fn apply_runtime_turn(
&self,
session_id: &meerkat_core::types::SessionId,
run_id: meerkat_core::lifecycle::RunId,
req: meerkat_core::service::StartTurnRequest,
boundary: meerkat_core::lifecycle::run_primitive::RunApplyBoundary,
contributing_input_ids: Vec<meerkat_core::lifecycle::InputId>,
) -> Result<meerkat_core::lifecycle::core_executor::CoreApplyOutput, SessionError> {
self.inner
.apply_runtime_turn(session_id, run_id, req, boundary, contributing_input_ids)
.await
}
async fn apply_runtime_context_appends(
&self,
session_id: &meerkat_core::types::SessionId,
run_id: meerkat_core::lifecycle::RunId,
appends: Vec<meerkat_core::session::PendingSystemContextAppend>,
contributing_input_ids: Vec<meerkat_core::lifecycle::InputId>,
) -> Result<meerkat_core::lifecycle::core_executor::CoreApplyOutput, SessionError> {
self.inner
.apply_runtime_context_appends(session_id, run_id, appends, contributing_input_ids)
.await
}
async fn apply_runtime_context_appends_with_boundary(
&self,
session_id: &meerkat_core::types::SessionId,
run_id: meerkat_core::lifecycle::RunId,
appends: Vec<meerkat_core::session::PendingSystemContextAppend>,
boundary: meerkat_core::lifecycle::run_primitive::RunApplyBoundary,
contributing_input_ids: Vec<meerkat_core::lifecycle::InputId>,
) -> Result<meerkat_core::lifecycle::core_executor::CoreApplyOutput, SessionError> {
self.inner
.apply_runtime_context_appends_with_boundary(
session_id,
run_id,
appends,
boundary,
contributing_input_ids,
)
.await
}
async fn apply_runtime_system_context_for_turn(
&self,
session_id: &meerkat_core::types::SessionId,
appends: Vec<meerkat_core::session::PendingSystemContextAppend>,
) -> Result<(), SessionError> {
self.inner
.apply_runtime_system_context_for_turn(session_id, appends)
.await
}
async fn stage_runtime_system_context_for_active_turn(
&self,
session_id: &meerkat_core::types::SessionId,
expected_run_id: &meerkat_core::lifecycle::RunId,
appends: Vec<meerkat_core::session::PendingSystemContextAppend>,
) -> Result<Option<Vec<u8>>, SessionError> {
self.inner
.stage_runtime_system_context_for_active_turn(session_id, expected_run_id, appends)
.await
}
async fn discard_runtime_system_context_for_active_turn(
&self,
session_id: &meerkat_core::types::SessionId,
expected_run_id: &meerkat_core::lifecycle::RunId,
idempotency_keys: Vec<String>,
) -> Result<(), SessionError> {
self.inner
.discard_runtime_system_context_for_active_turn(
session_id,
expected_run_id,
idempotency_keys,
)
.await
}
async fn active_turn_system_context_boundary_available(
&self,
session_id: &meerkat_core::types::SessionId,
) -> Result<Option<bool>, SessionError> {
self.inner
.active_turn_system_context_boundary_available(session_id)
.await
}
async fn discard_live_session(
&self,
session_id: &meerkat_core::types::SessionId,
) -> Result<(), SessionError> {
self.inner.discard_live_session(session_id).await
}
async fn checkpoint_committed_runtime_session_snapshot(
&self,
session_id: &meerkat_core::types::SessionId,
session_snapshot: &[u8],
) -> Result<(), SessionError> {
self.inner
.checkpoint_committed_runtime_session_snapshot(session_id, session_snapshot)
.await
}
async fn cancel_all_checkpointers(&self) {
self.inner.cancel_all_checkpointers().await;
}
async fn rearm_all_checkpointers(&self) {
self.inner.rearm_all_checkpointers().await;
}
}
pub struct MobBootstrapSpec {
pub definition: MobDefinition,
pub storage: MobStorage,
pub session_service: Arc<dyn MobSessionService>,
pub binary_blob_store: Option<Arc<dyn BinaryBlobStore>>,
pub(crate) agent_mob_mcp_state: Option<Arc<meerkat_mob_mcp::MobMcpState>>,
pub(crate) implicit_delegate_retirement_overrides: Option<ImplicitDelegateRetirementOverrides>,
pub(crate) agent_mob_default_llm_client_slot: Option<SharedDefaultLlmClientSlot>,
pub options: MobBootstrapOptions,
pub runtime_adapter: Option<Arc<meerkat_runtime::MeerkatMachine>>,
pub(crate) _ephemeral_dir: Option<Arc<tempfile::TempDir>>,
}
impl MobBootstrapSpec {
pub fn new(
definition: MobDefinition,
storage: MobStorage,
session_service: Arc<dyn MobSessionService>,
) -> Self {
let session_service = Arc::new(PreBuildMobSessionService {
inner: session_service,
hook: no_op_pre_build_hook(),
after_create_hook: None,
runtime_adapter_override: None,
}) as Arc<dyn MobSessionService>;
Self {
definition,
storage,
session_service,
binary_blob_store: None,
agent_mob_mcp_state: None,
implicit_delegate_retirement_overrides: None,
agent_mob_default_llm_client_slot: None,
options: MobBootstrapOptions {
allow_ephemeral_sessions: true,
notify_orchestrator_on_resume: true,
default_llm_client: None,
},
runtime_adapter: None,
_ephemeral_dir: None,
}
}
pub fn with_options(mut self, options: MobBootstrapOptions) -> Self {
self.options = options;
self
}
pub fn with_session_runtime_adapter(
mut self,
adapter: Arc<meerkat_runtime::MeerkatMachine>,
) -> Self {
self.session_service = Arc::new(PreBuildMobSessionService {
inner: self.session_service,
hook: no_op_pre_build_hook(),
after_create_hook: None,
runtime_adapter_override: Some(adapter),
});
self
}
pub fn with_after_create_hook(mut self, hook: AfterCreateHook) -> Self {
self.session_service = Arc::new(AfterCreateMobSessionService {
inner: self.session_service,
after_hook: hook,
});
self
}
pub fn ephemeral(
definition: MobDefinition,
storage: MobStorage,
store_path: PathBuf,
max_sessions: usize,
session_store: Option<Arc<dyn AgentSessionStore>>,
) -> Self {
Self::ephemeral_inner(
definition,
storage,
store_path,
max_sessions,
session_store,
None,
CapabilityFlags::default(),
None,
None,
)
}
pub fn ephemeral_with_hook(
definition: MobDefinition,
storage: MobStorage,
store_path: PathBuf,
max_sessions: usize,
session_store: Option<Arc<dyn AgentSessionStore>>,
hook: impl Fn(
&mut CreateSessionRequest,
) -> std::pin::Pin<
Box<dyn std::future::Future<Output = Result<(), SessionError>> + Send + '_>,
> + Send
+ Sync
+ 'static,
) -> Self {
Self::ephemeral_inner(
definition,
storage,
store_path,
max_sessions,
session_store,
Some(Arc::new(hook)),
CapabilityFlags::default(),
None,
None,
)
}
#[allow(clippy::too_many_arguments)]
pub(crate) fn ephemeral_inner(
definition: MobDefinition,
storage: MobStorage,
store_path: PathBuf,
max_sessions: usize,
session_store: Option<Arc<dyn AgentSessionStore>>,
hook: Option<PreBuildHook>,
mut caps: CapabilityFlags,
after_create_hook: Option<AfterCreateHook>,
agent_config: Option<Config>,
) -> Self {
caps.image_generation |= mob_definition_may_use_image_generation(&definition);
let binary_blob_store: Arc<dyn BinaryBlobStore> = Arc::new(ObjectStoreBlobStore::memory());
let blob_store: Arc<dyn meerkat_core::BlobStore> =
Arc::new(Base64BlobStoreAdapter::new(binary_blob_store.clone()));
let runtime_adapter = if caps.image_generation {
let runtime_store: Arc<dyn meerkat_runtime::RuntimeStore> =
Arc::new(meerkat_runtime::InMemoryRuntimeStore::new());
Some(Arc::new(meerkat_runtime::MeerkatMachine::persistent(
runtime_store,
Arc::clone(&blob_store),
)))
} else {
None
};
let mut factory = AgentFactory::new(&store_path)
.builtins(caps.builtins)
.shell(caps.shell)
.mob(caps.mob)
.comms(caps.comms)
.memory(caps.memory);
if let Some(machine) = runtime_adapter.clone() {
factory = factory.with_image_generation_machine(machine);
}
let config = agent_config.unwrap_or_default();
let mut builder = FactoryAgentBuilder::new(factory, config);
builder.default_blob_store = Some(blob_store);
if let Some(store) = session_store {
builder.default_session_store = Some(store);
}
let mob_tools_slot = Arc::clone(&builder.default_mob_tools);
let session_service: Arc<dyn MobSessionService> = Arc::new(
meerkat_session::EphemeralSessionService::new(builder, max_sessions),
);
let hook = hook.unwrap_or_else(no_op_pre_build_hook);
let after_create_hook = if let Some(runtime_adapter) = runtime_adapter.clone() {
let user_after_create_hook = after_create_hook.clone();
Some(Arc::new(
move |session_id: meerkat_core::types::SessionId, ctx: SessionCreatedContext| {
let runtime_adapter = runtime_adapter.clone();
let user_after_create_hook = user_after_create_hook.clone();
Box::pin(async move {
runtime_adapter.register_session(session_id.clone()).await;
if let Some(user_after_create_hook) = user_after_create_hook {
user_after_create_hook(session_id, ctx).await;
}
})
as std::pin::Pin<Box<dyn std::future::Future<Output = ()> + Send>>
},
) as AfterCreateHook)
} else {
after_create_hook
};
let session_service = Arc::new(PreBuildMobSessionService {
inner: session_service,
hook,
after_create_hook,
runtime_adapter_override: runtime_adapter.clone(),
}) as Arc<dyn MobSessionService>;
let (
agent_mob_mcp_state,
implicit_delegate_retirement_overrides,
agent_mob_default_llm_client_slot,
) = install_agent_mob_tools(&definition, mob_tools_slot, Arc::clone(&session_service));
let mut spec = Self::new(definition, storage, session_service);
spec.agent_mob_mcp_state = Some(agent_mob_mcp_state);
spec.implicit_delegate_retirement_overrides = Some(implicit_delegate_retirement_overrides);
spec.agent_mob_default_llm_client_slot = Some(agent_mob_default_llm_client_slot);
spec.runtime_adapter = runtime_adapter;
spec.binary_blob_store = Some(binary_blob_store);
spec
}
pub fn persistent(
definition: MobDefinition,
storage: MobStorage,
store_path: PathBuf,
max_sessions: usize,
session_store: Arc<dyn SessionStore>,
) -> Self {
Self::persistent_inner(
definition,
storage,
store_path,
max_sessions,
session_store,
None,
None,
CapabilityFlags::default(),
None,
None,
)
}
pub fn persistent_with_hook(
definition: MobDefinition,
storage: MobStorage,
store_path: PathBuf,
max_sessions: usize,
session_store: Arc<dyn SessionStore>,
hook: impl Fn(
&mut CreateSessionRequest,
) -> std::pin::Pin<
Box<dyn std::future::Future<Output = Result<(), SessionError>> + Send + '_>,
> + Send
+ Sync
+ 'static,
) -> Self {
Self::persistent_inner(
definition,
storage,
store_path,
max_sessions,
session_store,
None,
Some(Arc::new(hook)),
CapabilityFlags::default(),
None,
None,
)
}
#[allow(clippy::too_many_arguments)]
pub(crate) fn persistent_inner(
definition: MobDefinition,
storage: MobStorage,
store_path: PathBuf,
max_sessions: usize,
session_store: Arc<dyn SessionStore>,
custom_blob_store: Option<Arc<dyn meerkat_core::BlobStore>>,
hook: Option<PreBuildHook>,
mut caps: CapabilityFlags,
after_create_hook: Option<AfterCreateHook>,
agent_config: Option<Config>,
) -> Self {
caps.image_generation |= mob_definition_may_use_image_generation(&definition);
let (binary_blob_store, blob_store): (
Arc<dyn BinaryBlobStore>,
Arc<dyn meerkat_core::BlobStore>,
) = if let Some(blob_store) = custom_blob_store {
(
Arc::new(BinaryBlobStoreAdapter::new(blob_store.clone())),
blob_store,
)
} else {
let binary_blob_store: Arc<dyn BinaryBlobStore> = match ObjectStoreBlobStore::local(
store_path.join("blobs"),
) {
Ok(store) => Arc::new(store),
Err(err) => {
tracing::warn!(
error = %err,
"failed to initialize persistent binary blob store; falling back to in-memory blobs"
);
Arc::new(ObjectStoreBlobStore::memory())
}
};
let blob_store: Arc<dyn meerkat_core::BlobStore> =
Arc::new(Base64BlobStoreAdapter::new(binary_blob_store.clone()));
(binary_blob_store, blob_store)
};
let runtime_store: Arc<dyn meerkat_runtime::RuntimeStore> =
build_persistent_runtime_store(&store_path);
let runtime_adapter = Arc::new(meerkat_runtime::MeerkatMachine::persistent(
Arc::clone(&runtime_store),
Arc::clone(&blob_store),
));
let mut factory = AgentFactory::new(&store_path)
.builtins(caps.builtins)
.shell(caps.shell)
.mob(caps.mob)
.comms(caps.comms)
.memory(caps.memory);
if caps.image_generation {
factory = factory.with_image_generation_machine(runtime_adapter.clone());
}
let config = agent_config.unwrap_or_default();
let mut builder = FactoryAgentBuilder::new(factory, config);
builder.default_session_store = Some(Arc::new(StoreAdapter::new(session_store.clone())));
builder.default_blob_store = Some(blob_store.clone());
let mob_tools_slot = Arc::clone(&builder.default_mob_tools);
let session_service: Arc<dyn MobSessionService> =
Arc::new(meerkat_session::PersistentSessionService::new(
builder,
max_sessions,
session_store,
Some(runtime_store),
blob_store,
));
let hook = hook.unwrap_or_else(no_op_pre_build_hook);
let session_service = Arc::new(PreBuildMobSessionService {
inner: session_service,
hook,
after_create_hook,
runtime_adapter_override: None,
}) as Arc<dyn MobSessionService>;
let (
agent_mob_mcp_state,
implicit_delegate_retirement_overrides,
agent_mob_default_llm_client_slot,
) = install_agent_mob_tools(&definition, mob_tools_slot, Arc::clone(&session_service));
let mut spec = Self::new(definition, storage, session_service);
spec.agent_mob_mcp_state = Some(agent_mob_mcp_state);
spec.implicit_delegate_retirement_overrides = Some(implicit_delegate_retirement_overrides);
spec.agent_mob_default_llm_client_slot = Some(agent_mob_default_llm_client_slot);
spec.runtime_adapter = Some(runtime_adapter);
spec.binary_blob_store = Some(binary_blob_store);
spec
}
#[allow(clippy::too_many_arguments)]
pub(crate) fn ephemeral_runtime_backed_inner(
definition: MobDefinition,
storage: MobStorage,
store_path: PathBuf,
max_sessions: usize,
custom_session_store: Option<Arc<dyn SessionStore>>,
custom_blob_store: Option<Arc<dyn meerkat_core::BlobStore>>,
hook: Option<PreBuildHook>,
mut caps: CapabilityFlags,
after_create_hook: Option<AfterCreateHook>,
agent_config: Option<Config>,
) -> Self {
caps.image_generation |= mob_definition_may_use_image_generation(&definition);
let config = agent_config.unwrap_or_default();
let session_store: Arc<dyn SessionStore> = custom_session_store
.clone()
.unwrap_or_else(|| Arc::new(meerkat_store::MemoryStore::new()));
let (binary_blob_store, blob_store): (
Arc<dyn BinaryBlobStore>,
Arc<dyn meerkat_core::BlobStore>,
) = if let Some(blob_store) = custom_blob_store {
(
Arc::new(BinaryBlobStoreAdapter::new(blob_store.clone())),
blob_store,
)
} else {
let binary_blob_store: Arc<dyn BinaryBlobStore> =
Arc::new(ObjectStoreBlobStore::memory());
let blob_store: Arc<dyn meerkat_core::BlobStore> =
Arc::new(Base64BlobStoreAdapter::new(binary_blob_store.clone()));
(binary_blob_store, blob_store)
};
let base_runtime_store: Arc<dyn meerkat_runtime::RuntimeStore> =
Arc::new(meerkat_runtime::InMemoryRuntimeStore::new());
let runtime_store: Arc<dyn meerkat_runtime::RuntimeStore> =
if let Some(custom_session_store) = custom_session_store.clone() {
Arc::new(SessionStoreBackedRuntimeStore::new(
Arc::clone(&base_runtime_store),
custom_session_store,
))
} else {
Arc::clone(&base_runtime_store)
};
let runtime_adapter = Arc::new(meerkat_runtime::MeerkatMachine::persistent(
Arc::clone(&runtime_store),
Arc::clone(&blob_store),
));
let mut factory = AgentFactory::new(&store_path)
.builtins(caps.builtins)
.shell(caps.shell)
.mob(caps.mob)
.comms(caps.comms)
.memory(caps.memory);
if caps.image_generation {
factory = factory.with_image_generation_machine(runtime_adapter.clone());
}
let mut builder = FactoryAgentBuilder::new(factory, config);
builder.default_session_store = Some(Arc::new(StoreAdapter::new(session_store.clone())));
builder.default_blob_store = Some(blob_store.clone());
let mob_tools_slot = Arc::clone(&builder.default_mob_tools);
let session_service: Arc<dyn MobSessionService> =
if let Some(custom_session_store) = custom_session_store {
Arc::new(meerkat_session::PersistentSessionService::new(
builder,
max_sessions,
custom_session_store,
Some(runtime_store.clone()),
blob_store,
))
} else {
Arc::new(meerkat_session::EphemeralSessionService::new(
builder,
max_sessions,
))
};
let hook = hook.unwrap_or_else(no_op_pre_build_hook);
let runtime_adapter_for_after_create = runtime_adapter.clone();
let combined_after_create_hook: AfterCreateHook = Arc::new(move |session_id, ctx| {
let runtime_adapter = runtime_adapter_for_after_create.clone();
let after_create_hook = after_create_hook.clone();
Box::pin(async move {
runtime_adapter.register_session(session_id.clone()).await;
if let Some(after_create_hook) = after_create_hook {
after_create_hook(session_id, ctx).await;
}
})
});
let session_service = Arc::new(PreBuildMobSessionService {
inner: session_service,
hook,
after_create_hook: Some(combined_after_create_hook),
runtime_adapter_override: Some(runtime_adapter.clone()),
}) as Arc<dyn MobSessionService>;
let (
agent_mob_mcp_state,
implicit_delegate_retirement_overrides,
agent_mob_default_llm_client_slot,
) = install_agent_mob_tools(&definition, mob_tools_slot, Arc::clone(&session_service));
let mut spec = Self::new(definition, storage, session_service);
spec.agent_mob_mcp_state = Some(agent_mob_mcp_state);
spec.implicit_delegate_retirement_overrides = Some(implicit_delegate_retirement_overrides);
spec.agent_mob_default_llm_client_slot = Some(agent_mob_default_llm_client_slot);
spec.runtime_adapter = Some(runtime_adapter);
spec.binary_blob_store = Some(binary_blob_store);
spec
}
}
#[derive(Debug)]
pub enum MobRuntimeError {
Mob(MobError),
InvalidInput(&'static str),
InvalidConfig(String),
}
impl std::fmt::Display for MobRuntimeError {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
Self::Mob(err) => write!(f, "{err}"),
Self::InvalidInput(message) => write!(f, "{message}"),
Self::InvalidConfig(message) => write!(f, "{message}"),
}
}
}
impl std::error::Error for MobRuntimeError {}
impl From<MobError> for MobRuntimeError {
fn from(value: MobError) -> Self {
Self::Mob(value)
}
}
#[derive(Clone, Debug)]
pub struct SessionCreatedContext {
pub model: String,
pub labels: std::collections::BTreeMap<String, String>,
pub system_prompt: Option<String>,
}
#[async_trait]
pub trait SessionHook: Send + Sync {
async fn before_create(&self, _req: &mut CreateSessionRequest) -> Result<(), SessionError> {
Ok(())
}
async fn after_create(
&self,
_session_id: &meerkat_core::types::SessionId,
_ctx: &SessionCreatedContext,
) {
}
}
#[derive(Clone, Copy, Debug)]
pub struct CapabilityFlags {
pub builtins: bool,
pub shell: bool,
pub mob: bool,
pub comms: bool,
pub memory: bool,
pub image_generation: bool,
}
impl Default for CapabilityFlags {
fn default() -> Self {
Self {
builtins: true,
shell: true,
mob: true,
comms: true,
memory: true,
image_generation: false,
}
}
}
pub type RealMobRuntime = MobRuntime;
#[derive(Clone)]
pub struct MobRuntime {
handle: MobHandle,
session_service: Option<Arc<dyn MobSessionService>>,
agent_mob_mcp_state: Option<Arc<meerkat_mob_mcp::MobMcpState>>,
implicit_delegate_retirement_overrides: Option<ImplicitDelegateRetirementOverrides>,
binary_blob_store: Option<Arc<dyn BinaryBlobStore>>,
baseline_member_specs: Arc<tokio::sync::RwLock<Vec<SpawnMemberSpec>>>,
_ephemeral_dir: Option<Arc<tempfile::TempDir>>,
}
impl MobRuntime {
pub async fn bootstrap(spec: MobBootstrapSpec) -> Result<Self, MobRuntimeError> {
let ephemeral_dir = spec._ephemeral_dir.clone();
let session_service = spec.session_service.clone();
let binary_blob_store = spec.binary_blob_store.clone();
let mob_id = spec.definition.id.clone();
let agent_mob_mcp_state = spec.agent_mob_mcp_state.clone();
let implicit_delegate_retirement_overrides =
spec.implicit_delegate_retirement_overrides.clone();
let default_llm_client = spec
.options
.default_llm_client
.clone()
.map(ReplaySanitizingLlmClient::wrap);
if let Some(slot) = spec.agent_mob_default_llm_client_slot.as_ref() {
*slot
.write()
.unwrap_or_else(std::sync::PoisonError::into_inner) = default_llm_client.clone();
}
let effective_runtime_adapter = spec
.runtime_adapter
.clone()
.or_else(|| session_service.runtime_adapter());
let mut builder = MobBuilder::new(spec.definition, spec.storage);
if let Some(adapter) = effective_runtime_adapter {
builder = builder.with_runtime_adapter(adapter);
}
builder = builder
.with_session_service(session_service.clone())
.allow_ephemeral_sessions(spec.options.allow_ephemeral_sessions)
.notify_orchestrator_on_resume(spec.options.notify_orchestrator_on_resume);
if let Some(client) = default_llm_client {
builder = builder.with_default_llm_client(client);
}
let handle = builder.create().await?;
if let Some(state) = agent_mob_mcp_state.as_ref() {
state.mob_insert_handle(mob_id, handle.clone()).await;
}
Ok(Self {
handle,
session_service: Some(session_service),
agent_mob_mcp_state,
implicit_delegate_retirement_overrides,
binary_blob_store,
baseline_member_specs: Arc::new(tokio::sync::RwLock::new(Vec::new())),
_ephemeral_dir: ephemeral_dir,
})
}
pub fn from_handle(handle: MobHandle) -> Self {
Self {
handle,
session_service: None,
agent_mob_mcp_state: None,
implicit_delegate_retirement_overrides: None,
binary_blob_store: None,
baseline_member_specs: Arc::new(tokio::sync::RwLock::new(Vec::new())),
_ephemeral_dir: None,
}
}
pub fn handle(&self) -> MobHandle {
self.handle.clone()
}
pub fn agent_mob_mcp_state(&self) -> Option<Arc<meerkat_mob_mcp::MobMcpState>> {
self.agent_mob_mcp_state.clone()
}
pub(crate) fn implicit_delegate_retirement_overrides(
&self,
) -> Option<ImplicitDelegateRetirementOverrides> {
self.implicit_delegate_retirement_overrides.clone()
}
pub async fn set_baseline_member_specs(&self, specs: Vec<SpawnMemberSpec>) {
*self.baseline_member_specs.write().await = specs;
}
pub async fn baseline_member_specs(&self) -> Vec<SpawnMemberSpec> {
self.baseline_member_specs.read().await.clone()
}
pub async fn read_session_history(
&self,
session_id_str: &str,
offset: usize,
limit: Option<usize>,
) -> Result<SessionHistoryPage, MobRuntimeError> {
if session_id_str.trim().is_empty() {
return Err(MobRuntimeError::InvalidInput(
"session_id must not be empty",
));
}
let Some(session_service) = self.session_service.as_ref() else {
return Err(MobRuntimeError::InvalidInput(
"session history unavailable for this runtime",
));
};
let session_id = meerkat_core::types::SessionId::parse(session_id_str)
.map_err(|_| MobRuntimeError::InvalidInput("invalid session_id format"))?;
SessionServiceHistoryExt::read_history(
session_service.as_ref(),
&session_id,
SessionHistoryQuery { offset, limit },
)
.await
.map_err(|err| MobRuntimeError::Mob(MobError::Internal(err.to_string())))
}
#[allow(dead_code)]
pub(crate) async fn runtime_state_for_session(
&self,
session_id_str: &str,
) -> Result<Option<meerkat_runtime::RuntimeState>, MobRuntimeError> {
if session_id_str.trim().is_empty() {
return Err(MobRuntimeError::InvalidInput(
"session_id must not be empty",
));
}
let Some(session_service) = self.session_service.as_ref() else {
return Ok(None);
};
let Some(runtime_adapter) = session_service.runtime_adapter() else {
return Ok(None);
};
let session_id = meerkat_core::types::SessionId::parse(session_id_str)
.map_err(|_| MobRuntimeError::InvalidInput("invalid session_id format"))?;
let state = meerkat_runtime::service_ext::SessionServiceRuntimeExt::runtime_state(
runtime_adapter.as_ref(),
&session_id,
)
.await
.map_err(|err| MobRuntimeError::Mob(MobError::Internal(err.to_string())))?;
Ok(Some(state))
}
#[allow(dead_code)]
pub(crate) async fn comms_runtime_for_session(
&self,
session_id_str: &str,
) -> Result<Option<Arc<dyn CommsRuntime>>, MobRuntimeError> {
if session_id_str.trim().is_empty() {
return Err(MobRuntimeError::InvalidInput(
"session_id must not be empty",
));
}
let Some(session_service) = self.session_service.as_ref() else {
return Ok(None);
};
let session_id = meerkat_core::types::SessionId::parse(session_id_str)
.map_err(|_| MobRuntimeError::InvalidInput("invalid session_id format"))?;
Ok(
meerkat_core::service::SessionServiceCommsExt::comms_runtime(
session_service.as_ref(),
&session_id,
)
.await,
)
}
#[allow(dead_code)]
pub(crate) async fn active_input_ids_for_session(
&self,
session_id_str: &str,
) -> Result<Option<Vec<String>>, MobRuntimeError> {
if session_id_str.trim().is_empty() {
return Err(MobRuntimeError::InvalidInput(
"session_id must not be empty",
));
}
let Some(session_service) = self.session_service.as_ref() else {
return Ok(None);
};
let Some(runtime_adapter) = session_service.runtime_adapter() else {
return Ok(None);
};
let session_id = meerkat_core::types::SessionId::parse(session_id_str)
.map_err(|_| MobRuntimeError::InvalidInput("invalid session_id format"))?;
let input_ids = meerkat_runtime::service_ext::SessionServiceRuntimeExt::list_active_inputs(
runtime_adapter.as_ref(),
&session_id,
)
.await
.map_err(|err| MobRuntimeError::Mob(MobError::Internal(err.to_string())))?;
Ok(Some(
input_ids.into_iter().map(|id| id.to_string()).collect(),
))
}
#[allow(dead_code)]
pub(crate) async fn ensure_comms_drain_for_session(
&self,
session_id_str: &str,
) -> Result<Option<bool>, MobRuntimeError> {
if session_id_str.trim().is_empty() {
return Err(MobRuntimeError::InvalidInput(
"session_id must not be empty",
));
}
let Some(session_service) = self.session_service.as_ref() else {
return Ok(None);
};
let Some(runtime_adapter) = session_service.runtime_adapter() else {
return Ok(None);
};
let session_id = meerkat_core::types::SessionId::parse(session_id_str)
.map_err(|_| MobRuntimeError::InvalidInput("invalid session_id format"))?;
let comms_runtime = meerkat_core::service::SessionServiceCommsExt::comms_runtime(
session_service.as_ref(),
&session_id,
)
.await;
if let Some(comms) = comms_runtime {
let _handle = meerkat_runtime::comms_drain::spawn_comms_drain(
runtime_adapter.clone(),
session_id,
comms,
None,
);
Ok(Some(true))
} else {
Ok(Some(false))
}
}
pub fn session_service(&self) -> Option<&Arc<dyn MobSessionService>> {
self.session_service.as_ref()
}
pub fn binary_blob_store(&self) -> Option<Arc<dyn BinaryBlobStore>> {
self.binary_blob_store.clone()
}
}
pub fn member_entry_to_json(entry: &meerkat_mob::runtime::MobMemberListEntry) -> serde_json::Value {
serde_json::to_value(entry).unwrap_or(serde_json::Value::Null)
}
pub fn content_input_has_images(content: &meerkat_core::ContentInput) -> bool {
match content {
meerkat_core::ContentInput::Text(_) => false,
meerkat_core::ContentInput::Blocks(blocks) => blocks
.iter()
.any(|block| matches!(block, meerkat_core::ContentBlock::Image { .. })),
}
}
pub fn model_capabilities_for_model(
provider: Provider,
model: &str,
) -> crate::runtime::ConsoleModelCapabilities {
let image_input = meerkat_core::model_profile::profile_for(provider, model)
.map(|profile| profile.vision)
.unwrap_or(false);
crate::runtime::ConsoleModelCapabilities { image_input }
}
pub fn model_capabilities_for_profile(
profile: &Profile,
) -> crate::runtime::ConsoleModelCapabilities {
let image_input = Provider::infer_from_model(&profile.model)
.and_then(|provider| meerkat_core::model_profile::profile_for(provider, &profile.model))
.map(|profile| profile.vision)
.unwrap_or(false);
crate::runtime::ConsoleModelCapabilities { image_input }
}
pub fn model_capabilities_for_role(
definition: &MobDefinition,
role: &str,
) -> crate::runtime::ConsoleModelCapabilities {
let profile_name = ProfileName::from(role);
definition
.resolve_inline_profile(&profile_name)
.map(model_capabilities_for_profile)
.unwrap_or(crate::runtime::ConsoleModelCapabilities { image_input: false })
}
pub fn model_capabilities_for_member_entry(
definition: &MobDefinition,
entry: &meerkat_mob::runtime::MobMemberListEntry,
) -> crate::runtime::ConsoleModelCapabilities {
model_capabilities_for_role(definition, entry.role.as_str())
}
pub async fn model_capabilities_for_member(
handle: &MobHandle,
session_service: Option<&Arc<dyn MobSessionService>>,
member_id: &meerkat_mob::ids::MeerkatId,
) -> crate::runtime::ConsoleModelCapabilities {
if let Some(service) = session_service
&& let Some(session_id) = handle.resolve_bridge_session_id(member_id).await
&& let Ok(view) = service.read(&session_id).await
{
return model_capabilities_for_model(view.state.provider, &view.state.model);
}
handle
.get_member(member_id)
.await
.map(|member| model_capabilities_for_role(handle.definition(), member.role.as_str()))
.unwrap_or(crate::runtime::ConsoleModelCapabilities { image_input: false })
}
pub async fn assert_member_accepts_images(
handle: &MobHandle,
session_service: Option<&Arc<dyn MobSessionService>>,
member_id: &str,
content: &meerkat_core::ContentInput,
) -> Result<(), MobRuntimeError> {
if !content_input_has_images(content) {
return Ok(());
}
let mid = meerkat_mob::ids::MeerkatId::from(member_id);
let Some(member) = handle.get_member(&mid).await else {
return Err(MobRuntimeError::InvalidInput("member not found"));
};
let caps = model_capabilities_for_member(handle, session_service, &member.agent_identity).await;
if caps.image_input {
Ok(())
} else {
Err(MobRuntimeError::InvalidInput(
"target member model cannot accept image input",
))
}
}
pub async fn send_message_on_mob(
handle: &MobHandle,
member_id: &str,
content: impl Into<meerkat_core::ContentInput>,
) -> Result<String, MobRuntimeError> {
send_message_on_mob_with_mode(
handle,
member_id,
content,
meerkat_core::types::HandlingMode::Queue,
)
.await
}
pub async fn send_message_on_mob_with_mode(
handle: &MobHandle,
member_id: &str,
content: impl Into<meerkat_core::ContentInput>,
handling_mode: meerkat_core::types::HandlingMode,
) -> Result<String, MobRuntimeError> {
if member_id.trim().is_empty() {
return Err(MobRuntimeError::InvalidInput("member_id must not be empty"));
}
let content = content.into();
let is_empty = match &content {
meerkat_core::ContentInput::Text(s) => s.trim().is_empty(),
meerkat_core::ContentInput::Blocks(blocks) => blocks.is_empty(),
};
if is_empty {
return Err(MobRuntimeError::InvalidInput("content must not be empty"));
}
let mid = meerkat_mob::ids::MeerkatId::from(member_id);
let _receipt = handle
.member(&mid)
.await?
.send(content, handling_mode)
.await?;
let session_id = handle
.resolve_bridge_session_id(&mid)
.await
.ok_or_else(|| {
MobRuntimeError::Mob(MobError::Internal(
"member has no bridge session after send".to_string(),
))
})?;
Ok(session_id.to_string())
}
#[cfg(test)]
#[allow(clippy::unwrap_used, clippy::expect_used, clippy::panic)]
mod tests {
use super::*;
struct EmptyDispatcher;
#[async_trait::async_trait]
impl meerkat_core::AgentToolDispatcher for EmptyDispatcher {
fn tools(&self) -> Arc<[Arc<meerkat_core::types::ToolDef>]> {
Vec::<Arc<meerkat_core::types::ToolDef>>::new().into()
}
async fn dispatch(
&self,
call: meerkat_core::types::ToolCallView<'_>,
) -> Result<meerkat_core::ToolDispatchOutcome, meerkat_core::ToolError> {
Err(meerkat_core::ToolError::not_found(call.name))
}
fn capabilities(&self) -> meerkat_core::agent::DispatcherCapabilities {
meerkat_core::agent::DispatcherCapabilities::default()
}
}
fn wrapper_with_overrides(
overrides: ImplicitDelegateRetirementOverrides,
) -> AutoWireParentMobToolDispatcher {
AutoWireParentMobToolDispatcher {
inner: Arc::new(EmptyDispatcher),
implicit_delegate_retirement_overrides: overrides,
}
}
#[test]
fn delegate_tool_schema_exposes_idle_retire_secs() {
let tool = meerkat_core::types::ToolDef::new(
"delegate",
"Delegate work",
serde_json::json!({
"type": "object",
"properties": {
"task": {"type": "string"}
},
"required": ["task"]
}),
);
let patched = delegate_tool_def_with_idle_retire_secs(&tool);
let idle_retire_secs = &patched.input_schema["properties"]["idle_retire_secs"];
assert!(patched.description.contains("IDLE RETIREMENT:"));
assert_eq!(idle_retire_secs["anyOf"][0]["type"], "integer");
assert_eq!(idle_retire_secs["anyOf"][0]["minimum"], 0);
assert_eq!(idle_retire_secs["anyOf"][1]["type"], "null");
}
#[test]
fn mob_spawn_tool_schema_exposes_opt_in_idle_retire_secs() {
let tool = meerkat_core::types::ToolDef::new(
"mob_spawn_member",
"Spawn member",
serde_json::json!({
"type": "object",
"properties": {
"profile": {"type": "string"},
"member_id": {"type": "string"}
},
"required": ["profile", "member_id"]
}),
);
let patched = mob_spawn_tool_def_with_idle_retire_secs(&tool);
let idle_retire_secs = &patched.input_schema["properties"]["idle_retire_secs"];
assert!(
patched
.description
.contains("Omit idle_retire_secs to leave this spawned member out")
);
assert_eq!(idle_retire_secs["anyOf"][0]["type"], "integer");
assert_eq!(idle_retire_secs["anyOf"][0]["minimum"], 0);
assert_eq!(idle_retire_secs["anyOf"][1]["type"], "null");
}
#[tokio::test]
async fn auto_wire_wrapper_preserves_ops_lifecycle_binding() {
use meerkat_core::AgentToolDispatcher;
use std::sync::atomic::{AtomicBool, Ordering};
struct BindAwareDispatcher {
bound: Arc<AtomicBool>,
}
#[async_trait::async_trait]
impl meerkat_core::AgentToolDispatcher for BindAwareDispatcher {
fn tools(&self) -> Arc<[Arc<meerkat_core::types::ToolDef>]> {
Vec::<Arc<meerkat_core::types::ToolDef>>::new().into()
}
async fn dispatch(
&self,
call: meerkat_core::types::ToolCallView<'_>,
) -> Result<meerkat_core::ToolDispatchOutcome, meerkat_core::ToolError> {
Err(meerkat_core::ToolError::not_found(call.name))
}
fn capabilities(&self) -> meerkat_core::agent::DispatcherCapabilities {
meerkat_core::agent::DispatcherCapabilities {
ops_lifecycle: true,
}
}
fn bind_ops_lifecycle(
self: Arc<Self>,
_registry: Arc<dyn meerkat_core::ops_lifecycle::OpsLifecycleRegistry>,
_owner_bridge_session_id: meerkat_core::types::SessionId,
) -> Result<meerkat_core::agent::BindOutcome, meerkat_core::agent::OpsLifecycleBindError>
{
self.bound.store(true, Ordering::SeqCst);
Ok(meerkat_core::agent::BindOutcome::Bound(self))
}
}
let bound = Arc::new(AtomicBool::new(false));
let dispatcher = Arc::new(AutoWireParentMobToolDispatcher {
inner: Arc::new(BindAwareDispatcher {
bound: Arc::clone(&bound),
}),
implicit_delegate_retirement_overrides: ImplicitDelegateRetirementOverrides::default(),
});
assert!(dispatcher.capabilities().ops_lifecycle);
let outcome = dispatcher
.bind_ops_lifecycle(
Arc::new(meerkat_runtime::ops_lifecycle::RuntimeOpsLifecycleRegistry::new()),
meerkat_core::types::SessionId::new(),
)
.expect("wrapper should delegate ops lifecycle binding");
assert!(outcome.was_bound());
assert!(bound.load(Ordering::SeqCst));
assert!(outcome.into_dispatcher().capabilities().ops_lifecycle);
}
#[test]
fn delegate_idle_retire_secs_arg_is_stripped_and_parsed() {
let mut args = serde_json::json!({
"task": "inspect",
"idle_retire_secs": 42
});
let parsed = delegate_idle_retire_override_from_args("delegate", &mut args)
.expect("valid idle retire arg");
assert_eq!(parsed, Some(DelegateIdleRetireOverride::Seconds(42)));
assert!(args.get("idle_retire_secs").is_none());
}
#[test]
fn delegate_idle_retire_secs_null_disables_member_retirement() {
let mut args = serde_json::json!({
"task": "inspect",
"idle_retire_secs": null
});
let parsed = delegate_idle_retire_override_from_args("delegate", &mut args)
.expect("valid idle retire arg");
assert_eq!(parsed, Some(DelegateIdleRetireOverride::Disabled));
assert!(args.get("idle_retire_secs").is_none());
}
#[test]
fn delegate_idle_retire_secs_omitted_inherits_runtime_default() {
let mut args = serde_json::json!({"task": "inspect"});
let parsed = delegate_idle_retire_override_from_args("delegate", &mut args)
.expect("omitted idle retire arg");
assert_eq!(parsed, None);
assert_eq!(args, serde_json::json!({"task": "inspect"}));
}
#[test]
fn delegate_idle_retire_secs_rejects_negative_or_fractional_values() {
let mut negative = serde_json::json!({"task": "inspect", "idle_retire_secs": -1});
let mut fractional = serde_json::json!({"task": "inspect", "idle_retire_secs": 1.5});
assert!(delegate_idle_retire_override_from_args("delegate", &mut negative).is_err());
assert!(delegate_idle_retire_override_from_args("delegate", &mut fractional).is_err());
}
#[test]
fn mob_spawn_idle_retire_targets_use_args_when_result_omits_mob_id() {
let args = serde_json::json!({
"mob_id": "ob3",
"profile": "review-worker",
"member_id": "review-worker-vibe-forward",
});
let fallback_targets = idle_retire_targets_from_spawn_args(&args);
assert_eq!(
fallback_targets,
vec![IdleRetireTarget {
mob_id: "ob3".to_string(),
member_id: "review-worker-vibe-forward".to_string(),
}]
);
assert_eq!(
idle_retire_targets_from_outcome_text(
r#"{"agent_identity":"review-worker-vibe-forward","member_ref":"opaque"}"#,
&fallback_targets,
),
fallback_targets
);
}
#[test]
fn mob_spawn_idle_retire_targets_support_canonical_specs_shape() {
let args = serde_json::json!({
"mob_id": "ob3",
"specs": [
{"profile": "person-worker", "agent_identity": "person-worker-a"},
{"profile": "person-worker", "member_id": "person-worker-b", "mob_id": "other"}
]
});
let fallback_targets = idle_retire_targets_from_spawn_args(&args);
assert_eq!(
fallback_targets,
vec![
IdleRetireTarget {
mob_id: "ob3".to_string(),
member_id: "person-worker-a".to_string(),
},
IdleRetireTarget {
mob_id: "other".to_string(),
member_id: "person-worker-b".to_string(),
},
]
);
assert_eq!(
idle_retire_targets_from_outcome_text(
r#"{"members":[{"agent_identity":"person-worker-a"},{"agent_identity":"person-worker-b","mob_id":"other"}]}"#,
&fallback_targets,
),
fallback_targets
);
}
#[tokio::test]
async fn implicit_delegate_retirement_overrides_round_trip_per_member() {
let overrides = ImplicitDelegateRetirementOverrides::default();
overrides
.set("mob-a", "worker-1", DelegateIdleRetireOverride::Seconds(12))
.await;
overrides
.set("mob-a", "worker-2", DelegateIdleRetireOverride::Disabled)
.await;
assert_eq!(
overrides.get("mob-a", "worker-1").await,
Some(DelegateIdleRetireOverride::Seconds(12))
);
assert_eq!(
overrides.get("mob-a", "worker-2").await,
Some(DelegateIdleRetireOverride::Disabled)
);
assert_eq!(overrides.get("mob-a", "worker-3").await, None);
}
#[tokio::test]
async fn mob_spawn_idle_retire_registration_uses_spawn_args_when_result_omits_mob_id() {
let overrides = ImplicitDelegateRetirementOverrides::default();
let dispatcher = wrapper_with_overrides(overrides.clone());
let fallback_targets = idle_retire_targets_from_spawn_args(&serde_json::json!({
"mob_id": "ob3",
"member_id": "review-worker-vibe-forward",
}));
let outcome =
meerkat_core::ToolDispatchOutcome::sync_result(meerkat_core::types::ToolResult::new(
"spawn-1".to_string(),
r#"{"agent_identity":"review-worker-vibe-forward","member_ref":"opaque"}"#
.to_string(),
false,
));
dispatcher
.register_idle_retire_override_from_outcome(
&outcome,
Some(DelegateIdleRetireOverride::Seconds(900)),
&fallback_targets,
)
.await;
assert_eq!(
overrides.get("ob3", "review-worker-vibe-forward").await,
Some(DelegateIdleRetireOverride::Seconds(900))
);
}
#[test]
fn image_generation_substrate_defaults_off_for_inline_profiles() {
let definition = meerkat_mob::MobDefinition::from_toml(
r#"
[mob]
id = "test"
[profiles.worker]
model = "gpt-5.5"
[profiles.worker.tools]
builtins = true
"#,
)
.unwrap_or_else(|e| panic!("{e}"));
assert!(
!mob_definition_may_use_image_generation(&definition),
"inline profiles should not wire the image substrate unless a profile opts in"
);
}
#[test]
fn image_generation_substrate_follows_profile_tool_config() {
let definition = meerkat_mob::MobDefinition::from_toml(
r#"
[mob]
id = "test"
[profiles.commander]
model = "gpt-5.5"
[profiles.commander.tools]
builtins = true
image_generation = true
[profiles.investigator]
model = "gpt-5.5"
[profiles.investigator.tools]
builtins = true
image_generation = false
"#,
)
.unwrap_or_else(|e| panic!("{e}"));
let commander = definition.profiles["commander"].as_inline().unwrap();
let investigator = definition.profiles["investigator"].as_inline().unwrap();
assert!(commander.tools.image_generation);
assert!(!investigator.tools.image_generation);
assert!(
mob_definition_may_use_image_generation(&definition),
"one opt-in profile is enough to wire substrate; Meerkat gates visibility per profile"
);
}
#[test]
fn image_generation_profiles_can_disable_builtins_with_meerkat_062() {
let definition = meerkat_mob::MobDefinition::from_toml(
r#"
[mob]
id = "test"
[profiles.commander]
model = "gpt-5.5"
[profiles.commander.tools]
builtins = false
image_generation = true
"#,
)
.unwrap_or_else(|e| panic!("{e}"));
let commander = definition.profiles["commander"].as_inline().unwrap();
assert!(!commander.tools.builtins);
assert!(commander.tools.image_generation);
assert!(
mob_definition_may_use_image_generation(&definition),
"image generation now has its own Meerkat tool gate"
);
}
#[test]
fn image_generation_substrate_is_conservative_for_realm_profile_refs() {
let definition = meerkat_mob::MobDefinition::from_toml(
r#"
[mob]
id = "test"
[profiles.worker]
realm_profile = "worker-v2"
"#,
)
.unwrap_or_else(|e| panic!("{e}"));
assert!(
mob_definition_may_use_image_generation(&definition),
"realm profiles resolve at spawn time, so MobKit wires substrate and lets Meerkat enforce profile policy"
);
}
#[test]
fn sanitize_llm_request_drops_replay_unsafe_server_tool_blocks() {
let request = meerkat_client::LlmRequest::new(
"gpt-5.5",
vec![meerkat_core::Message::BlockAssistant(
meerkat_core::BlockAssistantMessage::new(
vec![
meerkat_core::AssistantBlock::Text {
text: "done".to_string(),
meta: None,
},
meerkat_core::AssistantBlock::ServerToolContent {
id: Some("ws-stream".to_string()),
name: "web_search".to_string(),
content: serde_json::json!({
"type": "response.web_search_call.searching",
"item_id": "ws_123"
}),
meta: None,
},
meerkat_core::AssistantBlock::ServerToolContent {
id: Some("ws_123".to_string()),
name: "web_search_call".to_string(),
content: serde_json::json!({
"type": "web_search_call",
"id": "ws_123",
"status": "completed"
}),
meta: None,
},
meerkat_core::AssistantBlock::ServerToolContent {
id: None,
name: "web_search_annotations".to_string(),
content: serde_json::json!({
"type": "message_annotations",
"annotations": []
}),
meta: None,
},
],
meerkat_core::StopReason::EndTurn,
),
)],
);
let sanitized = sanitize_llm_request_for_stateless_replay(&request);
let meerkat_core::Message::BlockAssistant(assistant) = &sanitized.messages[0] else {
panic!("expected block assistant");
};
assert_eq!(assistant.blocks.len(), 2);
assert!(matches!(
assistant.blocks[0],
meerkat_core::AssistantBlock::Text { .. }
));
assert!(matches!(
assistant.blocks[1],
meerkat_core::AssistantBlock::ServerToolContent { ref name, .. }
if name == "web_search_call"
));
}
#[test]
fn sanitize_llm_request_preserves_generated_images_for_meerkat_062() {
let request = meerkat_client::LlmRequest::new(
"gpt-5.5",
vec![meerkat_core::Message::BlockAssistant(
meerkat_core::BlockAssistantMessage::new(
vec![
meerkat_core::AssistantBlock::Text {
text: "visible".to_string(),
meta: None,
},
generated_image_block_for_test(),
],
meerkat_core::StopReason::EndTurn,
),
)],
);
let sanitized = sanitize_llm_request_for_stateless_replay(&request);
let meerkat_core::Message::BlockAssistant(original_assistant) = &request.messages[0] else {
panic!("expected original block assistant");
};
assert!(
original_assistant
.blocks
.iter()
.any(|block| matches!(block, meerkat_core::AssistantBlock::Image { .. })),
"request-view sanitization must not rewrite canonical caller-owned messages"
);
let meerkat_core::Message::BlockAssistant(sanitized_assistant) = &sanitized.messages[0]
else {
panic!("expected sanitized block assistant");
};
assert!(
sanitized_assistant
.blocks
.iter()
.any(|block| matches!(block, meerkat_core::AssistantBlock::Image { .. })),
"Meerkat 0.6.2 owns provider replay projection for generated images"
);
}
#[derive(Default)]
struct CapturingLlmClient {
projected_messages: std::sync::Mutex<Vec<meerkat_core::Message>>,
}
#[async_trait]
impl LlmClient for CapturingLlmClient {
fn project_replay_messages(
&self,
messages: &[meerkat_core::Message],
) -> Result<Vec<meerkat_core::Message>, meerkat_client::LlmError> {
*self
.projected_messages
.lock()
.unwrap_or_else(std::sync::PoisonError::into_inner) = messages.to_vec();
Ok(messages.to_vec())
}
fn stream<'a>(&'a self, _request: &'a LlmRequest) -> LlmStream<'a> {
Box::pin(futures::stream::iter([Ok(
meerkat_client::LlmEvent::Done {
outcome: meerkat_client::LlmDoneOutcome::Success {
stop_reason: meerkat_core::StopReason::EndTurn,
},
},
)]))
}
fn provider(&self) -> &'static str {
"openai"
}
async fn health_check(&self) -> Result<(), meerkat_client::LlmError> {
Ok(())
}
}
#[test]
fn replay_sanitizing_llm_client_delegates_provider_projection() {
let capture = Arc::new(CapturingLlmClient::default());
let inner: Arc<dyn LlmClient> = capture.clone();
let wrapped = ReplaySanitizingLlmClient::new(inner);
let messages = vec![meerkat_core::Message::BlockAssistant(
meerkat_core::BlockAssistantMessage::new(
vec![
meerkat_core::AssistantBlock::Text {
text: "visible".to_string(),
meta: None,
},
meerkat_core::AssistantBlock::ServerToolContent {
id: Some("ws-stream".to_string()),
name: "web_search".to_string(),
content: serde_json::json!({
"type": "response.web_search_call.searching",
"item_id": "ws_123"
}),
meta: None,
},
],
meerkat_core::StopReason::EndTurn,
),
)];
let projected = wrapped
.project_replay_messages(&messages)
.expect("wrapped client should delegate provider projection");
let seen = capture
.projected_messages
.lock()
.unwrap_or_else(std::sync::PoisonError::into_inner)
.clone();
let meerkat_core::Message::BlockAssistant(assistant) = &seen[0] else {
panic!("expected block assistant");
};
assert_eq!(
assistant.blocks.len(),
1,
"MobKit sanitization must happen before Meerkat provider projection"
);
assert!(matches!(
assistant.blocks[0],
meerkat_core::AssistantBlock::Text { .. }
));
assert_eq!(
serde_json::to_value(&projected).expect("projected messages serialize"),
serde_json::to_value(&seen).expect("seen messages serialize")
);
}
#[derive(Default)]
struct CapturingAgentLlmClient {
seen_messages: std::sync::Mutex<Vec<meerkat_core::Message>>,
}
#[async_trait]
impl meerkat_core::AgentLlmClient for CapturingAgentLlmClient {
async fn stream_response(
&self,
messages: &[meerkat_core::Message],
_tools: &[Arc<meerkat_core::ToolDef>],
_max_tokens: u32,
_temperature: Option<f32>,
_provider_params: Option<
&meerkat_core::lifecycle::run_primitive::ProviderParamsOverride,
>,
) -> Result<meerkat_core::agent::LlmStreamResult, meerkat_core::AgentError> {
*self
.seen_messages
.lock()
.unwrap_or_else(std::sync::PoisonError::into_inner) = messages.to_vec();
Ok(meerkat_core::agent::LlmStreamResult::new(
Vec::new(),
meerkat_core::StopReason::EndTurn,
meerkat_core::Usage::default(),
))
}
fn provider(&self) -> &'static str {
"openai"
}
fn model(&self) -> &'static str {
"gpt-5.5"
}
}
#[tokio::test]
async fn sanitize_agent_llm_client_drops_replay_unsafe_server_tool_blocks() {
let capture = Arc::new(CapturingAgentLlmClient::default());
let inner: Arc<dyn meerkat_core::AgentLlmClient> = capture.clone();
let wrapped = ReplaySanitizingAgentLlmClient::wrap(inner);
let messages = vec![meerkat_core::Message::BlockAssistant(
meerkat_core::BlockAssistantMessage::new(
vec![
meerkat_core::AssistantBlock::Text {
text: "visible".to_string(),
meta: None,
},
meerkat_core::AssistantBlock::ServerToolContent {
id: Some("ws-stream".to_string()),
name: "web_search".to_string(),
content: serde_json::json!({
"type": "response.web_search_call.searching",
"item_id": "ws_123"
}),
meta: None,
},
meerkat_core::AssistantBlock::ServerToolContent {
id: Some("ok".to_string()),
name: "web_search_call".to_string(),
content: serde_json::json!({
"type": "web_search_call",
"id": "ws_123",
"status": "completed"
}),
meta: None,
},
],
meerkat_core::StopReason::EndTurn,
),
)];
let tools: Vec<Arc<meerkat_core::ToolDef>> = Vec::new();
wrapped
.stream_response(&messages, &tools, 512, None, None)
.await
.expect("wrapped client should delegate");
let seen = capture
.seen_messages
.lock()
.unwrap_or_else(std::sync::PoisonError::into_inner)
.clone();
let meerkat_core::Message::BlockAssistant(assistant) = &seen[0] else {
panic!("expected block assistant");
};
assert_eq!(assistant.blocks.len(), 2);
assert!(matches!(
assistant.blocks[0],
meerkat_core::AssistantBlock::Text { .. }
));
assert!(matches!(
assistant.blocks[1],
meerkat_core::AssistantBlock::ServerToolContent { ref name, .. }
if name == "web_search_call"
));
}
fn generated_image_block_for_test() -> meerkat_core::AssistantBlock {
serde_json::from_value(serde_json::json!({
"block_type": "image",
"data": {
"image_id": "00000000-0000-0000-0000-000000000051",
"blob_ref": {
"blob_id": "sha256:test-generated-image",
"media_type": "image/png"
},
"media_type": "image/png",
"width": 1024,
"height": 1024,
"revised_prompt": { "disposition": "not_requested" },
"meta": { "provider": "not_emitted" }
}
}))
.expect("test image block should deserialize")
}
#[test]
fn sanitize_message_preserves_assistant_image_blocks() {
let message =
meerkat_core::Message::BlockAssistant(meerkat_core::BlockAssistantMessage::new(
vec![
meerkat_core::AssistantBlock::Text {
text: "Here is the image.".to_string(),
meta: None,
},
generated_image_block_for_test(),
],
meerkat_core::StopReason::EndTurn,
));
let sanitized = sanitize_message_for_stateless_replay(message);
let meerkat_core::Message::BlockAssistant(assistant) = sanitized else {
panic!("expected block assistant");
};
assert_eq!(assistant.blocks.len(), 2);
assert!(matches!(
assistant.blocks[0],
meerkat_core::AssistantBlock::Text { .. }
));
assert!(
matches!(
assistant.blocks[1],
meerkat_core::AssistantBlock::Image { .. }
),
"generated image blocks should reach Meerkat's provider projection"
);
}
#[test]
fn persistent_with_hook_wraps_session_service() {
let dir = tempfile::tempdir().unwrap_or_else(|e| panic!("{e}"));
let store_path = dir.path().to_path_buf();
let Ok(sqlite) = meerkat_store::SqliteSessionStore::open(store_path.join("sessions.db"))
else {
panic!("failed to open sqlite session store");
};
let session_store: Arc<dyn SessionStore> = Arc::new(sqlite);
let Ok(definition) = meerkat_mob::MobDefinition::from_toml("[mob]\nid = \"test\"\n") else {
panic!("failed to parse minimal mob definition");
};
let hook_called = Arc::new(std::sync::atomic::AtomicBool::new(false));
let hook_called_clone = hook_called.clone();
let spec = MobBootstrapSpec::persistent_with_hook(
definition,
meerkat_mob::MobStorage::in_memory(),
store_path.clone(),
4,
session_store,
move |_req: &mut CreateSessionRequest| {
hook_called_clone.store(true, std::sync::atomic::Ordering::Relaxed);
Box::pin(async { Ok(()) })
},
);
assert!(
spec.runtime_adapter.is_some(),
"persistent_with_hook must provide a runtime adapter via spec.runtime_adapter"
);
assert!(
spec.session_service.runtime_adapter().is_some(),
"session service must own a runtime_store so archive/retire don't \
hit the store-only-projection rejection in meerkat-session"
);
assert!(
store_path.join("runtime.sqlite").exists(),
"persistent_inner must open a SqliteRuntimeStore at <store_path>/runtime.sqlite"
);
assert!(
!hook_called.load(std::sync::atomic::Ordering::Relaxed),
"hook must not be called before create_session"
);
}
#[test]
fn ephemeral_with_hook_creates_spec() {
let dir = tempfile::tempdir().unwrap_or_else(|e| panic!("{e}"));
let store_path = dir.path().to_path_buf();
let Ok(definition) = meerkat_mob::MobDefinition::from_toml("[mob]\nid = \"test\"\n") else {
panic!("failed to parse minimal mob definition");
};
let hook_called = Arc::new(std::sync::atomic::AtomicBool::new(false));
let hook_called_clone = hook_called.clone();
let spec = MobBootstrapSpec::ephemeral_with_hook(
definition,
meerkat_mob::MobStorage::in_memory(),
store_path,
4,
None,
move |_req: &mut CreateSessionRequest| {
hook_called_clone.store(true, std::sync::atomic::Ordering::Relaxed);
Box::pin(async { Ok(()) })
},
);
assert!(spec.runtime_adapter.is_none());
assert!(
!hook_called.load(std::sync::atomic::Ordering::Relaxed),
"hook must not be called before create_session"
);
}
#[tokio::test]
async fn pre_build_hook_mutates_create_session_request() {
use std::sync::Mutex;
let captured = Arc::new(Mutex::new(None::<(String, Option<String>)>));
let captured_clone = captured.clone();
let dir = tempfile::tempdir().unwrap_or_else(|e| panic!("{e}"));
let factory = AgentFactory::new(dir.path()).builtins(true);
let config = Config::default();
let builder = FactoryAgentBuilder::new(factory, config);
let inner: Arc<dyn MobSessionService> =
Arc::new(meerkat_session::EphemeralSessionService::new(builder, 4));
let hook: PreBuildHook = Arc::new(move |req: &mut CreateSessionRequest| {
req.model = "hooked-model".to_string();
req.system_prompt = Some("injected-prompt".to_string());
let labels = req.labels.get_or_insert_with(Default::default);
labels.insert("hook_label".to_string(), "hook_value".to_string());
let mut lock = captured_clone
.lock()
.unwrap_or_else(std::sync::PoisonError::into_inner);
*lock = Some((req.model.clone(), req.system_prompt.clone()));
Box::pin(async { Ok(()) })
});
let wrapped = PreBuildMobSessionService {
inner,
hook,
after_create_hook: None,
runtime_adapter_override: None,
};
let req = CreateSessionRequest {
model: "original-model".to_string(),
prompt: meerkat_core::ContentInput::Text("test".to_string()),
render_metadata: None,
system_prompt: None,
max_tokens: None,
event_tx: None,
skill_references: None,
initial_turn: meerkat_core::service::InitialTurnPolicy::Defer,
build: None,
labels: None,
deferred_prompt_policy: meerkat_core::service::DeferredPromptPolicy::default(),
};
let _ = meerkat_core::service::SessionService::create_session(&wrapped, req).await;
let (model, prompt) = captured
.lock()
.unwrap_or_else(std::sync::PoisonError::into_inner)
.clone()
.expect("hook must have been called");
assert_eq!(model, "hooked-model", "hook must mutate the model");
assert_eq!(
prompt.as_deref(),
Some("injected-prompt"),
"hook must set the system prompt"
);
}
#[derive(Default)]
struct ForwardingProbe {
calls: Mutex<Vec<&'static str>>,
}
impl ForwardingProbe {
fn record(&self, call: &'static str) {
self.calls
.lock()
.unwrap_or_else(std::sync::PoisonError::into_inner)
.push(call);
}
fn calls(&self) -> Vec<&'static str> {
self.calls
.lock()
.unwrap_or_else(std::sync::PoisonError::into_inner)
.clone()
}
}
#[async_trait]
impl meerkat_core::service::SessionService for ForwardingProbe {
async fn create_session(
&self,
_req: CreateSessionRequest,
) -> Result<meerkat_core::types::RunResult, SessionError> {
Err(SessionError::Unsupported("create_session".to_string()))
}
async fn start_turn(
&self,
_id: &meerkat_core::types::SessionId,
_req: meerkat_core::service::StartTurnRequest,
) -> Result<meerkat_core::types::RunResult, SessionError> {
Err(SessionError::Unsupported("start_turn".to_string()))
}
async fn interrupt(
&self,
_id: &meerkat_core::types::SessionId,
) -> Result<(), SessionError> {
self.record("interrupt");
Ok(())
}
async fn read(
&self,
id: &meerkat_core::types::SessionId,
) -> Result<meerkat_core::service::SessionView, SessionError> {
Err(SessionError::NotFound { id: id.clone() })
}
async fn list(
&self,
_query: meerkat_core::service::SessionQuery,
) -> Result<Vec<meerkat_core::service::SessionSummary>, SessionError> {
Ok(Vec::new())
}
async fn archive(&self, _id: &meerkat_core::types::SessionId) -> Result<(), SessionError> {
self.record("archive");
Ok(())
}
}
#[async_trait]
impl meerkat_core::service::SessionServiceCommsExt for ForwardingProbe {}
#[async_trait]
impl meerkat_core::service::SessionServiceControlExt for ForwardingProbe {
async fn append_system_context(
&self,
_id: &meerkat_core::types::SessionId,
_req: meerkat_core::service::AppendSystemContextRequest,
) -> Result<
meerkat_core::service::AppendSystemContextResult,
meerkat_core::service::SessionControlError,
> {
self.record("append_system_context");
Ok(meerkat_core::service::AppendSystemContextResult {
status: meerkat_core::service::AppendSystemContextStatus::Applied,
})
}
async fn stage_tool_results(
&self,
_id: &meerkat_core::types::SessionId,
_req: meerkat_core::service::StageToolResultsRequest,
) -> Result<meerkat_core::service::StageToolResultsResult, SessionError> {
self.record("stage_tool_results");
Ok(meerkat_core::service::StageToolResultsResult {
accepted_result_count: 7,
})
}
}
#[async_trait]
impl meerkat_core::service::SessionServiceHistoryExt for ForwardingProbe {
async fn read_history(
&self,
id: &meerkat_core::types::SessionId,
_query: meerkat_core::service::SessionHistoryQuery,
) -> Result<meerkat_core::service::SessionHistoryPage, SessionError> {
Err(SessionError::NotFound { id: id.clone() })
}
}
#[async_trait]
impl MobSessionService for ForwardingProbe {
fn supports_persistent_sessions(&self) -> bool {
true
}
fn runtime_adapter(&self) -> Option<Arc<meerkat_runtime::MeerkatMachine>> {
Some(Arc::new(meerkat_runtime::MeerkatMachine::ephemeral()))
}
async fn archive_with_mob_lifecycle_authority(
&self,
_session_id: &meerkat_core::types::SessionId,
) -> Result<(), SessionError> {
self.record("archive_with_mob_lifecycle_authority");
Ok(())
}
async fn stage_runtime_system_context_for_active_turn(
&self,
_session_id: &meerkat_core::types::SessionId,
_expected_run_id: &meerkat_core::lifecycle::RunId,
_appends: Vec<meerkat_core::session::PendingSystemContextAppend>,
) -> Result<Option<Vec<u8>>, SessionError> {
self.record("stage_runtime_system_context_for_active_turn");
Ok(Some(b"snapshot".to_vec()))
}
async fn discard_runtime_system_context_for_active_turn(
&self,
_session_id: &meerkat_core::types::SessionId,
_expected_run_id: &meerkat_core::lifecycle::RunId,
_idempotency_keys: Vec<String>,
) -> Result<(), SessionError> {
self.record("discard_runtime_system_context_for_active_turn");
Ok(())
}
async fn active_turn_system_context_boundary_available(
&self,
_session_id: &meerkat_core::types::SessionId,
) -> Result<Option<bool>, SessionError> {
self.record("active_turn_system_context_boundary_available");
Ok(Some(true))
}
}
#[tokio::test]
async fn pre_build_wrapper_forwards_mob_authority_and_control_extensions() {
let probe = Arc::new(ForwardingProbe::default());
let inner: Arc<dyn MobSessionService> = probe.clone();
let wrapped = PreBuildMobSessionService {
inner,
hook: no_op_pre_build_hook(),
after_create_hook: None,
runtime_adapter_override: Some(Arc::new(meerkat_runtime::MeerkatMachine::ephemeral())),
};
let session_id = meerkat_core::types::SessionId::new();
MobSessionService::archive_with_mob_lifecycle_authority(&wrapped, &session_id)
.await
.expect("archive_with_mob_lifecycle_authority should forward to inner service");
let staged = meerkat_core::service::SessionServiceControlExt::stage_tool_results(
&wrapped,
&session_id,
meerkat_core::service::StageToolResultsRequest {
results: Vec::new(),
},
)
.await
.expect("stage_tool_results should forward to inner service");
assert_eq!(staged.accepted_result_count, 7);
let boundary_available = wrapped
.active_turn_system_context_boundary_available(&session_id)
.await
.expect("active-turn boundary probe should forward");
assert_eq!(boundary_available, Some(true));
let snapshot = wrapped
.stage_runtime_system_context_for_active_turn(
&session_id,
&meerkat_core::lifecycle::RunId::new(),
vec![meerkat_core::session::PendingSystemContextAppend {
text: "steer".to_string(),
source: Some("test".to_string()),
idempotency_key: Some("test".to_string()),
accepted_at: meerkat_core::time_compat::SystemTime::now(),
}],
)
.await
.expect("active-turn staging should forward");
assert_eq!(snapshot.as_deref(), Some(&b"snapshot"[..]));
wrapped
.discard_runtime_system_context_for_active_turn(
&session_id,
&meerkat_core::lifecycle::RunId::new(),
vec!["test".to_string()],
)
.await
.expect("active-turn rollback should forward");
assert_eq!(
probe.calls(),
vec![
"archive_with_mob_lifecycle_authority",
"stage_tool_results",
"active_turn_system_context_boundary_available",
"stage_runtime_system_context_for_active_turn",
"discard_runtime_system_context_for_active_turn",
]
);
}
#[test]
fn persistent_bootstrap_uses_sqlite_runtime_store() {
let dir = tempfile::tempdir().unwrap_or_else(|e| panic!("{e}"));
let store_path = dir.path().to_path_buf();
let Ok(sqlite) = meerkat_store::SqliteSessionStore::open(store_path.join("sessions.db"))
else {
panic!("failed to open sqlite session store");
};
let session_store: Arc<dyn SessionStore> = Arc::new(sqlite);
let Ok(definition) = meerkat_mob::MobDefinition::from_toml("[mob]\nid = \"test\"\n") else {
panic!("failed to parse minimal mob definition");
};
let spec = MobBootstrapSpec::persistent(
definition,
meerkat_mob::MobStorage::in_memory(),
store_path.clone(),
4,
session_store,
);
assert!(
spec.runtime_adapter.is_some(),
"persistent bootstrap must provide its own runtime adapter via spec.runtime_adapter"
);
assert!(
spec.session_service.runtime_adapter().is_some(),
"session service must own a runtime_store so archive/retire don't \
hit the store-only-projection rejection"
);
assert!(
store_path.join("runtime.sqlite").exists(),
"persistent_inner must open a SqliteRuntimeStore at <store_path>/runtime.sqlite"
);
}
#[test]
fn ephemeral_runtime_backed_uses_session_service_runtime_adapter() {
let dir = tempfile::tempdir().unwrap_or_else(|e| panic!("{e}"));
let store_path = dir.path().to_path_buf();
let Ok(definition) = meerkat_mob::MobDefinition::from_toml("[mob]\nid = \"test\"\n") else {
panic!("failed to parse minimal mob definition");
};
let spec = MobBootstrapSpec::ephemeral_runtime_backed_inner(
definition,
meerkat_mob::MobStorage::in_memory(),
store_path,
4,
None,
None,
None,
CapabilityFlags::default(),
None,
None,
);
assert!(
spec.runtime_adapter.is_some(),
"ephemeral_runtime_backed_inner must expose the shared runtime authority"
);
assert!(
spec.session_service.runtime_adapter().is_some(),
"session service must still expose a runtime adapter so autonomous-host comms can wire"
);
}
#[tokio::test]
async fn agent_mob_tools_expose_definition_profiles_as_realm_profiles() {
let dir = tempfile::tempdir().unwrap_or_else(|e| panic!("{e}"));
let store_path = dir.path().to_path_buf();
let Ok(definition) = meerkat_mob::MobDefinition::from_toml(
"[mob]\nid = \"test\"\n\n[profiles.investigation-worker]\nmodel = \"gpt-5.5\"\n[profiles.investigation-worker.tools]\ncomms = true\nmob = true\n\n[profiles.person-worker]\nmodel = \"gpt-5.5\"\n[profiles.person-worker.tools]\ncomms = true\n",
) else {
panic!("failed to parse mob definition with worker profiles");
};
let spec = MobBootstrapSpec::ephemeral_runtime_backed_inner(
definition,
meerkat_mob::MobStorage::in_memory(),
store_path,
4,
None,
None,
None,
CapabilityFlags::default(),
None,
None,
);
let state = spec
.agent_mob_mcp_state
.expect("agent mob MCP state should be installed");
let profiles = state
.realm_profile_list()
.await
.expect("definition profiles should list through agent mob tools");
let names = profiles
.iter()
.map(|profile| profile.name.as_str())
.collect::<Vec<_>>();
assert!(
names.contains(&"investigation-worker"),
"definition profiles must be visible to mob_profile_list so agents can create mobs that reference them"
);
assert!(names.contains(&"person-worker"));
let worker = state
.realm_profile_get("investigation-worker")
.await
.expect("definition profile lookup should succeed")
.expect("definition profile should exist");
assert_eq!(worker.profile.model, "gpt-5.5");
assert_eq!(
worker.revision, 0,
"definition-backed profiles are immutable runtime seeds, not persisted realm revisions"
);
}
#[tokio::test]
async fn agent_created_mobs_can_spawn_definition_seeded_realm_profiles() {
let dir = tempfile::tempdir().unwrap_or_else(|e| panic!("{e}"));
let store_path = dir.path().to_path_buf();
let Ok(parent_definition) = meerkat_mob::MobDefinition::from_toml(
"[mob]\nid = \"parent\"\n\n[profiles.investigation-worker]\nmodel = \"gpt-5.5\"\n[profiles.investigation-worker.tools]\ncomms = true\nmob = true\n",
) else {
panic!("failed to parse parent mob definition");
};
let mut spec = MobBootstrapSpec::ephemeral_runtime_backed_inner(
parent_definition,
meerkat_mob::MobStorage::in_memory(),
store_path,
4,
None,
None,
None,
CapabilityFlags::default(),
None,
None,
);
spec.options.default_llm_client = Some(Arc::new(meerkat_client::TestClient::default()));
let runtime = MobRuntime::bootstrap(spec)
.await
.unwrap_or_else(|e| panic!("{e}"));
let state = runtime
.agent_mob_mcp_state
.clone()
.expect("agent mob MCP state should be installed");
let Ok(child_definition) = meerkat_mob::MobDefinition::from_toml(
"[mob]\nid = \"child\"\n\n[profiles.investigation-worker]\nrealm_profile = \"investigation-worker\"\n",
) else {
panic!("failed to parse child mob definition");
};
let mob_id = state
.mob_create_definition(child_definition)
.await
.expect("child mob should be created");
state
.mob_spawn_spec(
&mob_id,
SpawnMemberSpec::new(
ProfileName::from("investigation-worker"),
meerkat_mob::AgentIdentity::from("investigation-worker:one"),
),
)
.await
.expect("created mob should resolve definition-seeded realm profile at spawn time");
}
#[tokio::test]
async fn ephemeral_runtime_backed_custom_session_store_persists_created_session() {
let dir = tempfile::tempdir().unwrap_or_else(|e| panic!("{e}"));
let store_path = dir.path().to_path_buf();
let Ok(definition) = meerkat_mob::MobDefinition::from_toml(
"[mob]\nid = \"test\"\n\n[profiles.worker]\nmodel = \"gpt-5.5\"\n[profiles.worker.tools]\ncomms = true\n",
) else {
panic!("failed to parse minimal mob definition");
};
let custom_store: Arc<dyn SessionStore> = Arc::new(meerkat_store::MemoryStore::new());
let mut spec = MobBootstrapSpec::ephemeral_runtime_backed_inner(
definition,
meerkat_mob::MobStorage::in_memory(),
store_path,
4,
Some(custom_store.clone()),
None,
None,
CapabilityFlags::default(),
None,
None,
);
spec.options.default_llm_client = Some(Arc::new(meerkat_client::TestClient::default()));
let runtime = MobRuntime::bootstrap(spec)
.await
.unwrap_or_else(|e| panic!("{e}"));
let mid = meerkat_mob::ids::MeerkatId::from("worker:one");
runtime
.handle
.spawn_spec(SpawnMemberSpec::new(
ProfileName::from("worker"),
mid.clone(),
))
.await
.unwrap_or_else(|e| panic!("{e}"));
let session_id = runtime
.handle
.resolve_bridge_session_id(&mid)
.await
.unwrap_or_else(|| panic!("spawned worker has no bridge session id"));
let stored = custom_store
.load(&session_id)
.await
.unwrap_or_else(|e| panic!("{e}"));
assert!(
stored.is_some(),
"ephemeral runtime-backed builds with a custom store must persist through that store"
);
}
#[tokio::test]
async fn ephemeral_runtime_backed_custom_session_store_resumes_after_runtime_restart() {
let dir = tempfile::tempdir().unwrap_or_else(|e| panic!("{e}"));
let store_path = dir.path().to_path_buf();
let definition_toml = "[mob]\nid = \"test\"\n\n[profiles.worker]\nmodel = \"gpt-5.5\"\n[profiles.worker.tools]\ncomms = true\n";
let Ok(definition) = meerkat_mob::MobDefinition::from_toml(definition_toml) else {
panic!("failed to parse minimal mob definition");
};
let custom_store: Arc<dyn SessionStore> = Arc::new(meerkat_store::MemoryStore::new());
let mut spec = MobBootstrapSpec::ephemeral_runtime_backed_inner(
definition,
meerkat_mob::MobStorage::in_memory(),
store_path.clone(),
4,
Some(custom_store.clone()),
None,
None,
CapabilityFlags::default(),
None,
None,
);
spec.options.default_llm_client = Some(Arc::new(meerkat_client::TestClient::default()));
let runtime = MobRuntime::bootstrap(spec)
.await
.unwrap_or_else(|e| panic!("{e}"));
let mid = meerkat_mob::ids::MeerkatId::from("worker:one");
runtime
.handle
.spawn_spec(SpawnMemberSpec::new(
ProfileName::from("worker"),
mid.clone(),
))
.await
.unwrap_or_else(|e| panic!("{e}"));
let session_id = runtime
.handle
.resolve_bridge_session_id(&mid)
.await
.unwrap_or_else(|| panic!("spawned worker has no bridge session id"));
drop(runtime);
let Ok(definition) = meerkat_mob::MobDefinition::from_toml(definition_toml) else {
panic!("failed to parse minimal mob definition");
};
let mut restarted_spec = MobBootstrapSpec::ephemeral_runtime_backed_inner(
definition,
meerkat_mob::MobStorage::in_memory(),
store_path,
4,
Some(custom_store),
None,
None,
CapabilityFlags::default(),
None,
None,
);
restarted_spec.options.default_llm_client =
Some(Arc::new(meerkat_client::TestClient::default()));
let restarted = MobRuntime::bootstrap(restarted_spec)
.await
.unwrap_or_else(|e| panic!("{e}"));
let mut resume_spec = SpawnMemberSpec::new(ProfileName::from("worker"), mid.clone());
resume_spec.launch_mode = meerkat_mob::MemberLaunchMode::Resume {
bridge_session_id: session_id.clone(),
};
restarted
.handle
.spawn_spec(resume_spec)
.await
.unwrap_or_else(|e| panic!("resume should load the external session snapshot: {e}"));
let resumed_session_id = restarted
.handle
.resolve_bridge_session_id(&mid)
.await
.unwrap_or_else(|| panic!("resumed worker has no bridge session id"));
assert_eq!(resumed_session_id, session_id);
}
#[test]
fn ephemeral_bootstrap_without_image_generation_stays_direct() {
let dir = tempfile::tempdir().unwrap_or_else(|e| panic!("{e}"));
let store_path = dir.path().to_path_buf();
let Ok(definition) = meerkat_mob::MobDefinition::from_toml(
"[mob]\nid = \"test\"\n\n[profiles.worker]\nmodel = \"gpt-5.5\"\nruntime_mode = \"autonomous_host\"\n[profiles.worker.tools]\ncomms = true\n",
) else {
panic!("failed to parse definition");
};
let spec = MobBootstrapSpec::ephemeral_inner(
definition,
meerkat_mob::MobStorage::in_memory(),
store_path,
4,
None,
None,
CapabilityFlags::default(),
None,
None,
);
assert!(
spec.runtime_adapter.is_none(),
"public ephemeral builds only need a runtime adapter when the definition may use image generation"
);
}
#[test]
fn ephemeral_bootstrap_with_image_generation_shares_runtime_adapter() {
let dir = tempfile::tempdir().unwrap_or_else(|e| panic!("{e}"));
let store_path = dir.path().to_path_buf();
let Ok(definition) = meerkat_mob::MobDefinition::from_toml(
r#"
[mob]
id = "test"
[profiles.commander]
model = "gpt-5.5"
[profiles.commander.tools]
builtins = true
image_generation = true
"#,
) else {
panic!("failed to parse image-generation definition");
};
let spec = MobBootstrapSpec::ephemeral(
definition,
meerkat_mob::MobStorage::in_memory(),
store_path,
4,
None,
);
let spec_adapter = spec
.runtime_adapter
.as_ref()
.expect("image-generation ephemeral builds must expose a runtime adapter");
let service_adapter = spec
.session_service
.runtime_adapter()
.expect("session service must expose the same runtime adapter");
assert!(
spec_adapter.shares_runtime_persistence_with(&service_adapter),
"image-generation tool state and session state must share one runtime authority"
);
}
#[test]
fn normalize_runtime_turn_request_strips_runtime_owned_semantics() {
let req = meerkat_core::service::StartTurnRequest {
prompt: meerkat_core::ContentInput::Text("checkpoint".to_string()),
system_prompt: Some("system".to_string()),
event_tx: None,
runtime: meerkat_core::service::StartTurnRuntimeSemantics {
render_metadata: Some(meerkat_core::types::RenderMetadata {
class: meerkat_core::types::RenderClass::OpsProgress,
salience: meerkat_core::types::RenderSalience::Urgent,
}),
handling_mode: meerkat_core::types::HandlingMode::Steer,
skill_references: None,
flow_tool_overlay: None,
pre_turn_context_appends: Vec::new(),
typed_turn_appends: Vec::new(),
turn_metadata: None,
},
};
let expected_prompt = req.prompt.clone();
let expected_system_prompt = req.system_prompt.clone();
let normalized = normalize_runtime_turn_request(req);
assert_eq!(
normalized.runtime.handling_mode,
meerkat_core::types::HandlingMode::Queue,
"runtime-applied turns must downgrade Steer before reaching direct session services"
);
assert!(
normalized.runtime.render_metadata.is_none(),
"runtime-owned render metadata must not be forwarded through the direct agent path"
);
assert_eq!(normalized.prompt, expected_prompt);
assert_eq!(normalized.system_prompt, expected_system_prompt);
}
#[test]
fn session_created_context_fields() {
let ctx = SessionCreatedContext {
model: "claude-sonnet-4-5".to_string(),
labels: std::collections::BTreeMap::from([(
"agent_type".to_string(),
"lead".to_string(),
)]),
system_prompt: Some("You are a lead agent.".to_string()),
};
assert_eq!(ctx.model, "claude-sonnet-4-5");
assert_eq!(ctx.labels["agent_type"], "lead");
assert_eq!(ctx.system_prompt.as_deref(), Some("You are a lead agent."));
}
#[tokio::test]
async fn session_hook_default_impls_are_noop() {
struct EmptyHook;
#[async_trait]
impl SessionHook for EmptyHook {}
let hook = EmptyHook;
let mut req = CreateSessionRequest {
model: "test".to_string(),
prompt: meerkat_core::ContentInput::Text("test".to_string()),
render_metadata: None,
system_prompt: None,
max_tokens: None,
event_tx: None,
skill_references: None,
initial_turn: meerkat_core::service::InitialTurnPolicy::Defer,
build: None,
labels: None,
deferred_prompt_policy: meerkat_core::service::DeferredPromptPolicy::default(),
};
hook.before_create(&mut req).await.unwrap();
let ctx = SessionCreatedContext {
model: "test".to_string(),
labels: Default::default(),
system_prompt: None,
};
hook.after_create(&meerkat_core::types::SessionId::new(), &ctx)
.await;
}
#[tokio::test]
async fn session_hook_before_create_can_abort() {
struct AbortHook;
#[async_trait]
impl SessionHook for AbortHook {
async fn before_create(
&self,
_req: &mut CreateSessionRequest,
) -> Result<(), SessionError> {
Err(SessionError::Unsupported("hook abort".into()))
}
}
let hook = AbortHook;
let mut req = CreateSessionRequest {
model: "test".to_string(),
prompt: meerkat_core::ContentInput::Text("test".to_string()),
render_metadata: None,
system_prompt: None,
max_tokens: None,
event_tx: None,
skill_references: None,
initial_turn: meerkat_core::service::InitialTurnPolicy::Defer,
build: None,
labels: None,
deferred_prompt_policy: meerkat_core::service::DeferredPromptPolicy::default(),
};
let result = hook.before_create(&mut req).await;
assert!(result.is_err());
}
#[tokio::test]
async fn session_hook_before_create_mutates_request() {
struct MutatingHook;
#[async_trait]
impl SessionHook for MutatingHook {
async fn before_create(
&self,
req: &mut CreateSessionRequest,
) -> Result<(), SessionError> {
req.model = "hook-overridden".to_string();
req.system_prompt = Some("injected by hook".to_string());
Ok(())
}
}
let hook = MutatingHook;
let mut req = CreateSessionRequest {
model: "original".to_string(),
prompt: meerkat_core::ContentInput::Text("test".to_string()),
render_metadata: None,
system_prompt: None,
max_tokens: None,
event_tx: None,
skill_references: None,
initial_turn: meerkat_core::service::InitialTurnPolicy::Defer,
build: None,
labels: None,
deferred_prompt_policy: meerkat_core::service::DeferredPromptPolicy::default(),
};
hook.before_create(&mut req).await.unwrap();
assert_eq!(req.model, "hook-overridden");
assert_eq!(req.system_prompt.as_deref(), Some("injected by hook"));
}
#[test]
fn recoverable_lifecycle_cleanup_accepts_ambiguous_member_cleanup() {
let error = "previous member cleanup ambiguous for member rt:deep-investigator:singleton:0";
assert!(is_previous_member_cleanup_ambiguous_error(error));
assert!(is_recoverable_lifecycle_cleanup_error(error));
}
#[test]
fn recoverable_lifecycle_cleanup_preserves_archive_cancel_race() {
let error = "internal error: disposal completed but ArchiveSession failed: \
session error: agent error: Internal error: runtime cancel-before-retire failed \
for 019e3c52-0f1b-73d3-a5c7-4b21c2bbf131: Runtime not ready: running";
assert!(is_recoverable_lifecycle_cleanup_error(error));
}
#[test]
fn recoverable_lifecycle_cleanup_rejects_unrelated_errors() {
assert!(!is_recoverable_lifecycle_cleanup_error(
"actor task dropped"
));
assert!(!is_recoverable_lifecycle_cleanup_error(
"model provider returned rate limit"
));
}
}