1use std::collections::BTreeMap;
4use std::path::{Path, PathBuf};
5use std::sync::Arc;
6
7use async_trait::async_trait;
8use futures::StreamExt;
9use meerkat::{AgentFactory, Config, FactoryAgentBuilder, SessionStore};
10use meerkat_client::types::LlmStream;
11use meerkat_client::{LlmClient, LlmRequest};
12use meerkat_core::agent::CommsRuntime;
13use meerkat_core::service::{
14 CreateSessionRequest, SessionError, SessionHistoryPage, SessionHistoryQuery,
15 SessionServiceHistoryExt,
16};
17use meerkat_core::{AgentSessionStore, AssistantBlock, Message, Provider};
18use meerkat_mob::{
19 MobBuilder, MobDefinition, MobError, MobHandle, MobSessionService, MobStorage, Profile,
20 ProfileName, SpawnMemberSpec,
21};
22use meerkat_runtime::input_state::StoredInputState;
23use meerkat_runtime::store::MachineLifecycleCommit;
24use meerkat_store::StoreAdapter;
25use serde_json::Value;
26
27use crate::blob_store::{
28 Base64BlobStoreAdapter, BinaryBlobStore, BinaryBlobStoreAdapter, ObjectStoreBlobStore,
29};
30
31pub(crate) const DELEGATE_IDLE_RETIRE_SECS_LABEL: &str = "implicit_delegate_idle_retire_secs";
32pub(crate) const DELEGATE_IDLE_RETIRE_DISABLED_LABEL: &str = "disabled";
33
34#[cfg(test)]
35use std::sync::{Mutex, OnceLock};
36
37pub const MEMBER_STATE_ACTIVE: &str = "active";
39pub const MEMBER_STATE_RETIRING: &str = "retiring";
41
42#[derive(Clone, Default)]
44pub struct MobBootstrapOptions {
45 pub allow_ephemeral_sessions: bool,
46 pub notify_orchestrator_on_resume: bool,
47 pub default_llm_client: Option<Arc<dyn LlmClient>>,
48}
49
50pub struct ReplaySanitizingLlmClient {
54 inner: Arc<dyn LlmClient>,
55}
56
57impl ReplaySanitizingLlmClient {
58 pub fn new(inner: Arc<dyn LlmClient>) -> Self {
59 Self { inner }
60 }
61
62 pub fn wrap(inner: Arc<dyn LlmClient>) -> Arc<dyn LlmClient> {
63 Arc::new(Self::new(inner))
64 }
65}
66
67pub struct ReplaySanitizingAgentLlmClient {
75 inner: Arc<dyn meerkat_core::AgentLlmClient>,
76}
77
78impl ReplaySanitizingAgentLlmClient {
79 pub fn new(inner: Arc<dyn meerkat_core::AgentLlmClient>) -> Self {
80 Self { inner }
81 }
82
83 pub fn wrap(
84 inner: Arc<dyn meerkat_core::AgentLlmClient>,
85 ) -> Arc<dyn meerkat_core::AgentLlmClient> {
86 Arc::new(Self::new(inner))
87 }
88}
89
90#[async_trait]
91impl meerkat_core::AgentLlmClient for ReplaySanitizingAgentLlmClient {
92 async fn stream_response(
93 &self,
94 messages: &[Message],
95 tools: &[Arc<meerkat_core::ToolDef>],
96 max_tokens: u32,
97 temperature: Option<f32>,
98 provider_params: Option<&meerkat_core::lifecycle::run_primitive::ProviderParamsOverride>,
99 ) -> Result<meerkat_core::agent::LlmStreamResult, meerkat_core::AgentError> {
100 let sanitized: Vec<Message> = messages
101 .iter()
102 .cloned()
103 .map(sanitize_message_for_stateless_replay)
104 .collect();
105 self.inner
106 .stream_response(&sanitized, tools, max_tokens, temperature, provider_params)
107 .await
108 }
109
110 fn provider(&self) -> &'static str {
111 self.inner.provider()
112 }
113
114 fn model(&self) -> &str {
115 self.inner.model()
116 }
117
118 fn compile_schema(
119 &self,
120 output_schema: &meerkat_core::OutputSchema,
121 ) -> Result<meerkat_core::schema::CompiledSchema, meerkat_core::schema::SchemaError> {
122 self.inner.compile_schema(output_schema)
123 }
124}
125
126#[async_trait]
127impl LlmClient for ReplaySanitizingLlmClient {
128 fn project_replay_messages(
129 &self,
130 messages: &[Message],
131 ) -> Result<Vec<Message>, meerkat_client::LlmError> {
132 let sanitized: Vec<Message> = messages
133 .iter()
134 .cloned()
135 .map(sanitize_message_for_stateless_replay)
136 .collect();
137 self.inner.project_replay_messages(&sanitized)
138 }
139
140 fn stream<'a>(&'a self, request: &'a LlmRequest) -> LlmStream<'a> {
141 let inner = Arc::clone(&self.inner);
142 let sanitized = sanitize_llm_request_for_stateless_replay(request);
143 Box::pin(async_stream::stream! {
144 let mut stream = inner.stream(&sanitized);
145 while let Some(event) = stream.next().await {
146 if runtime_turn_diagnostics_enabled()
147 && let Err(error) = &event
148 {
149 tracing::error!(
150 error = %error,
151 error_debug = ?error,
152 "mobkit llm client stream error"
153 );
154 }
155 yield event;
156 }
157 })
158 }
159
160 fn provider(&self) -> &'static str {
161 self.inner.provider()
162 }
163
164 async fn health_check(&self) -> Result<(), meerkat_client::LlmError> {
165 self.inner.health_check().await
166 }
167
168 fn compile_schema(
169 &self,
170 output_schema: &meerkat_core::OutputSchema,
171 ) -> Result<meerkat_core::schema::CompiledSchema, meerkat_core::schema::SchemaError> {
172 self.inner.compile_schema(output_schema)
173 }
174}
175
176pub(crate) type PreBuildHook = Arc<
204 dyn Fn(
205 &mut CreateSessionRequest,
206 ) -> std::pin::Pin<
207 Box<dyn std::future::Future<Output = Result<(), SessionError>> + Send + '_>,
208 > + Send
209 + Sync,
210>;
211
212pub type AfterCreateHook = Arc<
214 dyn Fn(
215 meerkat_core::types::SessionId,
216 SessionCreatedContext,
217 ) -> std::pin::Pin<Box<dyn std::future::Future<Output = ()> + Send>>
218 + Send
219 + Sync,
220>;
221
222struct PreBuildMobSessionService {
229 inner: Arc<dyn MobSessionService>,
230 hook: PreBuildHook,
231 after_create_hook: Option<AfterCreateHook>,
232 runtime_adapter_override: Option<Arc<meerkat_runtime::MeerkatMachine>>,
233}
234
235fn no_op_pre_build_hook() -> PreBuildHook {
236 Arc::new(|_req: &mut CreateSessionRequest| Box::pin(async { Ok(()) }))
237}
238
239#[derive(Debug, Clone, Copy, PartialEq, Eq)]
240pub(crate) enum DelegateIdleRetireOverride {
241 Disabled,
242 Seconds(u64),
243}
244
245#[derive(Clone, Default)]
246pub(crate) struct ImplicitDelegateRetirementOverrides {
247 inner: Arc<tokio::sync::RwLock<BTreeMap<(String, String), DelegateIdleRetireOverride>>>,
248}
249
250impl ImplicitDelegateRetirementOverrides {
251 pub(crate) async fn set(
252 &self,
253 mob_id: impl Into<String>,
254 member_id: impl Into<String>,
255 override_policy: DelegateIdleRetireOverride,
256 ) {
257 self.inner
258 .write()
259 .await
260 .insert((mob_id.into(), member_id.into()), override_policy);
261 }
262
263 pub(crate) async fn get(
264 &self,
265 mob_id: &str,
266 member_id: &str,
267 ) -> Option<DelegateIdleRetireOverride> {
268 self.inner
269 .read()
270 .await
271 .get(&(mob_id.to_string(), member_id.to_string()))
272 .copied()
273 }
274}
275
276struct AutoWireParentMobToolsFactory {
277 inner: Arc<dyn meerkat_core::service::MobToolsFactory>,
278 state: Arc<meerkat_mob_mcp::MobMcpState>,
279 implicit_delegate_retirement_overrides: ImplicitDelegateRetirementOverrides,
280}
281
282#[cfg_attr(target_arch = "wasm32", async_trait(?Send))]
283#[cfg_attr(not(target_arch = "wasm32"), async_trait)]
284impl meerkat_core::service::MobToolsFactory for AutoWireParentMobToolsFactory {
285 async fn build_mob_tools(
286 &self,
287 args: meerkat_core::service::MobToolsBuildArgs,
288 ) -> Result<Arc<dyn meerkat_core::AgentToolDispatcher>, Box<dyn std::error::Error + Send + Sync>>
289 {
290 let owner_identity = args
291 .comms_name
292 .as_deref()
293 .and_then(owner_identity_from_comms_name);
294 let inner = self.inner.build_mob_tools(args).await?;
295 Ok(Arc::new(AutoWireParentMobToolDispatcher {
296 inner,
297 state: Arc::clone(&self.state),
298 owner_identity,
299 implicit_delegate_retirement_overrides: self
300 .implicit_delegate_retirement_overrides
301 .clone(),
302 }))
303 }
304}
305
306struct AutoWireParentMobToolDispatcher {
307 inner: Arc<dyn meerkat_core::AgentToolDispatcher>,
308 state: Arc<meerkat_mob_mcp::MobMcpState>,
309 owner_identity: Option<String>,
310 implicit_delegate_retirement_overrides: ImplicitDelegateRetirementOverrides,
311}
312
313#[cfg_attr(target_arch = "wasm32", async_trait(?Send))]
314#[cfg_attr(not(target_arch = "wasm32"), async_trait)]
315impl meerkat_core::AgentToolDispatcher for AutoWireParentMobToolDispatcher {
316 fn tools(&self) -> Arc<[Arc<meerkat_core::types::ToolDef>]> {
317 self.inner
318 .tools()
319 .iter()
320 .map(|tool| {
321 if tool.name == "delegate" {
322 Arc::new(delegate_tool_def_with_idle_retire_secs(tool))
323 } else {
324 Arc::clone(tool)
325 }
326 })
327 .collect::<Vec<_>>()
328 .into()
329 }
330
331 async fn dispatch(
332 &self,
333 call: meerkat_core::types::ToolCallView<'_>,
334 ) -> Result<meerkat_core::ToolDispatchOutcome, meerkat_core::ToolError> {
335 if call.name == "delegate" {
336 return self.dispatch_delegate(call).await;
337 }
338 if call.name != "mob_spawn_member" {
339 return self.inner.dispatch(call).await;
340 }
341
342 let mut args = serde_json::from_str::<Value>(call.args.get()).map_err(|error| {
343 meerkat_core::ToolError::invalid_arguments(call.name, error.to_string())
344 })?;
345 let mut auto_wire_parent = true;
346 if let Some(object) = args.as_object_mut() {
347 auto_wire_parent = object
348 .get("auto_wire_parent")
349 .and_then(Value::as_bool)
350 .unwrap_or(true);
351 object
352 .entry("auto_wire_parent".to_string())
353 .or_insert(Value::Bool(true));
354 }
355 let mob_id = args
356 .get("mob_id")
357 .and_then(Value::as_str)
358 .map(str::to_string);
359 let member_id = args
360 .get("member_id")
361 .and_then(Value::as_str)
362 .map(str::to_string);
363 let args = serde_json::value::RawValue::from_string(args.to_string()).map_err(|error| {
364 meerkat_core::ToolError::invalid_arguments(call.name, error.to_string())
365 })?;
366 let call = meerkat_core::types::ToolCallView {
367 id: call.id,
368 name: call.name,
369 args: &args,
370 };
371 let outcome = self.inner.dispatch(call).await?;
372 if auto_wire_parent
373 && let (Some(mob_id), Some(member_id), Some(owner_identity)) =
374 (mob_id, member_id, self.owner_identity.as_deref())
375 && member_id != owner_identity
376 {
377 self.state
378 .mob_wire(
379 &meerkat_mob::MobId::from(mob_id.as_str()),
380 meerkat_mob::AgentIdentity::from(member_id.as_str()),
381 meerkat_mob::PeerTarget::Local(meerkat_mob::AgentIdentity::from(
382 owner_identity,
383 )),
384 )
385 .await
386 .map_err(|error| {
387 meerkat_core::ToolError::execution_failed(format!(
388 "spawned member but failed to wire to spawning agent: {error}"
389 ))
390 })?;
391 }
392 Ok(outcome)
393 }
394}
395
396impl AutoWireParentMobToolDispatcher {
397 async fn dispatch_delegate(
398 &self,
399 call: meerkat_core::types::ToolCallView<'_>,
400 ) -> Result<meerkat_core::ToolDispatchOutcome, meerkat_core::ToolError> {
401 let mut args = serde_json::from_str::<Value>(call.args.get()).map_err(|error| {
402 meerkat_core::ToolError::invalid_arguments(call.name, error.to_string())
403 })?;
404 let idle_retire_override = delegate_idle_retire_override_from_args(call.name, &mut args)?;
405 let args = serde_json::value::RawValue::from_string(args.to_string()).map_err(|error| {
406 meerkat_core::ToolError::invalid_arguments(call.name, error.to_string())
407 })?;
408 let call = meerkat_core::types::ToolCallView {
409 id: call.id,
410 name: call.name,
411 args: &args,
412 };
413 let outcome = self.inner.dispatch(call).await?;
414
415 if !outcome.result.is_error
416 && let Some(override_policy) = idle_retire_override
417 && let Ok(payload) = serde_json::from_str::<Value>(&outcome.result.text_content())
418 && let (Some(mob_id), Some(member_id)) = (
419 payload.get("mob_id").and_then(Value::as_str),
420 payload.get("agent_identity").and_then(Value::as_str),
421 )
422 {
423 self.implicit_delegate_retirement_overrides
424 .set(mob_id, member_id, override_policy)
425 .await;
426 }
427
428 Ok(outcome)
429 }
430}
431
432fn delegate_idle_retire_override_from_args(
433 tool_name: &str,
434 args: &mut Value,
435) -> Result<Option<DelegateIdleRetireOverride>, meerkat_core::ToolError> {
436 let Some(object) = args.as_object_mut() else {
437 return Ok(None);
438 };
439 let Some(value) = object.remove("idle_retire_secs") else {
440 return Ok(None);
441 };
442 if value.is_null() {
443 return Ok(Some(DelegateIdleRetireOverride::Disabled));
444 }
445 value
446 .as_u64()
447 .map(DelegateIdleRetireOverride::Seconds)
448 .map(Some)
449 .ok_or_else(|| {
450 meerkat_core::ToolError::invalid_arguments(
451 tool_name,
452 "idle_retire_secs must be a non-negative integer or null",
453 )
454 })
455}
456
457fn delegate_tool_def_with_idle_retire_secs(
458 tool: &meerkat_core::types::ToolDef,
459) -> meerkat_core::types::ToolDef {
460 let mut patched = tool.clone();
461 if !patched.description.contains("IDLE RETIREMENT:") {
462 patched.description.push_str(
463 "\n\nIDLE RETIREMENT:\n\
464 Omit idle_retire_secs to use the runtime default. Pass an integer \
465 number of seconds to override idle auto-retirement for this helper. \
466 Pass null to disable auto-retirement for this helper.",
467 );
468 }
469 if let Some(properties) = patched
470 .input_schema
471 .get_mut("properties")
472 .and_then(Value::as_object_mut)
473 {
474 properties
475 .entry("idle_retire_secs".to_string())
476 .or_insert_with(|| {
477 serde_json::json!({
478 "description": "Override idle auto-retirement for this helper. Omit to use the runtime default, use an integer number of seconds to override, or null to disable auto-retirement for this helper.",
479 "anyOf": [
480 {"type": "integer", "minimum": 0},
481 {"type": "null"}
482 ]
483 })
484 });
485 }
486 patched
487}
488
489fn owner_identity_from_comms_name(name: &str) -> Option<String> {
490 name.rsplit('/')
491 .next()
492 .filter(|value| !value.is_empty())
493 .map(str::to_string)
494}
495
496fn install_agent_mob_tools(
497 slot: Arc<std::sync::RwLock<Option<Arc<dyn meerkat_core::service::MobToolsFactory>>>>,
498 session_service: Arc<dyn MobSessionService>,
499) -> (
500 Arc<meerkat_mob_mcp::MobMcpState>,
501 ImplicitDelegateRetirementOverrides,
502) {
503 let state = Arc::new(meerkat_mob_mcp::MobMcpState::new(session_service));
504 let implicit_delegate_retirement_overrides = ImplicitDelegateRetirementOverrides::default();
505 let inner = Arc::new(meerkat_mob_mcp::AgentMobToolSurfaceFactory::new(
506 Arc::clone(&state),
507 ));
508 let factory = Arc::new(AutoWireParentMobToolsFactory {
509 inner,
510 state: Arc::clone(&state),
511 implicit_delegate_retirement_overrides: implicit_delegate_retirement_overrides.clone(),
512 });
513 *slot
514 .write()
515 .unwrap_or_else(std::sync::PoisonError::into_inner) = Some(factory);
516 (state, implicit_delegate_retirement_overrides)
517}
518
519#[cfg(test)]
520#[allow(dead_code)]
521#[derive(Debug, Clone)]
522pub(crate) struct RuntimeTurnTrace {
523 pub(crate) session_id: String,
524 pub(crate) boundary: String,
525 pub(crate) contributing_input_count: usize,
526 pub(crate) outcome: String,
527}
528
529fn is_replay_unsafe_server_tool_content(name: &str, content: &Value) -> bool {
530 name == "web_search_annotations"
531 || content
532 .get("type")
533 .and_then(Value::as_str)
534 .is_some_and(|kind| kind.starts_with("response."))
535}
536
537fn sanitize_llm_request_for_stateless_replay(request: &LlmRequest) -> LlmRequest {
538 let mut sanitized = request.clone();
539 sanitized.messages = request
540 .messages
541 .iter()
542 .cloned()
543 .map(sanitize_message_for_stateless_replay)
544 .collect();
545 sanitized
546}
547
548fn sanitize_create_session_request_llm_override(req: &mut CreateSessionRequest) {
549 let Some(build) = req.build.as_mut() else {
550 return;
551 };
552 let Some(client) = build
553 .llm_client_override
554 .as_ref()
555 .and_then(meerkat::decode_llm_client_override_from_service)
556 else {
557 return;
558 };
559 build.llm_client_override = Some(meerkat::encode_llm_client_override_for_service(
560 ReplaySanitizingLlmClient::wrap(client),
561 ));
562}
563
564fn sanitize_message_for_stateless_replay(message: Message) -> Message {
565 match message {
566 Message::BlockAssistant(mut assistant) => {
567 assistant.blocks = assistant
568 .blocks
569 .into_iter()
570 .filter_map(|block| match block {
571 AssistantBlock::ServerToolContent { name, content, .. }
572 if is_replay_unsafe_server_tool_content(&name, &content) =>
573 {
574 None
575 }
576 other => Some(other),
577 })
578 .collect();
579 Message::BlockAssistant(assistant)
580 }
581 other => other,
582 }
583}
584
585fn build_persistent_runtime_store(store_path: &Path) -> Arc<dyn meerkat_runtime::RuntimeStore> {
595 let runtime_db = store_path.join("runtime.sqlite");
596 match meerkat_runtime::store::SqliteRuntimeStore::new(&runtime_db) {
597 Ok(store) => Arc::new(store),
598 Err(err) => {
599 tracing::warn!(
600 path = %runtime_db.display(),
601 error = %err,
602 "failed to open SqliteRuntimeStore; falling back to InMemoryRuntimeStore. \
603 Sessions will not survive process restart and archive operations may fail.",
604 );
605 Arc::new(meerkat_runtime::InMemoryRuntimeStore::new())
606 }
607 }
608}
609
610struct SessionStoreBackedRuntimeStore {
618 inner: Arc<dyn meerkat_runtime::RuntimeStore>,
619 session_store: Arc<dyn SessionStore>,
620}
621
622impl SessionStoreBackedRuntimeStore {
623 fn new(
624 inner: Arc<dyn meerkat_runtime::RuntimeStore>,
625 session_store: Arc<dyn SessionStore>,
626 ) -> Self {
627 Self {
628 inner,
629 session_store,
630 }
631 }
632
633 fn session_id_for_runtime(
634 runtime_id: &meerkat_runtime::LogicalRuntimeId,
635 ) -> Result<meerkat_core::types::SessionId, meerkat_runtime::store::RuntimeStoreError> {
636 const PREFIX: &str = "rt:session:";
637 let Some(raw) = runtime_id.0.strip_prefix(PREFIX) else {
638 return Err(meerkat_runtime::store::RuntimeStoreError::Unsupported(
639 format!("cannot map non-session runtime id '{runtime_id}' to SessionStore"),
640 ));
641 };
642 meerkat_core::types::SessionId::parse(raw).map_err(|err| {
643 meerkat_runtime::store::RuntimeStoreError::Internal(format!(
644 "invalid session runtime id '{runtime_id}': {err}"
645 ))
646 })
647 }
648
649 fn decode_session(
650 snapshot: &[u8],
651 session_store_key: Option<&meerkat_core::types::SessionId>,
652 ) -> Result<meerkat_core::Session, meerkat_runtime::store::RuntimeStoreError> {
653 let session: meerkat_core::Session = serde_json::from_slice(snapshot).map_err(|err| {
654 meerkat_runtime::store::RuntimeStoreError::Internal(format!(
655 "failed to deserialize runtime session snapshot: {err}"
656 ))
657 })?;
658 if let Some(expected) = session_store_key
659 && session.id() != expected
660 {
661 return Err(
662 meerkat_runtime::store::RuntimeStoreError::SessionKeyMismatch {
663 expected: expected.clone(),
664 actual: session.id().clone(),
665 },
666 );
667 }
668 Ok(session)
669 }
670
671 async fn save_session_snapshot_to_store(
672 &self,
673 snapshot: &[u8],
674 session_store_key: Option<&meerkat_core::types::SessionId>,
675 ) -> Result<(), meerkat_runtime::store::RuntimeStoreError> {
676 let session = Self::decode_session(snapshot, session_store_key)?;
677 self.session_store.save(&session).await.map_err(|err| {
678 meerkat_runtime::store::RuntimeStoreError::WriteFailed(format!(
679 "external SessionStore save failed: {err}"
680 ))
681 })
682 }
683}
684
685#[async_trait]
686impl meerkat_runtime::RuntimeStore for SessionStoreBackedRuntimeStore {
687 fn auth_authority_key(&self) -> Option<String> {
688 self.inner.auth_authority_key()
689 }
690
691 fn persist_auth_oauth_flow_snapshot(
692 &self,
693 snapshot_json: &[u8],
694 ) -> Result<(), meerkat_runtime::store::RuntimeStoreError> {
695 self.inner.persist_auth_oauth_flow_snapshot(snapshot_json)
696 }
697
698 fn load_auth_oauth_flow_snapshot(
699 &self,
700 ) -> Result<Option<Vec<u8>>, meerkat_runtime::store::RuntimeStoreError> {
701 self.inner.load_auth_oauth_flow_snapshot()
702 }
703
704 fn update_auth_oauth_flow_snapshot(
705 &self,
706 update: &mut meerkat_runtime::store::AuthOAuthFlowSnapshotUpdate<'_>,
707 ) -> Result<(), meerkat_runtime::store::RuntimeStoreError> {
708 self.inner.update_auth_oauth_flow_snapshot(update)
709 }
710
711 async fn commit_session_snapshot(
712 &self,
713 runtime_id: &meerkat_runtime::LogicalRuntimeId,
714 session_delta: meerkat_runtime::store::SessionDelta,
715 ) -> Result<(), meerkat_runtime::store::RuntimeStoreError> {
716 let session_id = Self::session_id_for_runtime(runtime_id)?;
717 self.save_session_snapshot_to_store(&session_delta.session_snapshot, Some(&session_id))
718 .await?;
719 self.inner
720 .commit_session_snapshot(runtime_id, session_delta)
721 .await
722 }
723
724 async fn atomic_apply(
725 &self,
726 runtime_id: &meerkat_runtime::LogicalRuntimeId,
727 session_delta: Option<meerkat_runtime::store::SessionDelta>,
728 receipt: meerkat_core::lifecycle::RunBoundaryReceipt,
729 input_updates: Vec<StoredInputState>,
730 session_store_key: Option<meerkat_core::types::SessionId>,
731 ) -> Result<(), meerkat_runtime::store::RuntimeStoreError> {
732 if let Some(delta) = session_delta.as_ref() {
733 self.save_session_snapshot_to_store(
734 &delta.session_snapshot,
735 session_store_key.as_ref(),
736 )
737 .await?;
738 }
739 self.inner
740 .atomic_apply(
741 runtime_id,
742 session_delta,
743 receipt,
744 input_updates,
745 session_store_key,
746 )
747 .await
748 }
749
750 async fn load_input_states(
751 &self,
752 runtime_id: &meerkat_runtime::LogicalRuntimeId,
753 ) -> Result<Vec<StoredInputState>, meerkat_runtime::store::RuntimeStoreError> {
754 self.inner.load_input_states(runtime_id).await
755 }
756
757 async fn load_boundary_receipt(
758 &self,
759 runtime_id: &meerkat_runtime::LogicalRuntimeId,
760 run_id: &meerkat_core::lifecycle::RunId,
761 sequence: u64,
762 ) -> Result<
763 Option<meerkat_core::lifecycle::RunBoundaryReceipt>,
764 meerkat_runtime::store::RuntimeStoreError,
765 > {
766 self.inner
767 .load_boundary_receipt(runtime_id, run_id, sequence)
768 .await
769 }
770
771 async fn load_session_snapshot(
772 &self,
773 runtime_id: &meerkat_runtime::LogicalRuntimeId,
774 ) -> Result<Option<Vec<u8>>, meerkat_runtime::store::RuntimeStoreError> {
775 if let Some(snapshot) = self.inner.load_session_snapshot(runtime_id).await? {
776 return Ok(Some(snapshot));
777 }
778 let session_id = Self::session_id_for_runtime(runtime_id)?;
779 let Some(session) = self.session_store.load(&session_id).await.map_err(|err| {
780 meerkat_runtime::store::RuntimeStoreError::ReadFailed(format!(
781 "external SessionStore load failed: {err}"
782 ))
783 })?
784 else {
785 return Ok(None);
786 };
787 serde_json::to_vec(&session).map(Some).map_err(|err| {
788 meerkat_runtime::store::RuntimeStoreError::Internal(format!(
789 "failed to serialize external SessionStore snapshot: {err}"
790 ))
791 })
792 }
793
794 async fn persist_input_state(
795 &self,
796 runtime_id: &meerkat_runtime::LogicalRuntimeId,
797 state: &StoredInputState,
798 ) -> Result<(), meerkat_runtime::store::RuntimeStoreError> {
799 self.inner.persist_input_state(runtime_id, state).await
800 }
801
802 async fn load_input_state(
803 &self,
804 runtime_id: &meerkat_runtime::LogicalRuntimeId,
805 input_id: &meerkat_core::lifecycle::InputId,
806 ) -> Result<Option<StoredInputState>, meerkat_runtime::store::RuntimeStoreError> {
807 self.inner.load_input_state(runtime_id, input_id).await
808 }
809
810 async fn load_runtime_state(
811 &self,
812 runtime_id: &meerkat_runtime::LogicalRuntimeId,
813 ) -> Result<Option<meerkat_runtime::RuntimeState>, meerkat_runtime::store::RuntimeStoreError>
814 {
815 self.inner.load_runtime_state(runtime_id).await
816 }
817
818 async fn commit_machine_lifecycle(
819 &self,
820 runtime_id: &meerkat_runtime::LogicalRuntimeId,
821 commit: MachineLifecycleCommit,
822 input_states: &[StoredInputState],
823 ) -> Result<(), meerkat_runtime::store::RuntimeStoreError> {
824 self.inner
825 .commit_machine_lifecycle(runtime_id, commit, input_states)
826 .await
827 }
828
829 async fn persist_ops_lifecycle(
830 &self,
831 runtime_id: &meerkat_runtime::LogicalRuntimeId,
832 snapshot: &meerkat_runtime::ops_lifecycle::PersistedOpsSnapshot,
833 ) -> Result<(), meerkat_runtime::store::RuntimeStoreError> {
834 self.inner.persist_ops_lifecycle(runtime_id, snapshot).await
835 }
836
837 async fn load_ops_lifecycle(
838 &self,
839 runtime_id: &meerkat_runtime::LogicalRuntimeId,
840 ) -> Result<
841 Option<meerkat_runtime::ops_lifecycle::PersistedOpsSnapshot>,
842 meerkat_runtime::store::RuntimeStoreError,
843 > {
844 self.inner.load_ops_lifecycle(runtime_id).await
845 }
846}
847
848#[cfg(test)]
849static RUNTIME_TURN_TRACES: OnceLock<Mutex<Vec<RuntimeTurnTrace>>> = OnceLock::new();
850
851#[cfg(test)]
852fn runtime_turn_traces() -> &'static Mutex<Vec<RuntimeTurnTrace>> {
853 RUNTIME_TURN_TRACES.get_or_init(|| Mutex::new(Vec::new()))
854}
855
856#[cfg(test)]
857#[allow(clippy::expect_used)]
858fn record_runtime_turn_trace(trace: RuntimeTurnTrace) {
859 runtime_turn_traces()
860 .lock()
861 .expect("runtime turn traces mutex")
862 .push(trace);
863}
864
865#[cfg(test)]
866#[allow(dead_code)]
867#[allow(clippy::expect_used)]
868pub(crate) fn take_runtime_turn_traces() -> Vec<RuntimeTurnTrace> {
869 std::mem::take(
870 &mut *runtime_turn_traces()
871 .lock()
872 .expect("runtime turn traces mutex"),
873 )
874}
875
876#[cfg(not(test))]
877#[allow(dead_code)]
878fn record_runtime_turn_trace(_trace: ()) {}
879
880fn runtime_turn_diagnostics_enabled() -> bool {
881 std::env::var_os("MOBKIT_TRACE_RUNTIME_TURNS").is_some()
882}
883
884fn summarize_runtime_prompt(prompt: &meerkat_core::ContentInput) -> String {
885 match prompt {
886 meerkat_core::ContentInput::Text(text) => {
887 text.lines().take(6).collect::<Vec<_>>().join(" ")
888 }
889 meerkat_core::ContentInput::Blocks(blocks) => blocks
890 .iter()
891 .map(|block| block.text_projection().to_string())
892 .collect::<Vec<_>>()
893 .join(" ")
894 .lines()
895 .take(6)
896 .collect::<Vec<_>>()
897 .join(" "),
898 }
899}
900
901pub fn mob_definition_may_use_image_generation(definition: &MobDefinition) -> bool {
907 definition.profiles.values().any(|binding| {
908 binding
909 .as_inline()
910 .is_none_or(|profile| profile.tools.image_generation)
911 })
912}
913
914fn normalize_runtime_turn_request(
915 mut req: meerkat_core::service::StartTurnRequest,
916) -> meerkat_core::service::StartTurnRequest {
917 req.runtime.handling_mode = meerkat_core::types::HandlingMode::Queue;
923 req.runtime.render_metadata = None;
924 req
925}
926
927macro_rules! delegate_mob_session_service {
930 ($wrapper:ty) => {
931 #[async_trait]
932 impl meerkat_core::service::SessionService for $wrapper {
933 async fn create_session(
934 &self,
935 mut req: CreateSessionRequest,
936 ) -> Result<meerkat_core::types::RunResult, SessionError> {
937 (self.hook)(&mut req).await?;
938 sanitize_create_session_request_llm_override(&mut req);
939
940 let ctx = SessionCreatedContext {
942 model: req.model.clone(),
943 labels: req.labels.clone().unwrap_or_default(),
944 system_prompt: req.system_prompt.clone(),
945 };
946 let result = self.inner.create_session(req).await?;
947
948 if let Some(ref after_hook) = self.after_create_hook {
950 after_hook(result.session_id.clone(), ctx).await;
951 }
952
953 Ok(result)
954 }
955 async fn start_turn(
956 &self,
957 id: &meerkat_core::types::SessionId,
958 req: meerkat_core::service::StartTurnRequest,
959 ) -> Result<meerkat_core::types::RunResult, SessionError> {
960 self.inner.start_turn(id, req).await
961 }
962 async fn interrupt(
963 &self,
964 id: &meerkat_core::types::SessionId,
965 ) -> Result<(), SessionError> {
966 self.inner.interrupt(id).await
967 }
968 async fn cancel_after_boundary(
969 &self,
970 id: &meerkat_core::types::SessionId,
971 ) -> Result<(), SessionError> {
972 self.inner.cancel_after_boundary(id).await
973 }
974 async fn set_session_client(
975 &self,
976 id: &meerkat_core::types::SessionId,
977 client: Arc<dyn meerkat_core::AgentLlmClient>,
978 ) -> Result<(), SessionError> {
979 self.inner
980 .set_session_client(id, ReplaySanitizingAgentLlmClient::wrap(client))
981 .await
982 }
983 async fn hot_swap_session_llm_identity(
984 &self,
985 id: &meerkat_core::types::SessionId,
986 client: Arc<dyn meerkat_core::AgentLlmClient>,
987 identity: meerkat_core::session::SessionLlmIdentity,
988 request_policy: meerkat_core::SessionLlmRequestPolicy,
989 ) -> Result<(), SessionError> {
990 self.inner
991 .hot_swap_session_llm_identity(
992 id,
993 ReplaySanitizingAgentLlmClient::wrap(client),
994 identity,
995 request_policy,
996 )
997 .await
998 }
999 async fn update_session_keep_alive(
1000 &self,
1001 id: &meerkat_core::types::SessionId,
1002 keep_alive: bool,
1003 ) -> Result<(), SessionError> {
1004 self.inner.update_session_keep_alive(id, keep_alive).await
1005 }
1006 async fn update_session_mob_authority_context(
1007 &self,
1008 id: &meerkat_core::types::SessionId,
1009 authority_context: Option<meerkat_core::service::MobToolAuthorityContext>,
1010 ) -> Result<(), SessionError> {
1011 self.inner
1012 .update_session_mob_authority_context(id, authority_context)
1013 .await
1014 }
1015 async fn has_live_session(
1016 &self,
1017 id: &meerkat_core::types::SessionId,
1018 ) -> Result<bool, SessionError> {
1019 self.inner.has_live_session(id).await
1020 }
1021 async fn set_session_tool_visibility_state(
1022 &self,
1023 id: &meerkat_core::types::SessionId,
1024 state: Option<meerkat_core::SessionToolVisibilityState>,
1025 ) -> Result<(), SessionError> {
1026 self.inner
1027 .set_session_tool_visibility_state(id, state)
1028 .await
1029 }
1030 async fn set_session_tool_filter(
1031 &self,
1032 id: &meerkat_core::types::SessionId,
1033 filter: meerkat_core::ToolFilter,
1034 ) -> Result<(), SessionError> {
1035 self.inner.set_session_tool_filter(id, filter).await
1036 }
1037 async fn read(
1038 &self,
1039 id: &meerkat_core::types::SessionId,
1040 ) -> Result<meerkat_core::service::SessionView, SessionError> {
1041 self.inner.read(id).await
1042 }
1043 async fn list(
1044 &self,
1045 query: meerkat_core::service::SessionQuery,
1046 ) -> Result<Vec<meerkat_core::service::SessionSummary>, SessionError> {
1047 self.inner.list(query).await
1048 }
1049 async fn archive(
1050 &self,
1051 id: &meerkat_core::types::SessionId,
1052 ) -> Result<(), SessionError> {
1053 self.inner.archive(id).await
1054 }
1055 async fn subscribe_session_events(
1056 &self,
1057 id: &meerkat_core::types::SessionId,
1058 ) -> Result<meerkat_core::comms::EventStream, meerkat_core::comms::StreamError> {
1059 meerkat_core::service::SessionService::subscribe_session_events(
1060 self.inner.as_ref(),
1061 id,
1062 )
1063 .await
1064 }
1065 }
1066
1067 #[async_trait]
1068 impl meerkat_core::service::SessionServiceCommsExt for $wrapper {
1069 async fn comms_runtime(
1070 &self,
1071 id: &meerkat_core::types::SessionId,
1072 ) -> Option<Arc<dyn meerkat_core::agent::CommsRuntime>> {
1073 self.inner.comms_runtime(id).await
1074 }
1075
1076 async fn event_injector(
1077 &self,
1078 id: &meerkat_core::types::SessionId,
1079 ) -> Option<Arc<dyn meerkat_core::EventInjector>> {
1080 self.inner.event_injector(id).await
1081 }
1082
1083 async fn interaction_event_injector(
1084 &self,
1085 id: &meerkat_core::types::SessionId,
1086 ) -> Option<Arc<dyn meerkat_core::event_injector::SubscribableInjector>> {
1087 self.inner.interaction_event_injector(id).await
1088 }
1089 }
1090
1091 #[async_trait]
1092 impl meerkat_core::service::SessionServiceControlExt for $wrapper {
1093 async fn append_system_context(
1094 &self,
1095 id: &meerkat_core::types::SessionId,
1096 req: meerkat_core::service::AppendSystemContextRequest,
1097 ) -> Result<
1098 meerkat_core::service::AppendSystemContextResult,
1099 meerkat_core::service::SessionControlError,
1100 > {
1101 self.inner.append_system_context(id, req).await
1102 }
1103 async fn stage_tool_results(
1104 &self,
1105 id: &meerkat_core::types::SessionId,
1106 req: meerkat_core::service::StageToolResultsRequest,
1107 ) -> Result<meerkat_core::service::StageToolResultsResult, SessionError> {
1108 self.inner.stage_tool_results(id, req).await
1109 }
1110 }
1111
1112 #[async_trait]
1113 impl meerkat_core::service::SessionServiceHistoryExt for $wrapper {
1114 async fn read_history(
1115 &self,
1116 id: &meerkat_core::types::SessionId,
1117 query: meerkat_core::service::SessionHistoryQuery,
1118 ) -> Result<meerkat_core::service::SessionHistoryPage, SessionError> {
1119 self.inner.read_history(id, query).await
1120 }
1121 }
1122
1123 #[async_trait]
1124 impl MobSessionService for $wrapper {
1125 fn supports_persistent_sessions(&self) -> bool {
1126 self.inner.supports_persistent_sessions()
1127 }
1128 fn runtime_adapter(&self) -> Option<Arc<meerkat_runtime::MeerkatMachine>> {
1129 self.runtime_adapter_override
1130 .clone()
1131 .or_else(|| self.inner.runtime_adapter())
1132 }
1133 async fn interrupt_with_machine_authority(
1134 &self,
1135 session_id: &meerkat_core::types::SessionId,
1136 authority: meerkat_runtime::MachineSessionControlAuthority,
1137 ) -> Result<(), SessionError> {
1138 self.inner
1139 .interrupt_with_machine_authority(session_id, authority)
1140 .await
1141 }
1142 async fn cancel_after_boundary_with_machine_authority(
1143 &self,
1144 session_id: &meerkat_core::types::SessionId,
1145 authority: meerkat_runtime::MachineSessionControlAuthority,
1146 ) -> Result<(), SessionError> {
1147 self.inner
1148 .cancel_after_boundary_with_machine_authority(session_id, authority)
1149 .await
1150 }
1151 async fn session_belongs_to_mob(
1152 &self,
1153 session_id: &meerkat_core::types::SessionId,
1154 mob_id: &meerkat_mob::MobId,
1155 ) -> bool {
1156 self.inner.session_belongs_to_mob(session_id, mob_id).await
1157 }
1158 async fn load_persisted_session(
1159 &self,
1160 session_id: &meerkat_core::types::SessionId,
1161 ) -> Result<Option<meerkat_core::session::Session>, SessionError> {
1162 self.inner.load_persisted_session(session_id).await
1163 }
1164 async fn subscribe_session_events(
1165 &self,
1166 session_id: &meerkat_core::types::SessionId,
1167 ) -> Result<meerkat_core::comms::EventStream, meerkat_core::comms::StreamError> {
1168 meerkat_mob::MobSessionService::subscribe_session_events(
1169 self.inner.as_ref(),
1170 session_id,
1171 )
1172 .await
1173 }
1174 async fn archive_with_mob_lifecycle_authority(
1175 &self,
1176 session_id: &meerkat_core::types::SessionId,
1177 ) -> Result<(), SessionError> {
1178 self.inner
1179 .archive_with_mob_lifecycle_authority(session_id)
1180 .await
1181 }
1182 async fn execution_snapshot(
1183 &self,
1184 session_id: &meerkat_core::types::SessionId,
1185 ) -> Result<Option<meerkat_core::agent::AgentExecutionSnapshot>, SessionError> {
1186 self.inner.execution_snapshot(session_id).await
1187 }
1188 async fn tool_scope_snapshot(
1189 &self,
1190 session_id: &meerkat_core::types::SessionId,
1191 ) -> Result<Option<meerkat_core::ToolScopeSnapshot>, SessionError> {
1192 self.inner.tool_scope_snapshot(session_id).await
1193 }
1194 async fn external_tool_surface_snapshot(
1195 &self,
1196 session_id: &meerkat_core::types::SessionId,
1197 ) -> Result<Option<meerkat_core::ExternalToolSurfaceSnapshot>, SessionError> {
1198 self.inner.external_tool_surface_snapshot(session_id).await
1199 }
1200 async fn peer_ingress_runtime_snapshot(
1201 &self,
1202 session_id: &meerkat_core::types::SessionId,
1203 ) -> Result<Option<meerkat_core::PeerIngressRuntimeSnapshot>, SessionError> {
1204 self.inner.peer_ingress_runtime_snapshot(session_id).await
1205 }
1206 async fn apply_runtime_turn(
1207 &self,
1208 session_id: &meerkat_core::types::SessionId,
1209 run_id: meerkat_core::lifecycle::RunId,
1210 req: meerkat_core::service::StartTurnRequest,
1211 boundary: meerkat_core::lifecycle::run_primitive::RunApplyBoundary,
1212 contributing_input_ids: Vec<meerkat_core::lifecycle::InputId>,
1213 ) -> Result<meerkat_core::lifecycle::core_executor::CoreApplyOutput, SessionError> {
1214 #[cfg(test)]
1215 let boundary_name = format!("{boundary:?}");
1216 #[cfg(test)]
1217 let contributing_count = contributing_input_ids.len();
1218 let run_id_for_log = run_id.to_string();
1219 let prompt_summary = if runtime_turn_diagnostics_enabled() {
1220 Some(summarize_runtime_prompt(&req.prompt))
1221 } else {
1222 None
1223 };
1224 if let Some(summary) = prompt_summary.as_ref() {
1225 tracing::warn!(
1226 session_id = %session_id,
1227 run_id = %run_id_for_log,
1228 boundary = ?boundary,
1229 contributing_inputs = contributing_input_ids.len(),
1230 prompt = %summary,
1231 runtime = ?req.runtime,
1232 "mobkit runtime turn start"
1233 );
1234 }
1235 let result = self
1236 .inner
1237 .apply_runtime_turn(
1238 session_id,
1239 run_id,
1240 normalize_runtime_turn_request(req),
1241 boundary,
1242 contributing_input_ids,
1243 )
1244 .await;
1245 #[cfg(test)]
1246 record_runtime_turn_trace(RuntimeTurnTrace {
1247 session_id: session_id.to_string(),
1248 boundary: boundary_name,
1249 contributing_input_count: contributing_count,
1250 outcome: match &result {
1251 Ok(_) => "ok".to_string(),
1252 Err(error) => format!("err:{error}"),
1253 },
1254 });
1255 if runtime_turn_diagnostics_enabled() {
1256 match &result {
1257 Ok(_) => tracing::warn!(
1258 session_id = %session_id,
1259 run_id = %run_id_for_log,
1260 "mobkit runtime turn ok"
1261 ),
1262 Err(error) => tracing::error!(
1263 session_id = %session_id,
1264 run_id = %run_id_for_log,
1265 error = %error,
1266 error_debug = ?error,
1267 "mobkit runtime turn error"
1268 ),
1269 }
1270 }
1271 result
1272 }
1273 async fn apply_runtime_context_appends(
1274 &self,
1275 session_id: &meerkat_core::types::SessionId,
1276 run_id: meerkat_core::lifecycle::RunId,
1277 appends: Vec<meerkat_core::session::PendingSystemContextAppend>,
1278 contributing_input_ids: Vec<meerkat_core::lifecycle::InputId>,
1279 ) -> Result<meerkat_core::lifecycle::core_executor::CoreApplyOutput, SessionError> {
1280 self.inner
1281 .apply_runtime_context_appends(
1282 session_id,
1283 run_id,
1284 appends,
1285 contributing_input_ids,
1286 )
1287 .await
1288 }
1289 async fn apply_runtime_context_appends_with_boundary(
1290 &self,
1291 session_id: &meerkat_core::types::SessionId,
1292 run_id: meerkat_core::lifecycle::RunId,
1293 appends: Vec<meerkat_core::session::PendingSystemContextAppend>,
1294 boundary: meerkat_core::lifecycle::run_primitive::RunApplyBoundary,
1295 contributing_input_ids: Vec<meerkat_core::lifecycle::InputId>,
1296 ) -> Result<meerkat_core::lifecycle::core_executor::CoreApplyOutput, SessionError> {
1297 self.inner
1298 .apply_runtime_context_appends_with_boundary(
1299 session_id,
1300 run_id,
1301 appends,
1302 boundary,
1303 contributing_input_ids,
1304 )
1305 .await
1306 }
1307 async fn apply_runtime_system_context_for_turn(
1308 &self,
1309 session_id: &meerkat_core::types::SessionId,
1310 appends: Vec<meerkat_core::session::PendingSystemContextAppend>,
1311 ) -> Result<(), SessionError> {
1312 self.inner
1313 .apply_runtime_system_context_for_turn(session_id, appends)
1314 .await
1315 }
1316 async fn discard_live_session(
1317 &self,
1318 session_id: &meerkat_core::types::SessionId,
1319 ) -> Result<(), SessionError> {
1320 self.inner.discard_live_session(session_id).await
1321 }
1322 async fn checkpoint_committed_runtime_session_snapshot(
1323 &self,
1324 session_id: &meerkat_core::types::SessionId,
1325 session_snapshot: &[u8],
1326 ) -> Result<(), SessionError> {
1327 self.inner
1328 .checkpoint_committed_runtime_session_snapshot(session_id, session_snapshot)
1329 .await
1330 }
1331 async fn cancel_all_checkpointers(&self) {
1332 self.inner.cancel_all_checkpointers().await;
1333 }
1334 async fn rearm_all_checkpointers(&self) {
1335 self.inner.rearm_all_checkpointers().await;
1336 }
1337 }
1338 };
1339}
1340
1341delegate_mob_session_service!(PreBuildMobSessionService);
1342
1343struct AfterCreateMobSessionService {
1348 inner: Arc<dyn MobSessionService>,
1349 after_hook: AfterCreateHook,
1350}
1351
1352#[async_trait]
1353impl meerkat_core::service::SessionService for AfterCreateMobSessionService {
1354 async fn create_session(
1355 &self,
1356 mut req: CreateSessionRequest,
1357 ) -> Result<meerkat_core::types::RunResult, SessionError> {
1358 sanitize_create_session_request_llm_override(&mut req);
1359 let ctx = SessionCreatedContext {
1369 model: req.model.clone(),
1370 labels: req.labels.clone().unwrap_or_default(),
1371 system_prompt: req.system_prompt.clone(),
1372 };
1373 let result = self.inner.create_session(req).await?;
1374 (self.after_hook)(result.session_id.clone(), ctx).await;
1375 Ok(result)
1376 }
1377 async fn start_turn(
1378 &self,
1379 id: &meerkat_core::types::SessionId,
1380 req: meerkat_core::service::StartTurnRequest,
1381 ) -> Result<meerkat_core::types::RunResult, SessionError> {
1382 self.inner.start_turn(id, req).await
1383 }
1384 async fn interrupt(&self, id: &meerkat_core::types::SessionId) -> Result<(), SessionError> {
1385 self.inner.interrupt(id).await
1386 }
1387 async fn cancel_after_boundary(
1388 &self,
1389 id: &meerkat_core::types::SessionId,
1390 ) -> Result<(), SessionError> {
1391 self.inner.cancel_after_boundary(id).await
1392 }
1393 async fn set_session_client(
1394 &self,
1395 id: &meerkat_core::types::SessionId,
1396 client: Arc<dyn meerkat_core::AgentLlmClient>,
1397 ) -> Result<(), SessionError> {
1398 self.inner
1399 .set_session_client(id, ReplaySanitizingAgentLlmClient::wrap(client))
1400 .await
1401 }
1402 async fn hot_swap_session_llm_identity(
1403 &self,
1404 id: &meerkat_core::types::SessionId,
1405 client: Arc<dyn meerkat_core::AgentLlmClient>,
1406 identity: meerkat_core::session::SessionLlmIdentity,
1407 request_policy: meerkat_core::SessionLlmRequestPolicy,
1408 ) -> Result<(), SessionError> {
1409 self.inner
1410 .hot_swap_session_llm_identity(
1411 id,
1412 ReplaySanitizingAgentLlmClient::wrap(client),
1413 identity,
1414 request_policy,
1415 )
1416 .await
1417 }
1418 async fn update_session_keep_alive(
1419 &self,
1420 id: &meerkat_core::types::SessionId,
1421 keep_alive: bool,
1422 ) -> Result<(), SessionError> {
1423 self.inner.update_session_keep_alive(id, keep_alive).await
1424 }
1425 async fn update_session_mob_authority_context(
1426 &self,
1427 id: &meerkat_core::types::SessionId,
1428 authority_context: Option<meerkat_core::service::MobToolAuthorityContext>,
1429 ) -> Result<(), SessionError> {
1430 self.inner
1431 .update_session_mob_authority_context(id, authority_context)
1432 .await
1433 }
1434 async fn has_live_session(
1435 &self,
1436 id: &meerkat_core::types::SessionId,
1437 ) -> Result<bool, SessionError> {
1438 self.inner.has_live_session(id).await
1439 }
1440 async fn set_session_tool_visibility_state(
1441 &self,
1442 id: &meerkat_core::types::SessionId,
1443 state: Option<meerkat_core::SessionToolVisibilityState>,
1444 ) -> Result<(), SessionError> {
1445 self.inner
1446 .set_session_tool_visibility_state(id, state)
1447 .await
1448 }
1449 async fn set_session_tool_filter(
1450 &self,
1451 id: &meerkat_core::types::SessionId,
1452 filter: meerkat_core::ToolFilter,
1453 ) -> Result<(), SessionError> {
1454 self.inner.set_session_tool_filter(id, filter).await
1455 }
1456 async fn read(
1457 &self,
1458 id: &meerkat_core::types::SessionId,
1459 ) -> Result<meerkat_core::service::SessionView, SessionError> {
1460 self.inner.read(id).await
1461 }
1462 async fn list(
1463 &self,
1464 query: meerkat_core::service::SessionQuery,
1465 ) -> Result<Vec<meerkat_core::service::SessionSummary>, SessionError> {
1466 self.inner.list(query).await
1467 }
1468 async fn archive(&self, id: &meerkat_core::types::SessionId) -> Result<(), SessionError> {
1469 self.inner.archive(id).await
1470 }
1471 async fn subscribe_session_events(
1472 &self,
1473 id: &meerkat_core::types::SessionId,
1474 ) -> Result<meerkat_core::comms::EventStream, meerkat_core::comms::StreamError> {
1475 meerkat_core::service::SessionService::subscribe_session_events(self.inner.as_ref(), id)
1476 .await
1477 }
1478}
1479
1480#[async_trait]
1481impl meerkat_core::service::SessionServiceCommsExt for AfterCreateMobSessionService {
1482 async fn comms_runtime(
1483 &self,
1484 id: &meerkat_core::types::SessionId,
1485 ) -> Option<Arc<dyn meerkat_core::agent::CommsRuntime>> {
1486 self.inner.comms_runtime(id).await
1487 }
1488
1489 async fn event_injector(
1490 &self,
1491 id: &meerkat_core::types::SessionId,
1492 ) -> Option<Arc<dyn meerkat_core::EventInjector>> {
1493 self.inner.event_injector(id).await
1494 }
1495
1496 async fn interaction_event_injector(
1497 &self,
1498 id: &meerkat_core::types::SessionId,
1499 ) -> Option<Arc<dyn meerkat_core::event_injector::SubscribableInjector>> {
1500 self.inner.interaction_event_injector(id).await
1501 }
1502}
1503
1504#[async_trait]
1505impl meerkat_core::service::SessionServiceControlExt for AfterCreateMobSessionService {
1506 async fn append_system_context(
1507 &self,
1508 id: &meerkat_core::types::SessionId,
1509 req: meerkat_core::service::AppendSystemContextRequest,
1510 ) -> Result<
1511 meerkat_core::service::AppendSystemContextResult,
1512 meerkat_core::service::SessionControlError,
1513 > {
1514 self.inner.append_system_context(id, req).await
1515 }
1516 async fn stage_tool_results(
1517 &self,
1518 id: &meerkat_core::types::SessionId,
1519 req: meerkat_core::service::StageToolResultsRequest,
1520 ) -> Result<meerkat_core::service::StageToolResultsResult, SessionError> {
1521 self.inner.stage_tool_results(id, req).await
1522 }
1523}
1524
1525#[async_trait]
1526impl meerkat_core::service::SessionServiceHistoryExt for AfterCreateMobSessionService {
1527 async fn read_history(
1528 &self,
1529 id: &meerkat_core::types::SessionId,
1530 query: meerkat_core::service::SessionHistoryQuery,
1531 ) -> Result<meerkat_core::service::SessionHistoryPage, SessionError> {
1532 self.inner.read_history(id, query).await
1533 }
1534}
1535
1536#[async_trait]
1537impl MobSessionService for AfterCreateMobSessionService {
1538 fn supports_persistent_sessions(&self) -> bool {
1539 self.inner.supports_persistent_sessions()
1540 }
1541 fn runtime_adapter(&self) -> Option<Arc<meerkat_runtime::MeerkatMachine>> {
1542 self.inner.runtime_adapter()
1543 }
1544 async fn interrupt_with_machine_authority(
1545 &self,
1546 session_id: &meerkat_core::types::SessionId,
1547 authority: meerkat_runtime::MachineSessionControlAuthority,
1548 ) -> Result<(), SessionError> {
1549 self.inner
1550 .interrupt_with_machine_authority(session_id, authority)
1551 .await
1552 }
1553 async fn cancel_after_boundary_with_machine_authority(
1554 &self,
1555 session_id: &meerkat_core::types::SessionId,
1556 authority: meerkat_runtime::MachineSessionControlAuthority,
1557 ) -> Result<(), SessionError> {
1558 self.inner
1559 .cancel_after_boundary_with_machine_authority(session_id, authority)
1560 .await
1561 }
1562 async fn session_belongs_to_mob(
1563 &self,
1564 session_id: &meerkat_core::types::SessionId,
1565 mob_id: &meerkat_mob::MobId,
1566 ) -> bool {
1567 self.inner.session_belongs_to_mob(session_id, mob_id).await
1568 }
1569 async fn load_persisted_session(
1570 &self,
1571 session_id: &meerkat_core::types::SessionId,
1572 ) -> Result<Option<meerkat_core::session::Session>, SessionError> {
1573 self.inner.load_persisted_session(session_id).await
1574 }
1575 async fn subscribe_session_events(
1576 &self,
1577 session_id: &meerkat_core::types::SessionId,
1578 ) -> Result<meerkat_core::comms::EventStream, meerkat_core::comms::StreamError> {
1579 meerkat_mob::MobSessionService::subscribe_session_events(self.inner.as_ref(), session_id)
1580 .await
1581 }
1582 async fn archive_with_mob_lifecycle_authority(
1583 &self,
1584 session_id: &meerkat_core::types::SessionId,
1585 ) -> Result<(), SessionError> {
1586 self.inner
1587 .archive_with_mob_lifecycle_authority(session_id)
1588 .await
1589 }
1590 async fn execution_snapshot(
1591 &self,
1592 session_id: &meerkat_core::types::SessionId,
1593 ) -> Result<Option<meerkat_core::agent::AgentExecutionSnapshot>, SessionError> {
1594 self.inner.execution_snapshot(session_id).await
1595 }
1596 async fn tool_scope_snapshot(
1597 &self,
1598 session_id: &meerkat_core::types::SessionId,
1599 ) -> Result<Option<meerkat_core::ToolScopeSnapshot>, SessionError> {
1600 self.inner.tool_scope_snapshot(session_id).await
1601 }
1602 async fn external_tool_surface_snapshot(
1603 &self,
1604 session_id: &meerkat_core::types::SessionId,
1605 ) -> Result<Option<meerkat_core::ExternalToolSurfaceSnapshot>, SessionError> {
1606 self.inner.external_tool_surface_snapshot(session_id).await
1607 }
1608 async fn peer_ingress_runtime_snapshot(
1609 &self,
1610 session_id: &meerkat_core::types::SessionId,
1611 ) -> Result<Option<meerkat_core::PeerIngressRuntimeSnapshot>, SessionError> {
1612 self.inner.peer_ingress_runtime_snapshot(session_id).await
1613 }
1614 async fn apply_runtime_turn(
1615 &self,
1616 session_id: &meerkat_core::types::SessionId,
1617 run_id: meerkat_core::lifecycle::RunId,
1618 req: meerkat_core::service::StartTurnRequest,
1619 boundary: meerkat_core::lifecycle::run_primitive::RunApplyBoundary,
1620 contributing_input_ids: Vec<meerkat_core::lifecycle::InputId>,
1621 ) -> Result<meerkat_core::lifecycle::core_executor::CoreApplyOutput, SessionError> {
1622 self.inner
1623 .apply_runtime_turn(session_id, run_id, req, boundary, contributing_input_ids)
1624 .await
1625 }
1626 async fn apply_runtime_context_appends(
1627 &self,
1628 session_id: &meerkat_core::types::SessionId,
1629 run_id: meerkat_core::lifecycle::RunId,
1630 appends: Vec<meerkat_core::session::PendingSystemContextAppend>,
1631 contributing_input_ids: Vec<meerkat_core::lifecycle::InputId>,
1632 ) -> Result<meerkat_core::lifecycle::core_executor::CoreApplyOutput, SessionError> {
1633 self.inner
1634 .apply_runtime_context_appends(session_id, run_id, appends, contributing_input_ids)
1635 .await
1636 }
1637 async fn apply_runtime_context_appends_with_boundary(
1638 &self,
1639 session_id: &meerkat_core::types::SessionId,
1640 run_id: meerkat_core::lifecycle::RunId,
1641 appends: Vec<meerkat_core::session::PendingSystemContextAppend>,
1642 boundary: meerkat_core::lifecycle::run_primitive::RunApplyBoundary,
1643 contributing_input_ids: Vec<meerkat_core::lifecycle::InputId>,
1644 ) -> Result<meerkat_core::lifecycle::core_executor::CoreApplyOutput, SessionError> {
1645 self.inner
1646 .apply_runtime_context_appends_with_boundary(
1647 session_id,
1648 run_id,
1649 appends,
1650 boundary,
1651 contributing_input_ids,
1652 )
1653 .await
1654 }
1655 async fn apply_runtime_system_context_for_turn(
1656 &self,
1657 session_id: &meerkat_core::types::SessionId,
1658 appends: Vec<meerkat_core::session::PendingSystemContextAppend>,
1659 ) -> Result<(), SessionError> {
1660 self.inner
1661 .apply_runtime_system_context_for_turn(session_id, appends)
1662 .await
1663 }
1664 async fn discard_live_session(
1665 &self,
1666 session_id: &meerkat_core::types::SessionId,
1667 ) -> Result<(), SessionError> {
1668 self.inner.discard_live_session(session_id).await
1669 }
1670 async fn checkpoint_committed_runtime_session_snapshot(
1671 &self,
1672 session_id: &meerkat_core::types::SessionId,
1673 session_snapshot: &[u8],
1674 ) -> Result<(), SessionError> {
1675 self.inner
1676 .checkpoint_committed_runtime_session_snapshot(session_id, session_snapshot)
1677 .await
1678 }
1679 async fn cancel_all_checkpointers(&self) {
1680 self.inner.cancel_all_checkpointers().await;
1681 }
1682 async fn rearm_all_checkpointers(&self) {
1683 self.inner.rearm_all_checkpointers().await;
1684 }
1685}
1686
1687pub struct MobBootstrapSpec {
1689 pub definition: MobDefinition,
1690 pub storage: MobStorage,
1691 pub session_service: Arc<dyn MobSessionService>,
1692 pub binary_blob_store: Option<Arc<dyn BinaryBlobStore>>,
1693 pub(crate) agent_mob_mcp_state: Option<Arc<meerkat_mob_mcp::MobMcpState>>,
1694 pub(crate) implicit_delegate_retirement_overrides: Option<ImplicitDelegateRetirementOverrides>,
1695 pub options: MobBootstrapOptions,
1696 pub runtime_adapter: Option<Arc<meerkat_runtime::MeerkatMachine>>,
1702 pub(crate) _ephemeral_dir: Option<Arc<tempfile::TempDir>>,
1705}
1706
1707impl MobBootstrapSpec {
1708 pub fn new(
1709 definition: MobDefinition,
1710 storage: MobStorage,
1711 session_service: Arc<dyn MobSessionService>,
1712 ) -> Self {
1713 let session_service = Arc::new(PreBuildMobSessionService {
1714 inner: session_service,
1715 hook: no_op_pre_build_hook(),
1716 after_create_hook: None,
1717 runtime_adapter_override: None,
1718 }) as Arc<dyn MobSessionService>;
1719 Self {
1720 definition,
1721 storage,
1722 session_service,
1723 binary_blob_store: None,
1724 agent_mob_mcp_state: None,
1725 implicit_delegate_retirement_overrides: None,
1726 options: MobBootstrapOptions {
1727 allow_ephemeral_sessions: true,
1728 notify_orchestrator_on_resume: true,
1729 default_llm_client: None,
1730 },
1731 runtime_adapter: None,
1732 _ephemeral_dir: None,
1733 }
1734 }
1735
1736 pub fn with_options(mut self, options: MobBootstrapOptions) -> Self {
1737 self.options = options;
1738 self
1739 }
1740
1741 pub fn with_session_runtime_adapter(
1749 mut self,
1750 adapter: Arc<meerkat_runtime::MeerkatMachine>,
1751 ) -> Self {
1752 self.session_service = Arc::new(PreBuildMobSessionService {
1753 inner: self.session_service,
1754 hook: no_op_pre_build_hook(),
1755 after_create_hook: None,
1756 runtime_adapter_override: Some(adapter),
1757 });
1758 self
1759 }
1760
1761 pub fn with_after_create_hook(mut self, hook: AfterCreateHook) -> Self {
1767 self.session_service = Arc::new(AfterCreateMobSessionService {
1768 inner: self.session_service,
1769 after_hook: hook,
1770 });
1771 self
1772 }
1773
1774 pub fn ephemeral(
1779 definition: MobDefinition,
1780 storage: MobStorage,
1781 store_path: PathBuf,
1782 max_sessions: usize,
1783 session_store: Option<Arc<dyn AgentSessionStore>>,
1784 ) -> Self {
1785 Self::ephemeral_inner(
1786 definition,
1787 storage,
1788 store_path,
1789 max_sessions,
1790 session_store,
1791 None,
1792 CapabilityFlags::default(),
1793 None,
1794 )
1795 }
1796
1797 pub fn ephemeral_with_hook(
1801 definition: MobDefinition,
1802 storage: MobStorage,
1803 store_path: PathBuf,
1804 max_sessions: usize,
1805 session_store: Option<Arc<dyn AgentSessionStore>>,
1806 hook: impl Fn(
1807 &mut CreateSessionRequest,
1808 ) -> std::pin::Pin<
1809 Box<dyn std::future::Future<Output = Result<(), SessionError>> + Send + '_>,
1810 > + Send
1811 + Sync
1812 + 'static,
1813 ) -> Self {
1814 Self::ephemeral_inner(
1815 definition,
1816 storage,
1817 store_path,
1818 max_sessions,
1819 session_store,
1820 Some(Arc::new(hook)),
1821 CapabilityFlags::default(),
1822 None,
1823 )
1824 }
1825
1826 #[allow(clippy::too_many_arguments)]
1827 pub(crate) fn ephemeral_inner(
1828 definition: MobDefinition,
1829 storage: MobStorage,
1830 store_path: PathBuf,
1831 max_sessions: usize,
1832 session_store: Option<Arc<dyn AgentSessionStore>>,
1833 hook: Option<PreBuildHook>,
1834 mut caps: CapabilityFlags,
1835 after_create_hook: Option<AfterCreateHook>,
1836 ) -> Self {
1837 caps.image_generation |= mob_definition_may_use_image_generation(&definition);
1838 let binary_blob_store: Arc<dyn BinaryBlobStore> = Arc::new(ObjectStoreBlobStore::memory());
1839 let blob_store: Arc<dyn meerkat_core::BlobStore> =
1840 Arc::new(Base64BlobStoreAdapter::new(binary_blob_store.clone()));
1841 let runtime_adapter = if caps.image_generation {
1842 let runtime_store: Arc<dyn meerkat_runtime::RuntimeStore> =
1843 Arc::new(meerkat_runtime::InMemoryRuntimeStore::new());
1844 Some(Arc::new(meerkat_runtime::MeerkatMachine::persistent(
1845 runtime_store,
1846 Arc::clone(&blob_store),
1847 )))
1848 } else {
1849 None
1850 };
1851 let mut factory = AgentFactory::new(&store_path)
1852 .builtins(caps.builtins)
1853 .shell(caps.shell)
1854 .mob(caps.mob)
1855 .comms(caps.comms)
1856 .memory(caps.memory);
1857 if let Some(machine) = runtime_adapter.clone() {
1858 factory = factory.with_image_generation_machine(machine);
1859 }
1860 let config = Config::default();
1861 let mut builder = FactoryAgentBuilder::new(factory, config);
1862 builder.default_blob_store = Some(blob_store);
1863 if let Some(store) = session_store {
1864 builder.default_session_store = Some(store);
1865 }
1866 let mob_tools_slot = Arc::clone(&builder.default_mob_tools);
1867 let session_service: Arc<dyn MobSessionService> = Arc::new(
1868 meerkat_session::EphemeralSessionService::new(builder, max_sessions),
1869 );
1870 let hook = hook.unwrap_or_else(no_op_pre_build_hook);
1871 let after_create_hook = if let Some(runtime_adapter) = runtime_adapter.clone() {
1872 let user_after_create_hook = after_create_hook.clone();
1873 Some(Arc::new(
1874 move |session_id: meerkat_core::types::SessionId, ctx: SessionCreatedContext| {
1875 let runtime_adapter = runtime_adapter.clone();
1876 let user_after_create_hook = user_after_create_hook.clone();
1877 Box::pin(async move {
1878 runtime_adapter.register_session(session_id.clone()).await;
1879 if let Some(user_after_create_hook) = user_after_create_hook {
1880 user_after_create_hook(session_id, ctx).await;
1881 }
1882 })
1883 as std::pin::Pin<Box<dyn std::future::Future<Output = ()> + Send>>
1884 },
1885 ) as AfterCreateHook)
1886 } else {
1887 after_create_hook
1888 };
1889 let session_service = Arc::new(PreBuildMobSessionService {
1890 inner: session_service,
1891 hook,
1892 after_create_hook,
1893 runtime_adapter_override: runtime_adapter.clone(),
1894 }) as Arc<dyn MobSessionService>;
1895 let (agent_mob_mcp_state, implicit_delegate_retirement_overrides) =
1896 install_agent_mob_tools(mob_tools_slot, Arc::clone(&session_service));
1897 let mut spec = Self::new(definition, storage, session_service);
1898 spec.agent_mob_mcp_state = Some(agent_mob_mcp_state);
1899 spec.implicit_delegate_retirement_overrides = Some(implicit_delegate_retirement_overrides);
1900 spec.runtime_adapter = runtime_adapter;
1901 spec.binary_blob_store = Some(binary_blob_store);
1902 spec
1903 }
1904
1905 pub fn persistent(
1912 definition: MobDefinition,
1913 storage: MobStorage,
1914 store_path: PathBuf,
1915 max_sessions: usize,
1916 session_store: Arc<dyn SessionStore>,
1917 ) -> Self {
1918 Self::persistent_inner(
1919 definition,
1920 storage,
1921 store_path,
1922 max_sessions,
1923 session_store,
1924 None,
1925 None,
1926 CapabilityFlags::default(),
1927 None,
1928 )
1929 }
1930
1931 pub fn persistent_with_hook(
1935 definition: MobDefinition,
1936 storage: MobStorage,
1937 store_path: PathBuf,
1938 max_sessions: usize,
1939 session_store: Arc<dyn SessionStore>,
1940 hook: impl Fn(
1941 &mut CreateSessionRequest,
1942 ) -> std::pin::Pin<
1943 Box<dyn std::future::Future<Output = Result<(), SessionError>> + Send + '_>,
1944 > + Send
1945 + Sync
1946 + 'static,
1947 ) -> Self {
1948 Self::persistent_inner(
1949 definition,
1950 storage,
1951 store_path,
1952 max_sessions,
1953 session_store,
1954 None,
1955 Some(Arc::new(hook)),
1956 CapabilityFlags::default(),
1957 None,
1958 )
1959 }
1960
1961 #[allow(clippy::too_many_arguments)]
1962 pub(crate) fn persistent_inner(
1963 definition: MobDefinition,
1964 storage: MobStorage,
1965 store_path: PathBuf,
1966 max_sessions: usize,
1967 session_store: Arc<dyn SessionStore>,
1968 custom_blob_store: Option<Arc<dyn meerkat_core::BlobStore>>,
1969 hook: Option<PreBuildHook>,
1970 mut caps: CapabilityFlags,
1971 after_create_hook: Option<AfterCreateHook>,
1972 ) -> Self {
1973 caps.image_generation |= mob_definition_may_use_image_generation(&definition);
1974 let (binary_blob_store, blob_store): (
1975 Arc<dyn BinaryBlobStore>,
1976 Arc<dyn meerkat_core::BlobStore>,
1977 ) = if let Some(blob_store) = custom_blob_store {
1978 (
1979 Arc::new(BinaryBlobStoreAdapter::new(blob_store.clone())),
1980 blob_store,
1981 )
1982 } else {
1983 let binary_blob_store: Arc<dyn BinaryBlobStore> = match ObjectStoreBlobStore::local(
1984 store_path.join("blobs"),
1985 ) {
1986 Ok(store) => Arc::new(store),
1987 Err(err) => {
1988 tracing::warn!(
1989 error = %err,
1990 "failed to initialize persistent binary blob store; falling back to in-memory blobs"
1991 );
1992 Arc::new(ObjectStoreBlobStore::memory())
1993 }
1994 };
1995 let blob_store: Arc<dyn meerkat_core::BlobStore> =
1996 Arc::new(Base64BlobStoreAdapter::new(binary_blob_store.clone()));
1997 (binary_blob_store, blob_store)
1998 };
1999 let runtime_store: Arc<dyn meerkat_runtime::RuntimeStore> =
2009 build_persistent_runtime_store(&store_path);
2010 let runtime_adapter = Arc::new(meerkat_runtime::MeerkatMachine::persistent(
2011 Arc::clone(&runtime_store),
2012 Arc::clone(&blob_store),
2013 ));
2014 let mut factory = AgentFactory::new(&store_path)
2015 .builtins(caps.builtins)
2016 .shell(caps.shell)
2017 .mob(caps.mob)
2018 .comms(caps.comms)
2019 .memory(caps.memory);
2020 if caps.image_generation {
2021 factory = factory.with_image_generation_machine(runtime_adapter.clone());
2022 }
2023 let config = Config::default();
2024 let mut builder = FactoryAgentBuilder::new(factory, config);
2025 builder.default_session_store = Some(Arc::new(StoreAdapter::new(session_store.clone())));
2026 builder.default_blob_store = Some(blob_store.clone());
2027 let mob_tools_slot = Arc::clone(&builder.default_mob_tools);
2028 let session_service: Arc<dyn MobSessionService> =
2029 Arc::new(meerkat_session::PersistentSessionService::new(
2030 builder,
2031 max_sessions,
2032 session_store,
2033 Some(runtime_store),
2034 blob_store,
2035 ));
2036 let hook = hook.unwrap_or_else(no_op_pre_build_hook);
2037 let session_service = Arc::new(PreBuildMobSessionService {
2038 inner: session_service,
2039 hook,
2040 after_create_hook,
2041 runtime_adapter_override: None,
2042 }) as Arc<dyn MobSessionService>;
2043 let (agent_mob_mcp_state, implicit_delegate_retirement_overrides) =
2044 install_agent_mob_tools(mob_tools_slot, Arc::clone(&session_service));
2045 let mut spec = Self::new(definition, storage, session_service);
2046 spec.agent_mob_mcp_state = Some(agent_mob_mcp_state);
2047 spec.implicit_delegate_retirement_overrides = Some(implicit_delegate_retirement_overrides);
2048 spec.runtime_adapter = Some(runtime_adapter);
2049 spec.binary_blob_store = Some(binary_blob_store);
2050 spec
2051 }
2052
2053 #[allow(clippy::too_many_arguments)]
2054 pub(crate) fn ephemeral_runtime_backed_inner(
2055 definition: MobDefinition,
2056 storage: MobStorage,
2057 store_path: PathBuf,
2058 max_sessions: usize,
2059 custom_session_store: Option<Arc<dyn SessionStore>>,
2060 custom_blob_store: Option<Arc<dyn meerkat_core::BlobStore>>,
2061 hook: Option<PreBuildHook>,
2062 mut caps: CapabilityFlags,
2063 after_create_hook: Option<AfterCreateHook>,
2064 ) -> Self {
2065 caps.image_generation |= mob_definition_may_use_image_generation(&definition);
2066 let config = Config::default();
2067 let session_store: Arc<dyn SessionStore> = custom_session_store
2068 .clone()
2069 .unwrap_or_else(|| Arc::new(meerkat_store::MemoryStore::new()));
2070 let (binary_blob_store, blob_store): (
2071 Arc<dyn BinaryBlobStore>,
2072 Arc<dyn meerkat_core::BlobStore>,
2073 ) = if let Some(blob_store) = custom_blob_store {
2074 (
2075 Arc::new(BinaryBlobStoreAdapter::new(blob_store.clone())),
2076 blob_store,
2077 )
2078 } else {
2079 let binary_blob_store: Arc<dyn BinaryBlobStore> =
2080 Arc::new(ObjectStoreBlobStore::memory());
2081 let blob_store: Arc<dyn meerkat_core::BlobStore> =
2082 Arc::new(Base64BlobStoreAdapter::new(binary_blob_store.clone()));
2083 (binary_blob_store, blob_store)
2084 };
2085 let base_runtime_store: Arc<dyn meerkat_runtime::RuntimeStore> =
2093 Arc::new(meerkat_runtime::InMemoryRuntimeStore::new());
2094 let runtime_store: Arc<dyn meerkat_runtime::RuntimeStore> =
2095 if let Some(custom_session_store) = custom_session_store.clone() {
2096 Arc::new(SessionStoreBackedRuntimeStore::new(
2097 Arc::clone(&base_runtime_store),
2098 custom_session_store,
2099 ))
2100 } else {
2101 Arc::clone(&base_runtime_store)
2102 };
2103 let runtime_adapter = Arc::new(meerkat_runtime::MeerkatMachine::persistent(
2104 Arc::clone(&runtime_store),
2105 Arc::clone(&blob_store),
2106 ));
2107 let mut factory = AgentFactory::new(&store_path)
2108 .builtins(caps.builtins)
2109 .shell(caps.shell)
2110 .mob(caps.mob)
2111 .comms(caps.comms)
2112 .memory(caps.memory);
2113 if caps.image_generation {
2114 factory = factory.with_image_generation_machine(runtime_adapter.clone());
2115 }
2116 let mut builder = FactoryAgentBuilder::new(factory, config);
2117 builder.default_session_store = Some(Arc::new(StoreAdapter::new(session_store.clone())));
2118 builder.default_blob_store = Some(blob_store.clone());
2119 let mob_tools_slot = Arc::clone(&builder.default_mob_tools);
2120 let session_service: Arc<dyn MobSessionService> =
2121 if let Some(custom_session_store) = custom_session_store {
2122 Arc::new(meerkat_session::PersistentSessionService::new(
2123 builder,
2124 max_sessions,
2125 custom_session_store,
2126 Some(runtime_store.clone()),
2127 blob_store,
2128 ))
2129 } else {
2130 Arc::new(meerkat_session::EphemeralSessionService::new(
2131 builder,
2132 max_sessions,
2133 ))
2134 };
2135 let hook = hook.unwrap_or_else(no_op_pre_build_hook);
2136 let runtime_adapter_for_after_create = runtime_adapter.clone();
2137 let combined_after_create_hook: AfterCreateHook = Arc::new(move |session_id, ctx| {
2138 let runtime_adapter = runtime_adapter_for_after_create.clone();
2139 let after_create_hook = after_create_hook.clone();
2140 Box::pin(async move {
2141 runtime_adapter.register_session(session_id.clone()).await;
2142 if let Some(after_create_hook) = after_create_hook {
2143 after_create_hook(session_id, ctx).await;
2144 }
2145 })
2146 });
2147 let session_service = Arc::new(PreBuildMobSessionService {
2148 inner: session_service,
2149 hook,
2150 after_create_hook: Some(combined_after_create_hook),
2151 runtime_adapter_override: Some(runtime_adapter.clone()),
2152 }) as Arc<dyn MobSessionService>;
2153 let (agent_mob_mcp_state, implicit_delegate_retirement_overrides) =
2154 install_agent_mob_tools(mob_tools_slot, Arc::clone(&session_service));
2155 let mut spec = Self::new(definition, storage, session_service);
2156 spec.agent_mob_mcp_state = Some(agent_mob_mcp_state);
2157 spec.implicit_delegate_retirement_overrides = Some(implicit_delegate_retirement_overrides);
2158 spec.runtime_adapter = Some(runtime_adapter);
2159 spec.binary_blob_store = Some(binary_blob_store);
2160 spec
2161 }
2162}
2163
2164#[derive(Debug)]
2166pub enum MobRuntimeError {
2167 Mob(MobError),
2168 InvalidInput(&'static str),
2169 InvalidConfig(String),
2170}
2171
2172impl std::fmt::Display for MobRuntimeError {
2173 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
2174 match self {
2175 Self::Mob(err) => write!(f, "{err}"),
2176 Self::InvalidInput(message) => write!(f, "{message}"),
2177 Self::InvalidConfig(message) => write!(f, "{message}"),
2178 }
2179 }
2180}
2181
2182impl std::error::Error for MobRuntimeError {}
2183
2184impl From<MobError> for MobRuntimeError {
2185 fn from(value: MobError) -> Self {
2186 Self::Mob(value)
2187 }
2188}
2189
2190#[derive(Clone, Debug)]
2199pub struct SessionCreatedContext {
2200 pub model: String,
2201 pub labels: std::collections::BTreeMap<String, String>,
2202 pub system_prompt: Option<String>,
2203}
2204
2205#[async_trait]
2212pub trait SessionHook: Send + Sync {
2213 async fn before_create(&self, _req: &mut CreateSessionRequest) -> Result<(), SessionError> {
2217 Ok(())
2218 }
2219
2220 async fn after_create(
2223 &self,
2224 _session_id: &meerkat_core::types::SessionId,
2225 _ctx: &SessionCreatedContext,
2226 ) {
2227 }
2228}
2229
2230#[derive(Clone, Copy, Debug)]
2232pub struct CapabilityFlags {
2233 pub builtins: bool,
2234 pub shell: bool,
2235 pub mob: bool,
2236 pub comms: bool,
2237 pub memory: bool,
2238 pub image_generation: bool,
2239}
2240
2241impl Default for CapabilityFlags {
2242 fn default() -> Self {
2243 Self {
2244 builtins: true,
2245 shell: true,
2246 mob: true,
2247 comms: true,
2248 memory: true,
2249 image_generation: false,
2250 }
2251 }
2252}
2253
2254pub type RealMobRuntime = MobRuntime;
2256
2257#[derive(Clone)]
2259pub struct MobRuntime {
2260 handle: MobHandle,
2261 session_service: Option<Arc<dyn MobSessionService>>,
2262 agent_mob_mcp_state: Option<Arc<meerkat_mob_mcp::MobMcpState>>,
2263 implicit_delegate_retirement_overrides: Option<ImplicitDelegateRetirementOverrides>,
2264 binary_blob_store: Option<Arc<dyn BinaryBlobStore>>,
2265 baseline_member_specs: Arc<tokio::sync::RwLock<Vec<SpawnMemberSpec>>>,
2266 _ephemeral_dir: Option<Arc<tempfile::TempDir>>,
2269}
2270
2271impl MobRuntime {
2272 pub async fn bootstrap(spec: MobBootstrapSpec) -> Result<Self, MobRuntimeError> {
2273 let ephemeral_dir = spec._ephemeral_dir.clone();
2274 let session_service = spec.session_service.clone();
2275 let binary_blob_store = spec.binary_blob_store.clone();
2276 let mob_id = spec.definition.id.clone();
2277 let agent_mob_mcp_state = spec.agent_mob_mcp_state.clone();
2278 let implicit_delegate_retirement_overrides =
2279 spec.implicit_delegate_retirement_overrides.clone();
2280 let effective_runtime_adapter = spec
2281 .runtime_adapter
2282 .clone()
2283 .or_else(|| session_service.runtime_adapter());
2284
2285 let mut builder = MobBuilder::new(spec.definition, spec.storage);
2286
2287 if let Some(adapter) = effective_runtime_adapter {
2294 builder = builder.with_runtime_adapter(adapter);
2295 }
2296
2297 builder = builder
2298 .with_session_service(session_service.clone())
2299 .allow_ephemeral_sessions(spec.options.allow_ephemeral_sessions)
2300 .notify_orchestrator_on_resume(spec.options.notify_orchestrator_on_resume);
2301
2302 if let Some(client) = spec.options.default_llm_client {
2303 builder = builder.with_default_llm_client(ReplaySanitizingLlmClient::wrap(client));
2304 }
2305
2306 let handle = builder.create().await?;
2307 if let Some(state) = agent_mob_mcp_state.as_ref() {
2308 state.mob_insert_handle(mob_id, handle.clone()).await;
2309 }
2310 Ok(Self {
2311 handle,
2312 session_service: Some(session_service),
2313 agent_mob_mcp_state,
2314 implicit_delegate_retirement_overrides,
2315 binary_blob_store,
2316 baseline_member_specs: Arc::new(tokio::sync::RwLock::new(Vec::new())),
2317 _ephemeral_dir: ephemeral_dir,
2318 })
2319 }
2320
2321 pub fn from_handle(handle: MobHandle) -> Self {
2322 Self {
2323 handle,
2324 session_service: None,
2325 agent_mob_mcp_state: None,
2326 implicit_delegate_retirement_overrides: None,
2327 binary_blob_store: None,
2328 baseline_member_specs: Arc::new(tokio::sync::RwLock::new(Vec::new())),
2329 _ephemeral_dir: None,
2330 }
2331 }
2332
2333 pub fn handle(&self) -> MobHandle {
2334 self.handle.clone()
2335 }
2336
2337 pub fn agent_mob_mcp_state(&self) -> Option<Arc<meerkat_mob_mcp::MobMcpState>> {
2338 self.agent_mob_mcp_state.clone()
2339 }
2340
2341 pub(crate) fn implicit_delegate_retirement_overrides(
2342 &self,
2343 ) -> Option<ImplicitDelegateRetirementOverrides> {
2344 self.implicit_delegate_retirement_overrides.clone()
2345 }
2346
2347 pub async fn set_baseline_member_specs(&self, specs: Vec<SpawnMemberSpec>) {
2348 *self.baseline_member_specs.write().await = specs;
2349 }
2350
2351 pub async fn baseline_member_specs(&self) -> Vec<SpawnMemberSpec> {
2352 self.baseline_member_specs.read().await.clone()
2353 }
2354
2355 pub async fn read_session_history(
2356 &self,
2357 session_id_str: &str,
2358 offset: usize,
2359 limit: Option<usize>,
2360 ) -> Result<SessionHistoryPage, MobRuntimeError> {
2361 if session_id_str.trim().is_empty() {
2362 return Err(MobRuntimeError::InvalidInput(
2363 "session_id must not be empty",
2364 ));
2365 }
2366 let Some(session_service) = self.session_service.as_ref() else {
2367 return Err(MobRuntimeError::InvalidInput(
2368 "session history unavailable for this runtime",
2369 ));
2370 };
2371 let session_id = meerkat_core::types::SessionId::parse(session_id_str)
2372 .map_err(|_| MobRuntimeError::InvalidInput("invalid session_id format"))?;
2373 SessionServiceHistoryExt::read_history(
2374 session_service.as_ref(),
2375 &session_id,
2376 SessionHistoryQuery { offset, limit },
2377 )
2378 .await
2379 .map_err(|err| MobRuntimeError::Mob(MobError::Internal(err.to_string())))
2380 }
2381
2382 #[allow(dead_code)]
2383 pub(crate) async fn runtime_state_for_session(
2384 &self,
2385 session_id_str: &str,
2386 ) -> Result<Option<meerkat_runtime::RuntimeState>, MobRuntimeError> {
2387 if session_id_str.trim().is_empty() {
2388 return Err(MobRuntimeError::InvalidInput(
2389 "session_id must not be empty",
2390 ));
2391 }
2392 let Some(session_service) = self.session_service.as_ref() else {
2393 return Ok(None);
2394 };
2395 let Some(runtime_adapter) = session_service.runtime_adapter() else {
2396 return Ok(None);
2397 };
2398 let session_id = meerkat_core::types::SessionId::parse(session_id_str)
2399 .map_err(|_| MobRuntimeError::InvalidInput("invalid session_id format"))?;
2400 let state = meerkat_runtime::service_ext::SessionServiceRuntimeExt::runtime_state(
2401 runtime_adapter.as_ref(),
2402 &session_id,
2403 )
2404 .await
2405 .map_err(|err| MobRuntimeError::Mob(MobError::Internal(err.to_string())))?;
2406 Ok(Some(state))
2407 }
2408
2409 #[allow(dead_code)]
2410 pub(crate) async fn comms_runtime_for_session(
2411 &self,
2412 session_id_str: &str,
2413 ) -> Result<Option<Arc<dyn CommsRuntime>>, MobRuntimeError> {
2414 if session_id_str.trim().is_empty() {
2415 return Err(MobRuntimeError::InvalidInput(
2416 "session_id must not be empty",
2417 ));
2418 }
2419 let Some(session_service) = self.session_service.as_ref() else {
2420 return Ok(None);
2421 };
2422 let session_id = meerkat_core::types::SessionId::parse(session_id_str)
2423 .map_err(|_| MobRuntimeError::InvalidInput("invalid session_id format"))?;
2424 Ok(
2425 meerkat_core::service::SessionServiceCommsExt::comms_runtime(
2426 session_service.as_ref(),
2427 &session_id,
2428 )
2429 .await,
2430 )
2431 }
2432
2433 #[allow(dead_code)]
2434 pub(crate) async fn active_input_ids_for_session(
2435 &self,
2436 session_id_str: &str,
2437 ) -> Result<Option<Vec<String>>, MobRuntimeError> {
2438 if session_id_str.trim().is_empty() {
2439 return Err(MobRuntimeError::InvalidInput(
2440 "session_id must not be empty",
2441 ));
2442 }
2443 let Some(session_service) = self.session_service.as_ref() else {
2444 return Ok(None);
2445 };
2446 let Some(runtime_adapter) = session_service.runtime_adapter() else {
2447 return Ok(None);
2448 };
2449 let session_id = meerkat_core::types::SessionId::parse(session_id_str)
2450 .map_err(|_| MobRuntimeError::InvalidInput("invalid session_id format"))?;
2451 let input_ids = meerkat_runtime::service_ext::SessionServiceRuntimeExt::list_active_inputs(
2452 runtime_adapter.as_ref(),
2453 &session_id,
2454 )
2455 .await
2456 .map_err(|err| MobRuntimeError::Mob(MobError::Internal(err.to_string())))?;
2457 Ok(Some(
2458 input_ids.into_iter().map(|id| id.to_string()).collect(),
2459 ))
2460 }
2461
2462 #[allow(dead_code)]
2463 pub(crate) async fn ensure_comms_drain_for_session(
2464 &self,
2465 session_id_str: &str,
2466 ) -> Result<Option<bool>, MobRuntimeError> {
2467 if session_id_str.trim().is_empty() {
2468 return Err(MobRuntimeError::InvalidInput(
2469 "session_id must not be empty",
2470 ));
2471 }
2472 let Some(session_service) = self.session_service.as_ref() else {
2473 return Ok(None);
2474 };
2475 let Some(runtime_adapter) = session_service.runtime_adapter() else {
2476 return Ok(None);
2477 };
2478 let session_id = meerkat_core::types::SessionId::parse(session_id_str)
2479 .map_err(|_| MobRuntimeError::InvalidInput("invalid session_id format"))?;
2480 let comms_runtime = meerkat_core::service::SessionServiceCommsExt::comms_runtime(
2481 session_service.as_ref(),
2482 &session_id,
2483 )
2484 .await;
2485 if let Some(comms) = comms_runtime {
2486 let _handle = meerkat_runtime::comms_drain::spawn_comms_drain(
2487 runtime_adapter.clone(),
2488 session_id,
2489 comms,
2490 None,
2491 );
2492 Ok(Some(true))
2493 } else {
2494 Ok(Some(false))
2495 }
2496 }
2497
2498 pub fn session_service(&self) -> Option<&Arc<dyn MobSessionService>> {
2504 self.session_service.as_ref()
2505 }
2506
2507 pub fn binary_blob_store(&self) -> Option<Arc<dyn BinaryBlobStore>> {
2508 self.binary_blob_store.clone()
2509 }
2510}
2511
2512pub fn member_entry_to_json(entry: &meerkat_mob::runtime::MobMemberListEntry) -> serde_json::Value {
2519 serde_json::to_value(entry).unwrap_or(serde_json::Value::Null)
2520}
2521
2522pub fn content_input_has_images(content: &meerkat_core::ContentInput) -> bool {
2523 match content {
2524 meerkat_core::ContentInput::Text(_) => false,
2525 meerkat_core::ContentInput::Blocks(blocks) => blocks
2526 .iter()
2527 .any(|block| matches!(block, meerkat_core::ContentBlock::Image { .. })),
2528 }
2529}
2530
2531pub fn model_capabilities_for_model(
2532 provider: Provider,
2533 model: &str,
2534) -> crate::runtime::ConsoleModelCapabilities {
2535 let image_input = meerkat_core::model_profile::profile_for(provider, model)
2536 .map(|profile| profile.vision)
2537 .unwrap_or(false);
2538 crate::runtime::ConsoleModelCapabilities { image_input }
2539}
2540
2541pub fn model_capabilities_for_profile(
2542 profile: &Profile,
2543) -> crate::runtime::ConsoleModelCapabilities {
2544 let image_input = Provider::infer_from_model(&profile.model)
2545 .and_then(|provider| meerkat_core::model_profile::profile_for(provider, &profile.model))
2546 .map(|profile| profile.vision)
2547 .unwrap_or(false);
2548 crate::runtime::ConsoleModelCapabilities { image_input }
2549}
2550
2551pub fn model_capabilities_for_role(
2552 definition: &MobDefinition,
2553 role: &str,
2554) -> crate::runtime::ConsoleModelCapabilities {
2555 let profile_name = ProfileName::from(role);
2556 definition
2557 .resolve_inline_profile(&profile_name)
2558 .map(model_capabilities_for_profile)
2559 .unwrap_or(crate::runtime::ConsoleModelCapabilities { image_input: false })
2560}
2561
2562pub fn model_capabilities_for_member_entry(
2563 definition: &MobDefinition,
2564 entry: &meerkat_mob::runtime::MobMemberListEntry,
2565) -> crate::runtime::ConsoleModelCapabilities {
2566 model_capabilities_for_role(definition, entry.role.as_str())
2567}
2568
2569pub async fn model_capabilities_for_member(
2570 handle: &MobHandle,
2571 session_service: Option<&Arc<dyn MobSessionService>>,
2572 member_id: &meerkat_mob::ids::MeerkatId,
2573) -> crate::runtime::ConsoleModelCapabilities {
2574 if let Some(service) = session_service
2575 && let Some(session_id) = handle.resolve_bridge_session_id(member_id).await
2576 && let Ok(view) = service.read(&session_id).await
2577 {
2578 return model_capabilities_for_model(view.state.provider, &view.state.model);
2579 }
2580
2581 handle
2582 .get_member(member_id)
2583 .await
2584 .map(|member| model_capabilities_for_role(handle.definition(), member.role.as_str()))
2585 .unwrap_or(crate::runtime::ConsoleModelCapabilities { image_input: false })
2586}
2587
2588pub async fn assert_member_accepts_images(
2589 handle: &MobHandle,
2590 session_service: Option<&Arc<dyn MobSessionService>>,
2591 member_id: &str,
2592 content: &meerkat_core::ContentInput,
2593) -> Result<(), MobRuntimeError> {
2594 if !content_input_has_images(content) {
2595 return Ok(());
2596 }
2597 let mid = meerkat_mob::ids::MeerkatId::from(member_id);
2598 let Some(member) = handle.get_member(&mid).await else {
2599 return Err(MobRuntimeError::InvalidInput("member not found"));
2600 };
2601 let caps = model_capabilities_for_member(handle, session_service, &member.agent_identity).await;
2602 if caps.image_input {
2603 Ok(())
2604 } else {
2605 Err(MobRuntimeError::InvalidInput(
2606 "target member model cannot accept image input",
2607 ))
2608 }
2609}
2610
2611pub async fn send_message_on_mob(
2621 handle: &MobHandle,
2622 member_id: &str,
2623 content: impl Into<meerkat_core::ContentInput>,
2624) -> Result<String, MobRuntimeError> {
2625 send_message_on_mob_with_mode(
2626 handle,
2627 member_id,
2628 content,
2629 meerkat_core::types::HandlingMode::Queue,
2630 )
2631 .await
2632}
2633
2634pub async fn send_message_on_mob_with_mode(
2637 handle: &MobHandle,
2638 member_id: &str,
2639 content: impl Into<meerkat_core::ContentInput>,
2640 handling_mode: meerkat_core::types::HandlingMode,
2641) -> Result<String, MobRuntimeError> {
2642 if member_id.trim().is_empty() {
2643 return Err(MobRuntimeError::InvalidInput("member_id must not be empty"));
2644 }
2645 let content = content.into();
2646 let is_empty = match &content {
2647 meerkat_core::ContentInput::Text(s) => s.trim().is_empty(),
2648 meerkat_core::ContentInput::Blocks(blocks) => blocks.is_empty(),
2649 };
2650 if is_empty {
2651 return Err(MobRuntimeError::InvalidInput("content must not be empty"));
2652 }
2653 let mid = meerkat_mob::ids::MeerkatId::from(member_id);
2654 let _receipt = handle
2655 .member(&mid)
2656 .await?
2657 .send(content, handling_mode)
2658 .await?;
2659 let session_id = handle
2660 .resolve_bridge_session_id(&mid)
2661 .await
2662 .ok_or_else(|| {
2663 MobRuntimeError::Mob(MobError::Internal(
2664 "member has no bridge session after send".to_string(),
2665 ))
2666 })?;
2667 Ok(session_id.to_string())
2668}
2669
2670#[cfg(test)]
2671#[allow(clippy::unwrap_used, clippy::expect_used, clippy::panic)]
2672mod tests {
2673 use super::*;
2674
2675 #[test]
2676 fn delegate_tool_schema_exposes_idle_retire_secs() {
2677 let tool = meerkat_core::types::ToolDef::new(
2678 "delegate",
2679 "Delegate work",
2680 serde_json::json!({
2681 "type": "object",
2682 "properties": {
2683 "task": {"type": "string"}
2684 },
2685 "required": ["task"]
2686 }),
2687 );
2688
2689 let patched = delegate_tool_def_with_idle_retire_secs(&tool);
2690 let idle_retire_secs = &patched.input_schema["properties"]["idle_retire_secs"];
2691
2692 assert!(patched.description.contains("IDLE RETIREMENT:"));
2693 assert_eq!(idle_retire_secs["anyOf"][0]["type"], "integer");
2694 assert_eq!(idle_retire_secs["anyOf"][0]["minimum"], 0);
2695 assert_eq!(idle_retire_secs["anyOf"][1]["type"], "null");
2696 }
2697
2698 #[test]
2699 fn delegate_idle_retire_secs_arg_is_stripped_and_parsed() {
2700 let mut args = serde_json::json!({
2701 "task": "inspect",
2702 "idle_retire_secs": 42
2703 });
2704
2705 let parsed = delegate_idle_retire_override_from_args("delegate", &mut args)
2706 .expect("valid idle retire arg");
2707
2708 assert_eq!(parsed, Some(DelegateIdleRetireOverride::Seconds(42)));
2709 assert!(args.get("idle_retire_secs").is_none());
2710 }
2711
2712 #[test]
2713 fn delegate_idle_retire_secs_null_disables_member_retirement() {
2714 let mut args = serde_json::json!({
2715 "task": "inspect",
2716 "idle_retire_secs": null
2717 });
2718
2719 let parsed = delegate_idle_retire_override_from_args("delegate", &mut args)
2720 .expect("valid idle retire arg");
2721
2722 assert_eq!(parsed, Some(DelegateIdleRetireOverride::Disabled));
2723 assert!(args.get("idle_retire_secs").is_none());
2724 }
2725
2726 #[test]
2727 fn delegate_idle_retire_secs_omitted_inherits_runtime_default() {
2728 let mut args = serde_json::json!({"task": "inspect"});
2729
2730 let parsed = delegate_idle_retire_override_from_args("delegate", &mut args)
2731 .expect("omitted idle retire arg");
2732
2733 assert_eq!(parsed, None);
2734 assert_eq!(args, serde_json::json!({"task": "inspect"}));
2735 }
2736
2737 #[test]
2738 fn delegate_idle_retire_secs_rejects_negative_or_fractional_values() {
2739 let mut negative = serde_json::json!({"task": "inspect", "idle_retire_secs": -1});
2740 let mut fractional = serde_json::json!({"task": "inspect", "idle_retire_secs": 1.5});
2741
2742 assert!(delegate_idle_retire_override_from_args("delegate", &mut negative).is_err());
2743 assert!(delegate_idle_retire_override_from_args("delegate", &mut fractional).is_err());
2744 }
2745
2746 #[tokio::test]
2747 async fn implicit_delegate_retirement_overrides_round_trip_per_member() {
2748 let overrides = ImplicitDelegateRetirementOverrides::default();
2749
2750 overrides
2751 .set("mob-a", "worker-1", DelegateIdleRetireOverride::Seconds(12))
2752 .await;
2753 overrides
2754 .set("mob-a", "worker-2", DelegateIdleRetireOverride::Disabled)
2755 .await;
2756
2757 assert_eq!(
2758 overrides.get("mob-a", "worker-1").await,
2759 Some(DelegateIdleRetireOverride::Seconds(12))
2760 );
2761 assert_eq!(
2762 overrides.get("mob-a", "worker-2").await,
2763 Some(DelegateIdleRetireOverride::Disabled)
2764 );
2765 assert_eq!(overrides.get("mob-a", "worker-3").await, None);
2766 }
2767
2768 #[test]
2769 fn image_generation_substrate_defaults_off_for_inline_profiles() {
2770 let definition = meerkat_mob::MobDefinition::from_toml(
2771 r#"
2772[mob]
2773id = "test"
2774
2775[profiles.worker]
2776model = "gpt-5.5"
2777
2778[profiles.worker.tools]
2779builtins = true
2780"#,
2781 )
2782 .unwrap_or_else(|e| panic!("{e}"));
2783
2784 assert!(
2785 !mob_definition_may_use_image_generation(&definition),
2786 "inline profiles should not wire the image substrate unless a profile opts in"
2787 );
2788 }
2789
2790 #[test]
2791 fn image_generation_substrate_follows_profile_tool_config() {
2792 let definition = meerkat_mob::MobDefinition::from_toml(
2793 r#"
2794[mob]
2795id = "test"
2796
2797[profiles.commander]
2798model = "gpt-5.5"
2799
2800[profiles.commander.tools]
2801builtins = true
2802image_generation = true
2803
2804[profiles.investigator]
2805model = "gpt-5.5"
2806
2807[profiles.investigator.tools]
2808builtins = true
2809image_generation = false
2810"#,
2811 )
2812 .unwrap_or_else(|e| panic!("{e}"));
2813
2814 let commander = definition.profiles["commander"].as_inline().unwrap();
2815 let investigator = definition.profiles["investigator"].as_inline().unwrap();
2816 assert!(commander.tools.image_generation);
2817 assert!(!investigator.tools.image_generation);
2818 assert!(
2819 mob_definition_may_use_image_generation(&definition),
2820 "one opt-in profile is enough to wire substrate; Meerkat gates visibility per profile"
2821 );
2822 }
2823
2824 #[test]
2825 fn image_generation_profiles_can_disable_builtins_with_meerkat_062() {
2826 let definition = meerkat_mob::MobDefinition::from_toml(
2827 r#"
2828[mob]
2829id = "test"
2830
2831[profiles.commander]
2832model = "gpt-5.5"
2833
2834[profiles.commander.tools]
2835builtins = false
2836image_generation = true
2837"#,
2838 )
2839 .unwrap_or_else(|e| panic!("{e}"));
2840
2841 let commander = definition.profiles["commander"].as_inline().unwrap();
2842 assert!(!commander.tools.builtins);
2843 assert!(commander.tools.image_generation);
2844 assert!(
2845 mob_definition_may_use_image_generation(&definition),
2846 "image generation now has its own Meerkat tool gate"
2847 );
2848 }
2849
2850 #[test]
2851 fn image_generation_substrate_is_conservative_for_realm_profile_refs() {
2852 let definition = meerkat_mob::MobDefinition::from_toml(
2853 r#"
2854[mob]
2855id = "test"
2856
2857[profiles.worker]
2858realm_profile = "worker-v2"
2859"#,
2860 )
2861 .unwrap_or_else(|e| panic!("{e}"));
2862
2863 assert!(
2864 mob_definition_may_use_image_generation(&definition),
2865 "realm profiles resolve at spawn time, so MobKit wires substrate and lets Meerkat enforce profile policy"
2866 );
2867 }
2868
2869 #[test]
2870 fn sanitize_llm_request_drops_replay_unsafe_server_tool_blocks() {
2871 let request = meerkat_client::LlmRequest::new(
2872 "gpt-5.5",
2873 vec![meerkat_core::Message::BlockAssistant(
2874 meerkat_core::BlockAssistantMessage::new(
2875 vec![
2876 meerkat_core::AssistantBlock::Text {
2877 text: "done".to_string(),
2878 meta: None,
2879 },
2880 meerkat_core::AssistantBlock::ServerToolContent {
2881 id: Some("ws-stream".to_string()),
2882 name: "web_search".to_string(),
2883 content: serde_json::json!({
2884 "type": "response.web_search_call.searching",
2885 "item_id": "ws_123"
2886 }),
2887 meta: None,
2888 },
2889 meerkat_core::AssistantBlock::ServerToolContent {
2890 id: Some("ws_123".to_string()),
2891 name: "web_search_call".to_string(),
2892 content: serde_json::json!({
2893 "type": "web_search_call",
2894 "id": "ws_123",
2895 "status": "completed"
2896 }),
2897 meta: None,
2898 },
2899 meerkat_core::AssistantBlock::ServerToolContent {
2900 id: None,
2901 name: "web_search_annotations".to_string(),
2902 content: serde_json::json!({
2903 "type": "message_annotations",
2904 "annotations": []
2905 }),
2906 meta: None,
2907 },
2908 ],
2909 meerkat_core::StopReason::EndTurn,
2910 ),
2911 )],
2912 );
2913
2914 let sanitized = sanitize_llm_request_for_stateless_replay(&request);
2915 let meerkat_core::Message::BlockAssistant(assistant) = &sanitized.messages[0] else {
2916 panic!("expected block assistant");
2917 };
2918
2919 assert_eq!(assistant.blocks.len(), 2);
2920 assert!(matches!(
2921 assistant.blocks[0],
2922 meerkat_core::AssistantBlock::Text { .. }
2923 ));
2924 assert!(matches!(
2925 assistant.blocks[1],
2926 meerkat_core::AssistantBlock::ServerToolContent { ref name, .. }
2927 if name == "web_search_call"
2928 ));
2929 }
2930
2931 #[test]
2932 fn sanitize_llm_request_preserves_generated_images_for_meerkat_062() {
2933 let request = meerkat_client::LlmRequest::new(
2934 "gpt-5.5",
2935 vec![meerkat_core::Message::BlockAssistant(
2936 meerkat_core::BlockAssistantMessage::new(
2937 vec![
2938 meerkat_core::AssistantBlock::Text {
2939 text: "visible".to_string(),
2940 meta: None,
2941 },
2942 generated_image_block_for_test(),
2943 ],
2944 meerkat_core::StopReason::EndTurn,
2945 ),
2946 )],
2947 );
2948
2949 let sanitized = sanitize_llm_request_for_stateless_replay(&request);
2950
2951 let meerkat_core::Message::BlockAssistant(original_assistant) = &request.messages[0] else {
2952 panic!("expected original block assistant");
2953 };
2954 assert!(
2955 original_assistant
2956 .blocks
2957 .iter()
2958 .any(|block| matches!(block, meerkat_core::AssistantBlock::Image { .. })),
2959 "request-view sanitization must not rewrite canonical caller-owned messages"
2960 );
2961
2962 let meerkat_core::Message::BlockAssistant(sanitized_assistant) = &sanitized.messages[0]
2963 else {
2964 panic!("expected sanitized block assistant");
2965 };
2966 assert!(
2967 sanitized_assistant
2968 .blocks
2969 .iter()
2970 .any(|block| matches!(block, meerkat_core::AssistantBlock::Image { .. })),
2971 "Meerkat 0.6.2 owns provider replay projection for generated images"
2972 );
2973 }
2974
2975 #[derive(Default)]
2976 struct CapturingLlmClient {
2977 projected_messages: std::sync::Mutex<Vec<meerkat_core::Message>>,
2978 }
2979
2980 #[async_trait]
2981 impl LlmClient for CapturingLlmClient {
2982 fn project_replay_messages(
2983 &self,
2984 messages: &[meerkat_core::Message],
2985 ) -> Result<Vec<meerkat_core::Message>, meerkat_client::LlmError> {
2986 *self
2987 .projected_messages
2988 .lock()
2989 .unwrap_or_else(std::sync::PoisonError::into_inner) = messages.to_vec();
2990 Ok(messages.to_vec())
2991 }
2992
2993 fn stream<'a>(&'a self, _request: &'a LlmRequest) -> LlmStream<'a> {
2994 Box::pin(futures::stream::iter([Ok(
2995 meerkat_client::LlmEvent::Done {
2996 outcome: meerkat_client::LlmDoneOutcome::Success {
2997 stop_reason: meerkat_core::StopReason::EndTurn,
2998 },
2999 },
3000 )]))
3001 }
3002
3003 fn provider(&self) -> &'static str {
3004 "openai"
3005 }
3006
3007 async fn health_check(&self) -> Result<(), meerkat_client::LlmError> {
3008 Ok(())
3009 }
3010 }
3011
3012 #[test]
3013 fn replay_sanitizing_llm_client_delegates_provider_projection() {
3014 let capture = Arc::new(CapturingLlmClient::default());
3015 let inner: Arc<dyn LlmClient> = capture.clone();
3016 let wrapped = ReplaySanitizingLlmClient::new(inner);
3017 let messages = vec![meerkat_core::Message::BlockAssistant(
3018 meerkat_core::BlockAssistantMessage::new(
3019 vec![
3020 meerkat_core::AssistantBlock::Text {
3021 text: "visible".to_string(),
3022 meta: None,
3023 },
3024 meerkat_core::AssistantBlock::ServerToolContent {
3025 id: Some("ws-stream".to_string()),
3026 name: "web_search".to_string(),
3027 content: serde_json::json!({
3028 "type": "response.web_search_call.searching",
3029 "item_id": "ws_123"
3030 }),
3031 meta: None,
3032 },
3033 ],
3034 meerkat_core::StopReason::EndTurn,
3035 ),
3036 )];
3037
3038 let projected = wrapped
3039 .project_replay_messages(&messages)
3040 .expect("wrapped client should delegate provider projection");
3041
3042 let seen = capture
3043 .projected_messages
3044 .lock()
3045 .unwrap_or_else(std::sync::PoisonError::into_inner)
3046 .clone();
3047 let meerkat_core::Message::BlockAssistant(assistant) = &seen[0] else {
3048 panic!("expected block assistant");
3049 };
3050 assert_eq!(
3051 assistant.blocks.len(),
3052 1,
3053 "MobKit sanitization must happen before Meerkat provider projection"
3054 );
3055 assert!(matches!(
3056 assistant.blocks[0],
3057 meerkat_core::AssistantBlock::Text { .. }
3058 ));
3059 assert_eq!(
3060 serde_json::to_value(&projected).expect("projected messages serialize"),
3061 serde_json::to_value(&seen).expect("seen messages serialize")
3062 );
3063 }
3064
3065 #[derive(Default)]
3066 struct CapturingAgentLlmClient {
3067 seen_messages: std::sync::Mutex<Vec<meerkat_core::Message>>,
3068 }
3069
3070 #[async_trait]
3071 impl meerkat_core::AgentLlmClient for CapturingAgentLlmClient {
3072 async fn stream_response(
3073 &self,
3074 messages: &[meerkat_core::Message],
3075 _tools: &[Arc<meerkat_core::ToolDef>],
3076 _max_tokens: u32,
3077 _temperature: Option<f32>,
3078 _provider_params: Option<
3079 &meerkat_core::lifecycle::run_primitive::ProviderParamsOverride,
3080 >,
3081 ) -> Result<meerkat_core::agent::LlmStreamResult, meerkat_core::AgentError> {
3082 *self
3083 .seen_messages
3084 .lock()
3085 .unwrap_or_else(std::sync::PoisonError::into_inner) = messages.to_vec();
3086 Ok(meerkat_core::agent::LlmStreamResult::new(
3087 Vec::new(),
3088 meerkat_core::StopReason::EndTurn,
3089 meerkat_core::Usage::default(),
3090 ))
3091 }
3092
3093 fn provider(&self) -> &'static str {
3094 "openai"
3095 }
3096
3097 fn model(&self) -> &'static str {
3098 "gpt-5.5"
3099 }
3100 }
3101
3102 #[tokio::test]
3103 async fn sanitize_agent_llm_client_drops_replay_unsafe_server_tool_blocks() {
3104 let capture = Arc::new(CapturingAgentLlmClient::default());
3105 let inner: Arc<dyn meerkat_core::AgentLlmClient> = capture.clone();
3106 let wrapped = ReplaySanitizingAgentLlmClient::wrap(inner);
3107 let messages = vec![meerkat_core::Message::BlockAssistant(
3108 meerkat_core::BlockAssistantMessage::new(
3109 vec![
3110 meerkat_core::AssistantBlock::Text {
3111 text: "visible".to_string(),
3112 meta: None,
3113 },
3114 meerkat_core::AssistantBlock::ServerToolContent {
3115 id: Some("ws-stream".to_string()),
3116 name: "web_search".to_string(),
3117 content: serde_json::json!({
3118 "type": "response.web_search_call.searching",
3119 "item_id": "ws_123"
3120 }),
3121 meta: None,
3122 },
3123 meerkat_core::AssistantBlock::ServerToolContent {
3124 id: Some("ok".to_string()),
3125 name: "web_search_call".to_string(),
3126 content: serde_json::json!({
3127 "type": "web_search_call",
3128 "id": "ws_123",
3129 "status": "completed"
3130 }),
3131 meta: None,
3132 },
3133 ],
3134 meerkat_core::StopReason::EndTurn,
3135 ),
3136 )];
3137 let tools: Vec<Arc<meerkat_core::ToolDef>> = Vec::new();
3138
3139 wrapped
3140 .stream_response(&messages, &tools, 512, None, None)
3141 .await
3142 .expect("wrapped client should delegate");
3143
3144 let seen = capture
3145 .seen_messages
3146 .lock()
3147 .unwrap_or_else(std::sync::PoisonError::into_inner)
3148 .clone();
3149 let meerkat_core::Message::BlockAssistant(assistant) = &seen[0] else {
3150 panic!("expected block assistant");
3151 };
3152 assert_eq!(assistant.blocks.len(), 2);
3153 assert!(matches!(
3154 assistant.blocks[0],
3155 meerkat_core::AssistantBlock::Text { .. }
3156 ));
3157 assert!(matches!(
3158 assistant.blocks[1],
3159 meerkat_core::AssistantBlock::ServerToolContent { ref name, .. }
3160 if name == "web_search_call"
3161 ));
3162 }
3163
3164 fn generated_image_block_for_test() -> meerkat_core::AssistantBlock {
3165 serde_json::from_value(serde_json::json!({
3166 "block_type": "image",
3167 "data": {
3168 "image_id": "00000000-0000-0000-0000-000000000051",
3169 "blob_ref": {
3170 "blob_id": "sha256:test-generated-image",
3171 "media_type": "image/png"
3172 },
3173 "media_type": "image/png",
3174 "width": 1024,
3175 "height": 1024,
3176 "revised_prompt": { "disposition": "not_requested" },
3177 "meta": { "provider": "not_emitted" }
3178 }
3179 }))
3180 .expect("test image block should deserialize")
3181 }
3182
3183 #[test]
3184 fn sanitize_message_preserves_assistant_image_blocks() {
3185 let message =
3186 meerkat_core::Message::BlockAssistant(meerkat_core::BlockAssistantMessage::new(
3187 vec![
3188 meerkat_core::AssistantBlock::Text {
3189 text: "Here is the image.".to_string(),
3190 meta: None,
3191 },
3192 generated_image_block_for_test(),
3193 ],
3194 meerkat_core::StopReason::EndTurn,
3195 ));
3196
3197 let sanitized = sanitize_message_for_stateless_replay(message);
3198 let meerkat_core::Message::BlockAssistant(assistant) = sanitized else {
3199 panic!("expected block assistant");
3200 };
3201
3202 assert_eq!(assistant.blocks.len(), 2);
3203 assert!(matches!(
3204 assistant.blocks[0],
3205 meerkat_core::AssistantBlock::Text { .. }
3206 ));
3207 assert!(
3208 matches!(
3209 assistant.blocks[1],
3210 meerkat_core::AssistantBlock::Image { .. }
3211 ),
3212 "generated image blocks should reach Meerkat's provider projection"
3213 );
3214 }
3215
3216 #[test]
3219 fn persistent_with_hook_wraps_session_service() {
3220 let dir = tempfile::tempdir().unwrap_or_else(|e| panic!("{e}"));
3221 let store_path = dir.path().to_path_buf();
3222 let Ok(sqlite) = meerkat_store::SqliteSessionStore::open(store_path.join("sessions.db"))
3223 else {
3224 panic!("failed to open sqlite session store");
3225 };
3226 let session_store: Arc<dyn SessionStore> = Arc::new(sqlite);
3227 let Ok(definition) = meerkat_mob::MobDefinition::from_toml("[mob]\nid = \"test\"\n") else {
3228 panic!("failed to parse minimal mob definition");
3229 };
3230
3231 let hook_called = Arc::new(std::sync::atomic::AtomicBool::new(false));
3232 let hook_called_clone = hook_called.clone();
3233
3234 let spec = MobBootstrapSpec::persistent_with_hook(
3235 definition,
3236 meerkat_mob::MobStorage::in_memory(),
3237 store_path.clone(),
3238 4,
3239 session_store,
3240 move |_req: &mut CreateSessionRequest| {
3241 hook_called_clone.store(true, std::sync::atomic::Ordering::Relaxed);
3242 Box::pin(async { Ok(()) })
3243 },
3244 );
3245
3246 assert!(
3254 spec.runtime_adapter.is_some(),
3255 "persistent_with_hook must provide a runtime adapter via spec.runtime_adapter"
3256 );
3257 assert!(
3258 spec.session_service.runtime_adapter().is_some(),
3259 "session service must own a runtime_store so archive/retire don't \
3260 hit the store-only-projection rejection in meerkat-session"
3261 );
3262 assert!(
3263 store_path.join("runtime.sqlite").exists(),
3264 "persistent_inner must open a SqliteRuntimeStore at <store_path>/runtime.sqlite"
3265 );
3266
3267 assert!(
3272 !hook_called.load(std::sync::atomic::Ordering::Relaxed),
3273 "hook must not be called before create_session"
3274 );
3275 }
3276
3277 #[test]
3279 fn ephemeral_with_hook_creates_spec() {
3280 let dir = tempfile::tempdir().unwrap_or_else(|e| panic!("{e}"));
3281 let store_path = dir.path().to_path_buf();
3282 let Ok(definition) = meerkat_mob::MobDefinition::from_toml("[mob]\nid = \"test\"\n") else {
3283 panic!("failed to parse minimal mob definition");
3284 };
3285
3286 let hook_called = Arc::new(std::sync::atomic::AtomicBool::new(false));
3287 let hook_called_clone = hook_called.clone();
3288
3289 let spec = MobBootstrapSpec::ephemeral_with_hook(
3290 definition,
3291 meerkat_mob::MobStorage::in_memory(),
3292 store_path,
3293 4,
3294 None,
3295 move |_req: &mut CreateSessionRequest| {
3296 hook_called_clone.store(true, std::sync::atomic::Ordering::Relaxed);
3297 Box::pin(async { Ok(()) })
3298 },
3299 );
3300
3301 assert!(spec.runtime_adapter.is_none());
3303
3304 assert!(
3306 !hook_called.load(std::sync::atomic::Ordering::Relaxed),
3307 "hook must not be called before create_session"
3308 );
3309 }
3310
3311 #[tokio::test]
3315 async fn pre_build_hook_mutates_create_session_request() {
3316 use std::sync::Mutex;
3317
3318 let captured = Arc::new(Mutex::new(None::<(String, Option<String>)>));
3319 let captured_clone = captured.clone();
3320
3321 let dir = tempfile::tempdir().unwrap_or_else(|e| panic!("{e}"));
3323 let factory = AgentFactory::new(dir.path()).builtins(true);
3324 let config = Config::default();
3325 let builder = FactoryAgentBuilder::new(factory, config);
3326 let inner: Arc<dyn MobSessionService> =
3327 Arc::new(meerkat_session::EphemeralSessionService::new(builder, 4));
3328
3329 let hook: PreBuildHook = Arc::new(move |req: &mut CreateSessionRequest| {
3331 req.model = "hooked-model".to_string();
3332 req.system_prompt = Some("injected-prompt".to_string());
3333 let labels = req.labels.get_or_insert_with(Default::default);
3334 labels.insert("hook_label".to_string(), "hook_value".to_string());
3335 let mut lock = captured_clone
3337 .lock()
3338 .unwrap_or_else(std::sync::PoisonError::into_inner);
3339 *lock = Some((req.model.clone(), req.system_prompt.clone()));
3340 Box::pin(async { Ok(()) })
3341 });
3342 let wrapped = PreBuildMobSessionService {
3343 inner,
3344 hook,
3345 after_create_hook: None,
3346 runtime_adapter_override: None,
3347 };
3348
3349 let req = CreateSessionRequest {
3350 model: "original-model".to_string(),
3351 prompt: meerkat_core::ContentInput::Text("test".to_string()),
3352 render_metadata: None,
3353 system_prompt: None,
3354 max_tokens: None,
3355 event_tx: None,
3356 skill_references: None,
3357 initial_turn: meerkat_core::service::InitialTurnPolicy::Defer,
3358 build: None,
3359 labels: None,
3360 deferred_prompt_policy: meerkat_core::service::DeferredPromptPolicy::default(),
3361 };
3362
3363 let _ = meerkat_core::service::SessionService::create_session(&wrapped, req).await;
3365
3366 let (model, prompt) = captured
3367 .lock()
3368 .unwrap_or_else(std::sync::PoisonError::into_inner)
3369 .clone()
3370 .expect("hook must have been called");
3371 assert_eq!(model, "hooked-model", "hook must mutate the model");
3372 assert_eq!(
3373 prompt.as_deref(),
3374 Some("injected-prompt"),
3375 "hook must set the system prompt"
3376 );
3377 }
3378
3379 #[derive(Default)]
3380 struct ForwardingProbe {
3381 calls: Mutex<Vec<&'static str>>,
3382 }
3383
3384 impl ForwardingProbe {
3385 fn record(&self, call: &'static str) {
3386 self.calls
3387 .lock()
3388 .unwrap_or_else(std::sync::PoisonError::into_inner)
3389 .push(call);
3390 }
3391
3392 fn calls(&self) -> Vec<&'static str> {
3393 self.calls
3394 .lock()
3395 .unwrap_or_else(std::sync::PoisonError::into_inner)
3396 .clone()
3397 }
3398 }
3399
3400 #[async_trait]
3401 impl meerkat_core::service::SessionService for ForwardingProbe {
3402 async fn create_session(
3403 &self,
3404 _req: CreateSessionRequest,
3405 ) -> Result<meerkat_core::types::RunResult, SessionError> {
3406 Err(SessionError::Unsupported("create_session".to_string()))
3407 }
3408
3409 async fn start_turn(
3410 &self,
3411 _id: &meerkat_core::types::SessionId,
3412 _req: meerkat_core::service::StartTurnRequest,
3413 ) -> Result<meerkat_core::types::RunResult, SessionError> {
3414 Err(SessionError::Unsupported("start_turn".to_string()))
3415 }
3416
3417 async fn interrupt(
3418 &self,
3419 _id: &meerkat_core::types::SessionId,
3420 ) -> Result<(), SessionError> {
3421 self.record("interrupt");
3422 Ok(())
3423 }
3424
3425 async fn read(
3426 &self,
3427 id: &meerkat_core::types::SessionId,
3428 ) -> Result<meerkat_core::service::SessionView, SessionError> {
3429 Err(SessionError::NotFound { id: id.clone() })
3430 }
3431
3432 async fn list(
3433 &self,
3434 _query: meerkat_core::service::SessionQuery,
3435 ) -> Result<Vec<meerkat_core::service::SessionSummary>, SessionError> {
3436 Ok(Vec::new())
3437 }
3438
3439 async fn archive(&self, _id: &meerkat_core::types::SessionId) -> Result<(), SessionError> {
3440 self.record("archive");
3441 Ok(())
3442 }
3443 }
3444
3445 #[async_trait]
3446 impl meerkat_core::service::SessionServiceCommsExt for ForwardingProbe {}
3447
3448 #[async_trait]
3449 impl meerkat_core::service::SessionServiceControlExt for ForwardingProbe {
3450 async fn append_system_context(
3451 &self,
3452 _id: &meerkat_core::types::SessionId,
3453 _req: meerkat_core::service::AppendSystemContextRequest,
3454 ) -> Result<
3455 meerkat_core::service::AppendSystemContextResult,
3456 meerkat_core::service::SessionControlError,
3457 > {
3458 self.record("append_system_context");
3459 Ok(meerkat_core::service::AppendSystemContextResult {
3460 status: meerkat_core::service::AppendSystemContextStatus::Applied,
3461 })
3462 }
3463
3464 async fn stage_tool_results(
3465 &self,
3466 _id: &meerkat_core::types::SessionId,
3467 _req: meerkat_core::service::StageToolResultsRequest,
3468 ) -> Result<meerkat_core::service::StageToolResultsResult, SessionError> {
3469 self.record("stage_tool_results");
3470 Ok(meerkat_core::service::StageToolResultsResult {
3471 accepted_result_count: 7,
3472 })
3473 }
3474 }
3475
3476 #[async_trait]
3477 impl meerkat_core::service::SessionServiceHistoryExt for ForwardingProbe {
3478 async fn read_history(
3479 &self,
3480 id: &meerkat_core::types::SessionId,
3481 _query: meerkat_core::service::SessionHistoryQuery,
3482 ) -> Result<meerkat_core::service::SessionHistoryPage, SessionError> {
3483 Err(SessionError::NotFound { id: id.clone() })
3484 }
3485 }
3486
3487 #[async_trait]
3488 impl MobSessionService for ForwardingProbe {
3489 fn supports_persistent_sessions(&self) -> bool {
3490 true
3491 }
3492
3493 fn runtime_adapter(&self) -> Option<Arc<meerkat_runtime::MeerkatMachine>> {
3494 Some(Arc::new(meerkat_runtime::MeerkatMachine::ephemeral()))
3495 }
3496
3497 async fn archive_with_mob_lifecycle_authority(
3498 &self,
3499 _session_id: &meerkat_core::types::SessionId,
3500 ) -> Result<(), SessionError> {
3501 self.record("archive_with_mob_lifecycle_authority");
3502 Ok(())
3503 }
3504 }
3505
3506 #[tokio::test]
3507 async fn pre_build_wrapper_forwards_mob_authority_and_control_extensions() {
3508 let probe = Arc::new(ForwardingProbe::default());
3509 let inner: Arc<dyn MobSessionService> = probe.clone();
3510 let wrapped = PreBuildMobSessionService {
3511 inner,
3512 hook: no_op_pre_build_hook(),
3513 after_create_hook: None,
3514 runtime_adapter_override: Some(Arc::new(meerkat_runtime::MeerkatMachine::ephemeral())),
3515 };
3516 let session_id = meerkat_core::types::SessionId::new();
3517
3518 MobSessionService::archive_with_mob_lifecycle_authority(&wrapped, &session_id)
3519 .await
3520 .expect("archive_with_mob_lifecycle_authority should forward to inner service");
3521 let staged = meerkat_core::service::SessionServiceControlExt::stage_tool_results(
3522 &wrapped,
3523 &session_id,
3524 meerkat_core::service::StageToolResultsRequest {
3525 results: Vec::new(),
3526 },
3527 )
3528 .await
3529 .expect("stage_tool_results should forward to inner service");
3530
3531 assert_eq!(staged.accepted_result_count, 7);
3532 assert_eq!(
3533 probe.calls(),
3534 vec!["archive_with_mob_lifecycle_authority", "stage_tool_results",]
3535 );
3536 }
3537
3538 #[test]
3558 fn persistent_bootstrap_uses_sqlite_runtime_store() {
3559 let dir = tempfile::tempdir().unwrap_or_else(|e| panic!("{e}"));
3560 let store_path = dir.path().to_path_buf();
3561 let Ok(sqlite) = meerkat_store::SqliteSessionStore::open(store_path.join("sessions.db"))
3562 else {
3563 panic!("failed to open sqlite session store");
3564 };
3565 let session_store: Arc<dyn SessionStore> = Arc::new(sqlite);
3566 let Ok(definition) = meerkat_mob::MobDefinition::from_toml("[mob]\nid = \"test\"\n") else {
3567 panic!("failed to parse minimal mob definition");
3568 };
3569 let spec = MobBootstrapSpec::persistent(
3570 definition,
3571 meerkat_mob::MobStorage::in_memory(),
3572 store_path.clone(),
3573 4,
3574 session_store,
3575 );
3576 assert!(
3577 spec.runtime_adapter.is_some(),
3578 "persistent bootstrap must provide its own runtime adapter via spec.runtime_adapter"
3579 );
3580 assert!(
3581 spec.session_service.runtime_adapter().is_some(),
3582 "session service must own a runtime_store so archive/retire don't \
3583 hit the store-only-projection rejection"
3584 );
3585 assert!(
3586 store_path.join("runtime.sqlite").exists(),
3587 "persistent_inner must open a SqliteRuntimeStore at <store_path>/runtime.sqlite"
3588 );
3589 }
3590
3591 #[test]
3595 fn ephemeral_runtime_backed_uses_session_service_runtime_adapter() {
3596 let dir = tempfile::tempdir().unwrap_or_else(|e| panic!("{e}"));
3597 let store_path = dir.path().to_path_buf();
3598 let Ok(definition) = meerkat_mob::MobDefinition::from_toml("[mob]\nid = \"test\"\n") else {
3599 panic!("failed to parse minimal mob definition");
3600 };
3601 let spec = MobBootstrapSpec::ephemeral_runtime_backed_inner(
3602 definition,
3603 meerkat_mob::MobStorage::in_memory(),
3604 store_path,
3605 4,
3606 None,
3607 None,
3608 None,
3609 CapabilityFlags::default(),
3610 None,
3611 );
3612 assert!(
3613 spec.runtime_adapter.is_some(),
3614 "ephemeral_runtime_backed_inner must expose the shared runtime authority"
3615 );
3616 assert!(
3617 spec.session_service.runtime_adapter().is_some(),
3618 "session service must still expose a runtime adapter so autonomous-host comms can wire"
3619 );
3620 }
3621
3622 #[tokio::test]
3623 async fn ephemeral_runtime_backed_custom_session_store_persists_created_session() {
3624 let dir = tempfile::tempdir().unwrap_or_else(|e| panic!("{e}"));
3625 let store_path = dir.path().to_path_buf();
3626 let Ok(definition) = meerkat_mob::MobDefinition::from_toml(
3627 "[mob]\nid = \"test\"\n\n[profiles.worker]\nmodel = \"gpt-5.5\"\n[profiles.worker.tools]\ncomms = true\n",
3628 ) else {
3629 panic!("failed to parse minimal mob definition");
3630 };
3631 let custom_store: Arc<dyn SessionStore> = Arc::new(meerkat_store::MemoryStore::new());
3632 let mut spec = MobBootstrapSpec::ephemeral_runtime_backed_inner(
3633 definition,
3634 meerkat_mob::MobStorage::in_memory(),
3635 store_path,
3636 4,
3637 Some(custom_store.clone()),
3638 None,
3639 None,
3640 CapabilityFlags::default(),
3641 None,
3642 );
3643 spec.options.default_llm_client = Some(Arc::new(meerkat_client::TestClient::default()));
3644
3645 let runtime = MobRuntime::bootstrap(spec)
3646 .await
3647 .unwrap_or_else(|e| panic!("{e}"));
3648 let mid = meerkat_mob::ids::MeerkatId::from("worker:one");
3649 runtime
3650 .handle
3651 .spawn_spec(SpawnMemberSpec::new(
3652 ProfileName::from("worker"),
3653 mid.clone(),
3654 ))
3655 .await
3656 .unwrap_or_else(|e| panic!("{e}"));
3657 let session_id = runtime
3658 .handle
3659 .resolve_bridge_session_id(&mid)
3660 .await
3661 .unwrap_or_else(|| panic!("spawned worker has no bridge session id"));
3662
3663 let stored = custom_store
3664 .load(&session_id)
3665 .await
3666 .unwrap_or_else(|e| panic!("{e}"));
3667 assert!(
3668 stored.is_some(),
3669 "ephemeral runtime-backed builds with a custom store must persist through that store"
3670 );
3671 }
3672
3673 #[tokio::test]
3674 async fn ephemeral_runtime_backed_custom_session_store_resumes_after_runtime_restart() {
3675 let dir = tempfile::tempdir().unwrap_or_else(|e| panic!("{e}"));
3676 let store_path = dir.path().to_path_buf();
3677 let definition_toml = "[mob]\nid = \"test\"\n\n[profiles.worker]\nmodel = \"gpt-5.5\"\n[profiles.worker.tools]\ncomms = true\n";
3678 let Ok(definition) = meerkat_mob::MobDefinition::from_toml(definition_toml) else {
3679 panic!("failed to parse minimal mob definition");
3680 };
3681 let custom_store: Arc<dyn SessionStore> = Arc::new(meerkat_store::MemoryStore::new());
3682 let mut spec = MobBootstrapSpec::ephemeral_runtime_backed_inner(
3683 definition,
3684 meerkat_mob::MobStorage::in_memory(),
3685 store_path.clone(),
3686 4,
3687 Some(custom_store.clone()),
3688 None,
3689 None,
3690 CapabilityFlags::default(),
3691 None,
3692 );
3693 spec.options.default_llm_client = Some(Arc::new(meerkat_client::TestClient::default()));
3694
3695 let runtime = MobRuntime::bootstrap(spec)
3696 .await
3697 .unwrap_or_else(|e| panic!("{e}"));
3698 let mid = meerkat_mob::ids::MeerkatId::from("worker:one");
3699 runtime
3700 .handle
3701 .spawn_spec(SpawnMemberSpec::new(
3702 ProfileName::from("worker"),
3703 mid.clone(),
3704 ))
3705 .await
3706 .unwrap_or_else(|e| panic!("{e}"));
3707 let session_id = runtime
3708 .handle
3709 .resolve_bridge_session_id(&mid)
3710 .await
3711 .unwrap_or_else(|| panic!("spawned worker has no bridge session id"));
3712 drop(runtime);
3713
3714 let Ok(definition) = meerkat_mob::MobDefinition::from_toml(definition_toml) else {
3715 panic!("failed to parse minimal mob definition");
3716 };
3717 let mut restarted_spec = MobBootstrapSpec::ephemeral_runtime_backed_inner(
3718 definition,
3719 meerkat_mob::MobStorage::in_memory(),
3720 store_path,
3721 4,
3722 Some(custom_store),
3723 None,
3724 None,
3725 CapabilityFlags::default(),
3726 None,
3727 );
3728 restarted_spec.options.default_llm_client =
3729 Some(Arc::new(meerkat_client::TestClient::default()));
3730
3731 let restarted = MobRuntime::bootstrap(restarted_spec)
3732 .await
3733 .unwrap_or_else(|e| panic!("{e}"));
3734 let mut resume_spec = SpawnMemberSpec::new(ProfileName::from("worker"), mid.clone());
3735 resume_spec.launch_mode = meerkat_mob::MemberLaunchMode::Resume {
3736 bridge_session_id: session_id.clone(),
3737 };
3738 restarted
3739 .handle
3740 .spawn_spec(resume_spec)
3741 .await
3742 .unwrap_or_else(|e| panic!("resume should load the external session snapshot: {e}"));
3743
3744 let resumed_session_id = restarted
3745 .handle
3746 .resolve_bridge_session_id(&mid)
3747 .await
3748 .unwrap_or_else(|| panic!("resumed worker has no bridge session id"));
3749 assert_eq!(resumed_session_id, session_id);
3750 }
3751
3752 #[test]
3755 fn ephemeral_bootstrap_without_image_generation_stays_direct() {
3756 let dir = tempfile::tempdir().unwrap_or_else(|e| panic!("{e}"));
3757 let store_path = dir.path().to_path_buf();
3758 let Ok(definition) = meerkat_mob::MobDefinition::from_toml(
3759 "[mob]\nid = \"test\"\n\n[profiles.worker]\nmodel = \"gpt-5.5\"\nruntime_mode = \"autonomous_host\"\n[profiles.worker.tools]\ncomms = true\n",
3760 ) else {
3761 panic!("failed to parse definition");
3762 };
3763 let spec = MobBootstrapSpec::ephemeral_inner(
3764 definition,
3765 meerkat_mob::MobStorage::in_memory(),
3766 store_path,
3767 4,
3768 None,
3769 None,
3770 CapabilityFlags::default(),
3771 None,
3772 );
3773 assert!(
3774 spec.runtime_adapter.is_none(),
3775 "public ephemeral builds only need a runtime adapter when the definition may use image generation"
3776 );
3777 }
3778
3779 #[test]
3784 fn ephemeral_bootstrap_with_image_generation_shares_runtime_adapter() {
3785 let dir = tempfile::tempdir().unwrap_or_else(|e| panic!("{e}"));
3786 let store_path = dir.path().to_path_buf();
3787 let Ok(definition) = meerkat_mob::MobDefinition::from_toml(
3788 r#"
3789[mob]
3790id = "test"
3791
3792[profiles.commander]
3793model = "gpt-5.5"
3794
3795[profiles.commander.tools]
3796builtins = true
3797image_generation = true
3798"#,
3799 ) else {
3800 panic!("failed to parse image-generation definition");
3801 };
3802 let spec = MobBootstrapSpec::ephemeral(
3803 definition,
3804 meerkat_mob::MobStorage::in_memory(),
3805 store_path,
3806 4,
3807 None,
3808 );
3809 let spec_adapter = spec
3810 .runtime_adapter
3811 .as_ref()
3812 .expect("image-generation ephemeral builds must expose a runtime adapter");
3813 let service_adapter = spec
3814 .session_service
3815 .runtime_adapter()
3816 .expect("session service must expose the same runtime adapter");
3817 assert!(
3818 spec_adapter.shares_runtime_persistence_with(&service_adapter),
3819 "image-generation tool state and session state must share one runtime authority"
3820 );
3821 }
3822
3823 #[test]
3826 fn normalize_runtime_turn_request_strips_runtime_owned_semantics() {
3827 let req = meerkat_core::service::StartTurnRequest {
3828 prompt: meerkat_core::ContentInput::Text("checkpoint".to_string()),
3829 system_prompt: Some("system".to_string()),
3830 event_tx: None,
3831 runtime: meerkat_core::service::StartTurnRuntimeSemantics {
3832 render_metadata: Some(meerkat_core::types::RenderMetadata {
3833 class: meerkat_core::types::RenderClass::OpsProgress,
3834 salience: meerkat_core::types::RenderSalience::Urgent,
3835 }),
3836 handling_mode: meerkat_core::types::HandlingMode::Steer,
3837 skill_references: None,
3838 flow_tool_overlay: None,
3839 pre_turn_context_appends: Vec::new(),
3840 typed_turn_appends: Vec::new(),
3841 turn_metadata: None,
3842 },
3843 };
3844
3845 let expected_prompt = req.prompt.clone();
3846 let expected_system_prompt = req.system_prompt.clone();
3847
3848 let normalized = normalize_runtime_turn_request(req);
3849
3850 assert_eq!(
3851 normalized.runtime.handling_mode,
3852 meerkat_core::types::HandlingMode::Queue,
3853 "runtime-applied turns must downgrade Steer before reaching direct session services"
3854 );
3855 assert!(
3856 normalized.runtime.render_metadata.is_none(),
3857 "runtime-owned render metadata must not be forwarded through the direct agent path"
3858 );
3859 assert_eq!(normalized.prompt, expected_prompt);
3860 assert_eq!(normalized.system_prompt, expected_system_prompt);
3861 }
3862
3863 #[test]
3865 fn session_created_context_fields() {
3866 let ctx = SessionCreatedContext {
3867 model: "claude-sonnet-4-5".to_string(),
3868 labels: std::collections::BTreeMap::from([(
3869 "agent_type".to_string(),
3870 "lead".to_string(),
3871 )]),
3872 system_prompt: Some("You are a lead agent.".to_string()),
3873 };
3874 assert_eq!(ctx.model, "claude-sonnet-4-5");
3875 assert_eq!(ctx.labels["agent_type"], "lead");
3876 assert_eq!(ctx.system_prompt.as_deref(), Some("You are a lead agent."));
3877 }
3878
3879 #[tokio::test]
3881 async fn session_hook_default_impls_are_noop() {
3882 struct EmptyHook;
3883 #[async_trait]
3884 impl SessionHook for EmptyHook {}
3885
3886 let hook = EmptyHook;
3887 let mut req = CreateSessionRequest {
3888 model: "test".to_string(),
3889 prompt: meerkat_core::ContentInput::Text("test".to_string()),
3890 render_metadata: None,
3891 system_prompt: None,
3892 max_tokens: None,
3893 event_tx: None,
3894 skill_references: None,
3895 initial_turn: meerkat_core::service::InitialTurnPolicy::Defer,
3896 build: None,
3897 labels: None,
3898 deferred_prompt_policy: meerkat_core::service::DeferredPromptPolicy::default(),
3899 };
3900 hook.before_create(&mut req).await.unwrap();
3902 let ctx = SessionCreatedContext {
3904 model: "test".to_string(),
3905 labels: Default::default(),
3906 system_prompt: None,
3907 };
3908 hook.after_create(&meerkat_core::types::SessionId::new(), &ctx)
3909 .await;
3910 }
3911
3912 #[tokio::test]
3914 async fn session_hook_before_create_can_abort() {
3915 struct AbortHook;
3916 #[async_trait]
3917 impl SessionHook for AbortHook {
3918 async fn before_create(
3919 &self,
3920 _req: &mut CreateSessionRequest,
3921 ) -> Result<(), SessionError> {
3922 Err(SessionError::Unsupported("hook abort".into()))
3923 }
3924 }
3925
3926 let hook = AbortHook;
3927 let mut req = CreateSessionRequest {
3928 model: "test".to_string(),
3929 prompt: meerkat_core::ContentInput::Text("test".to_string()),
3930 render_metadata: None,
3931 system_prompt: None,
3932 max_tokens: None,
3933 event_tx: None,
3934 skill_references: None,
3935 initial_turn: meerkat_core::service::InitialTurnPolicy::Defer,
3936 build: None,
3937 labels: None,
3938 deferred_prompt_policy: meerkat_core::service::DeferredPromptPolicy::default(),
3939 };
3940 let result = hook.before_create(&mut req).await;
3941 assert!(result.is_err());
3942 }
3943
3944 #[tokio::test]
3946 async fn session_hook_before_create_mutates_request() {
3947 struct MutatingHook;
3948 #[async_trait]
3949 impl SessionHook for MutatingHook {
3950 async fn before_create(
3951 &self,
3952 req: &mut CreateSessionRequest,
3953 ) -> Result<(), SessionError> {
3954 req.model = "hook-overridden".to_string();
3955 req.system_prompt = Some("injected by hook".to_string());
3956 Ok(())
3957 }
3958 }
3959
3960 let hook = MutatingHook;
3961 let mut req = CreateSessionRequest {
3962 model: "original".to_string(),
3963 prompt: meerkat_core::ContentInput::Text("test".to_string()),
3964 render_metadata: None,
3965 system_prompt: None,
3966 max_tokens: None,
3967 event_tx: None,
3968 skill_references: None,
3969 initial_turn: meerkat_core::service::InitialTurnPolicy::Defer,
3970 build: None,
3971 labels: None,
3972 deferred_prompt_policy: meerkat_core::service::DeferredPromptPolicy::default(),
3973 };
3974 hook.before_create(&mut req).await.unwrap();
3975 assert_eq!(req.model, "hook-overridden");
3976 assert_eq!(req.system_prompt.as_deref(), Some("injected by hook"));
3977 }
3978}