1use std::collections::{BTreeMap, BTreeSet};
4use std::path::{Path, PathBuf};
5use std::sync::Arc;
6
7use async_trait::async_trait;
8use futures::StreamExt;
9use meerkat::{AgentFactory, Config, FactoryAgentBuilder, SessionStore};
10use meerkat_client::types::LlmStream;
11use meerkat_client::{LlmClient, LlmRequest};
12use meerkat_core::agent::CommsRuntime;
13use meerkat_core::service::{
14 CreateSessionRequest, SessionError, SessionHistoryPage, SessionHistoryQuery,
15 SessionServiceHistoryExt,
16};
17use meerkat_core::{AgentSessionStore, AssistantBlock, Message, Provider};
18use meerkat_mob::{
19 MobBuilder, MobDefinition, MobError, MobHandle, MobSessionService, MobStorage, Profile,
20 ProfileName, SpawnMemberSpec,
21};
22use meerkat_runtime::input_state::StoredInputState;
23use meerkat_runtime::store::MachineLifecycleCommit;
24use meerkat_store::StoreAdapter;
25use serde_json::Value;
26
27use crate::blob_store::{
28 Base64BlobStoreAdapter, BinaryBlobStore, BinaryBlobStoreAdapter, ObjectStoreBlobStore,
29};
30
31pub(crate) const DELEGATE_IDLE_RETIRE_SECS_LABEL: &str = "implicit_delegate_idle_retire_secs";
32pub(crate) const DELEGATE_IDLE_RETIRE_DISABLED_LABEL: &str = "disabled";
33
34#[cfg(test)]
35use std::sync::{Mutex, OnceLock};
36
37pub const MEMBER_STATE_ACTIVE: &str = "active";
39pub const MEMBER_STATE_RETIRING: &str = "retiring";
41
42#[derive(Clone, Default)]
44pub struct MobBootstrapOptions {
45 pub allow_ephemeral_sessions: bool,
46 pub notify_orchestrator_on_resume: bool,
47 pub default_llm_client: Option<Arc<dyn LlmClient>>,
48}
49
50pub struct ReplaySanitizingLlmClient {
54 inner: Arc<dyn LlmClient>,
55}
56
57type SharedDefaultLlmClientSlot = Arc<std::sync::RwLock<Option<Arc<dyn LlmClient>>>>;
58
59impl ReplaySanitizingLlmClient {
60 pub fn new(inner: Arc<dyn LlmClient>) -> Self {
61 Self { inner }
62 }
63
64 pub fn wrap(inner: Arc<dyn LlmClient>) -> Arc<dyn LlmClient> {
65 Arc::new(Self::new(inner))
66 }
67}
68
69pub struct ReplaySanitizingAgentLlmClient {
77 inner: Arc<dyn meerkat_core::AgentLlmClient>,
78}
79
80impl ReplaySanitizingAgentLlmClient {
81 pub fn new(inner: Arc<dyn meerkat_core::AgentLlmClient>) -> Self {
82 Self { inner }
83 }
84
85 pub fn wrap(
86 inner: Arc<dyn meerkat_core::AgentLlmClient>,
87 ) -> Arc<dyn meerkat_core::AgentLlmClient> {
88 Arc::new(Self::new(inner))
89 }
90}
91
92#[async_trait]
93impl meerkat_core::AgentLlmClient for ReplaySanitizingAgentLlmClient {
94 async fn stream_response(
95 &self,
96 messages: &[Message],
97 tools: &[Arc<meerkat_core::ToolDef>],
98 max_tokens: u32,
99 temperature: Option<f32>,
100 provider_params: Option<&meerkat_core::lifecycle::run_primitive::ProviderParamsOverride>,
101 ) -> Result<meerkat_core::agent::LlmStreamResult, meerkat_core::AgentError> {
102 let sanitized: Vec<Message> = messages
103 .iter()
104 .cloned()
105 .map(sanitize_message_for_stateless_replay)
106 .collect();
107 self.inner
108 .stream_response(&sanitized, tools, max_tokens, temperature, provider_params)
109 .await
110 }
111
112 fn provider(&self) -> &'static str {
113 self.inner.provider()
114 }
115
116 fn model(&self) -> &str {
117 self.inner.model()
118 }
119
120 fn compile_schema(
121 &self,
122 output_schema: &meerkat_core::OutputSchema,
123 ) -> Result<meerkat_core::schema::CompiledSchema, meerkat_core::schema::SchemaError> {
124 self.inner.compile_schema(output_schema)
125 }
126}
127
128#[async_trait]
129impl LlmClient for ReplaySanitizingLlmClient {
130 fn project_replay_messages(
131 &self,
132 messages: &[Message],
133 ) -> Result<Vec<Message>, meerkat_client::LlmError> {
134 let sanitized: Vec<Message> = messages
135 .iter()
136 .cloned()
137 .map(sanitize_message_for_stateless_replay)
138 .collect();
139 self.inner.project_replay_messages(&sanitized)
140 }
141
142 fn stream<'a>(&'a self, request: &'a LlmRequest) -> LlmStream<'a> {
143 let inner = Arc::clone(&self.inner);
144 let sanitized = sanitize_llm_request_for_stateless_replay(request);
145 Box::pin(async_stream::stream! {
146 let mut stream = inner.stream(&sanitized);
147 while let Some(event) = stream.next().await {
148 if runtime_turn_diagnostics_enabled()
149 && let Err(error) = &event
150 {
151 tracing::error!(
152 error = %error,
153 error_debug = ?error,
154 "mobkit llm client stream error"
155 );
156 }
157 yield event;
158 }
159 })
160 }
161
162 fn provider(&self) -> &'static str {
163 self.inner.provider()
164 }
165
166 async fn health_check(&self) -> Result<(), meerkat_client::LlmError> {
167 self.inner.health_check().await
168 }
169
170 fn compile_schema(
171 &self,
172 output_schema: &meerkat_core::OutputSchema,
173 ) -> Result<meerkat_core::schema::CompiledSchema, meerkat_core::schema::SchemaError> {
174 self.inner.compile_schema(output_schema)
175 }
176}
177
178pub(crate) type PreBuildHook = Arc<
206 dyn Fn(
207 &mut CreateSessionRequest,
208 ) -> std::pin::Pin<
209 Box<dyn std::future::Future<Output = Result<(), SessionError>> + Send + '_>,
210 > + Send
211 + Sync,
212>;
213
214pub type AfterCreateHook = Arc<
216 dyn Fn(
217 meerkat_core::types::SessionId,
218 SessionCreatedContext,
219 ) -> std::pin::Pin<Box<dyn std::future::Future<Output = ()> + Send>>
220 + Send
221 + Sync,
222>;
223
224struct PreBuildMobSessionService {
231 inner: Arc<dyn MobSessionService>,
232 hook: PreBuildHook,
233 after_create_hook: Option<AfterCreateHook>,
234 runtime_adapter_override: Option<Arc<meerkat_runtime::MeerkatMachine>>,
235}
236
237fn no_op_pre_build_hook() -> PreBuildHook {
238 Arc::new(|_req: &mut CreateSessionRequest| Box::pin(async { Ok(()) }))
239}
240
241#[derive(Debug, Clone, Copy, PartialEq, Eq)]
242pub(crate) enum DelegateIdleRetireOverride {
243 Disabled,
244 Seconds(u64),
245}
246
247#[derive(Clone, Default)]
248pub(crate) struct ImplicitDelegateRetirementOverrides {
249 inner: Arc<tokio::sync::RwLock<BTreeMap<(String, String), DelegateIdleRetireOverride>>>,
250}
251
252impl ImplicitDelegateRetirementOverrides {
253 pub(crate) async fn set(
254 &self,
255 mob_id: impl Into<String>,
256 member_id: impl Into<String>,
257 override_policy: DelegateIdleRetireOverride,
258 ) {
259 self.inner
260 .write()
261 .await
262 .insert((mob_id.into(), member_id.into()), override_policy);
263 }
264
265 pub(crate) async fn get(
266 &self,
267 mob_id: &str,
268 member_id: &str,
269 ) -> Option<DelegateIdleRetireOverride> {
270 self.inner
271 .read()
272 .await
273 .get(&(mob_id.to_string(), member_id.to_string()))
274 .copied()
275 }
276}
277
278struct AutoWireParentMobToolsFactory {
279 inner: Arc<dyn meerkat_core::service::MobToolsFactory>,
280 implicit_delegate_retirement_overrides: ImplicitDelegateRetirementOverrides,
281}
282
283#[cfg_attr(target_arch = "wasm32", async_trait(?Send))]
284#[cfg_attr(not(target_arch = "wasm32"), async_trait)]
285impl meerkat_core::service::MobToolsFactory for AutoWireParentMobToolsFactory {
286 async fn build_mob_tools(
287 &self,
288 args: meerkat_core::service::MobToolsBuildArgs,
289 ) -> Result<Arc<dyn meerkat_core::AgentToolDispatcher>, Box<dyn std::error::Error + Send + Sync>>
290 {
291 let inner = self.inner.build_mob_tools(args).await?;
292 Ok(Arc::new(AutoWireParentMobToolDispatcher {
293 inner,
294 implicit_delegate_retirement_overrides: self
295 .implicit_delegate_retirement_overrides
296 .clone(),
297 }))
298 }
299}
300
301struct AutoWireParentMobToolDispatcher {
302 inner: Arc<dyn meerkat_core::AgentToolDispatcher>,
303 implicit_delegate_retirement_overrides: ImplicitDelegateRetirementOverrides,
304}
305
306#[cfg_attr(target_arch = "wasm32", async_trait(?Send))]
307#[cfg_attr(not(target_arch = "wasm32"), async_trait)]
308impl meerkat_core::AgentToolDispatcher for AutoWireParentMobToolDispatcher {
309 fn tools(&self) -> Arc<[Arc<meerkat_core::types::ToolDef>]> {
310 self.inner
311 .tools()
312 .iter()
313 .map(|tool| {
314 if tool.name == "delegate" {
315 Arc::new(delegate_tool_def_with_idle_retire_secs(tool))
316 } else if tool.name == "mob_spawn_member" {
317 Arc::new(mob_spawn_tool_def_with_idle_retire_secs(tool))
318 } else {
319 Arc::clone(tool)
320 }
321 })
322 .collect::<Vec<_>>()
323 .into()
324 }
325
326 async fn dispatch(
327 &self,
328 call: meerkat_core::types::ToolCallView<'_>,
329 ) -> Result<meerkat_core::ToolDispatchOutcome, meerkat_core::ToolError> {
330 if call.name == "delegate" {
331 return self.dispatch_delegate(call).await;
332 }
333 if call.name != "mob_spawn_member" {
334 return self.inner.dispatch(call).await;
335 }
336 self.dispatch_mob_spawn_member(call).await
337 }
338
339 fn capabilities(&self) -> meerkat_core::agent::DispatcherCapabilities {
340 self.inner.capabilities()
341 }
342
343 fn bind_ops_lifecycle(
344 self: Arc<Self>,
345 registry: Arc<dyn meerkat_core::ops_lifecycle::OpsLifecycleRegistry>,
346 owner_bridge_session_id: meerkat_core::types::SessionId,
347 ) -> Result<meerkat_core::agent::BindOutcome, meerkat_core::agent::OpsLifecycleBindError> {
348 let owned = Arc::try_unwrap(self)
349 .map_err(|_| meerkat_core::agent::OpsLifecycleBindError::SharedOwnership)?;
350 let outcome = owned
351 .inner
352 .bind_ops_lifecycle(registry, owner_bridge_session_id)?;
353 let was_bound = outcome.was_bound();
354 let dispatcher = Arc::new(Self {
355 inner: outcome.into_dispatcher(),
356 implicit_delegate_retirement_overrides: owned.implicit_delegate_retirement_overrides,
357 });
358 Ok(if was_bound {
359 meerkat_core::agent::BindOutcome::Bound(dispatcher)
360 } else {
361 meerkat_core::agent::BindOutcome::Skipped(dispatcher)
362 })
363 }
364}
365
366impl AutoWireParentMobToolDispatcher {
367 async fn dispatch_mob_spawn_member(
368 &self,
369 call: meerkat_core::types::ToolCallView<'_>,
370 ) -> Result<meerkat_core::ToolDispatchOutcome, meerkat_core::ToolError> {
371 let mut args = serde_json::from_str::<Value>(call.args.get()).map_err(|error| {
372 meerkat_core::ToolError::invalid_arguments(call.name, error.to_string())
373 })?;
374 let idle_retire_override = delegate_idle_retire_override_from_args(call.name, &mut args)?;
375 let idle_retire_targets = idle_retire_targets_from_spawn_args(&args);
376 if let Some(object) = args.as_object_mut() {
377 object
378 .entry("auto_wire_parent".to_string())
379 .or_insert(Value::Bool(true));
380 }
381 let args = serde_json::value::RawValue::from_string(args.to_string()).map_err(|error| {
382 meerkat_core::ToolError::invalid_arguments(call.name, error.to_string())
383 })?;
384 let call = meerkat_core::types::ToolCallView {
385 id: call.id,
386 name: call.name,
387 args: &args,
388 };
389 let outcome = self.inner.dispatch(call).await?;
390 self.register_idle_retire_override_from_outcome(
391 &outcome,
392 idle_retire_override,
393 &idle_retire_targets,
394 )
395 .await;
396
397 Ok(outcome)
398 }
399
400 async fn dispatch_delegate(
401 &self,
402 call: meerkat_core::types::ToolCallView<'_>,
403 ) -> Result<meerkat_core::ToolDispatchOutcome, meerkat_core::ToolError> {
404 let mut args = serde_json::from_str::<Value>(call.args.get()).map_err(|error| {
405 meerkat_core::ToolError::invalid_arguments(call.name, error.to_string())
406 })?;
407 let idle_retire_override = delegate_idle_retire_override_from_args(call.name, &mut args)?;
408 let args = serde_json::value::RawValue::from_string(args.to_string()).map_err(|error| {
409 meerkat_core::ToolError::invalid_arguments(call.name, error.to_string())
410 })?;
411 let call = meerkat_core::types::ToolCallView {
412 id: call.id,
413 name: call.name,
414 args: &args,
415 };
416 let outcome = self.inner.dispatch(call).await?;
417
418 self.register_idle_retire_override_from_outcome(&outcome, idle_retire_override, &[])
419 .await;
420
421 Ok(outcome)
422 }
423
424 async fn register_idle_retire_override_from_outcome(
425 &self,
426 outcome: &meerkat_core::ToolDispatchOutcome,
427 idle_retire_override: Option<DelegateIdleRetireOverride>,
428 fallback_targets: &[IdleRetireTarget],
429 ) {
430 if outcome.result.is_error {
431 return;
432 }
433 let Some(override_policy) = idle_retire_override else {
434 return;
435 };
436 for target in
437 idle_retire_targets_from_outcome_text(&outcome.result.text_content(), fallback_targets)
438 {
439 self.implicit_delegate_retirement_overrides
440 .set(&target.mob_id, &target.member_id, override_policy)
441 .await;
442 }
443 }
444}
445
446#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord)]
447struct IdleRetireTarget {
448 mob_id: String,
449 member_id: String,
450}
451
452fn text_field<'a>(value: &'a Value, key: &str) -> Option<&'a str> {
453 value
454 .get(key)
455 .and_then(Value::as_str)
456 .filter(|text| !text.is_empty())
457}
458
459fn member_identity_field(value: &Value) -> Option<&str> {
460 text_field(value, "agent_identity")
461 .or_else(|| text_field(value, "member_id"))
462 .or_else(|| text_field(value, "identity"))
463}
464
465fn target_from_value(value: &Value, default_mob_id: Option<&str>) -> Option<IdleRetireTarget> {
466 let mob_id = text_field(value, "mob_id").or(default_mob_id)?;
467 let member_id = member_identity_field(value)?;
468 Some(IdleRetireTarget {
469 mob_id: mob_id.to_string(),
470 member_id: member_id.to_string(),
471 })
472}
473
474fn idle_retire_targets_from_spawn_args(args: &Value) -> Vec<IdleRetireTarget> {
475 let default_mob_id = text_field(args, "mob_id");
476 let mut targets = BTreeSet::new();
477 if let Some(target) = target_from_value(args, default_mob_id) {
478 targets.insert(target);
479 }
480 for key in ["specs", "members"] {
481 let Some(values) = args.get(key).and_then(Value::as_array) else {
482 continue;
483 };
484 for value in values {
485 if let Some(target) = target_from_value(value, default_mob_id) {
486 targets.insert(target);
487 }
488 }
489 }
490 targets.into_iter().collect()
491}
492
493fn target_from_result_value(
494 value: &Value,
495 fallback_targets: &[IdleRetireTarget],
496) -> Option<IdleRetireTarget> {
497 if let Some(target) = target_from_value(value, None) {
498 return Some(target);
499 }
500 let member_id = member_identity_field(value)?;
501 let mut matches = fallback_targets
502 .iter()
503 .filter(|target| target.member_id == member_id);
504 let target = matches.next()?;
505 if matches.next().is_some() {
506 return None;
507 }
508 Some(target.clone())
509}
510
511fn collect_idle_retire_result_targets(
512 value: &Value,
513 fallback_targets: &[IdleRetireTarget],
514 targets: &mut BTreeSet<IdleRetireTarget>,
515) {
516 if let Some(target) = target_from_result_value(value, fallback_targets) {
517 targets.insert(target);
518 }
519 for key in ["members", "specs", "spawned", "results"] {
520 let Some(values) = value.get(key).and_then(Value::as_array) else {
521 continue;
522 };
523 for value in values {
524 collect_idle_retire_result_targets(value, fallback_targets, targets);
525 }
526 }
527}
528
529fn idle_retire_targets_from_outcome_text(
530 text: &str,
531 fallback_targets: &[IdleRetireTarget],
532) -> Vec<IdleRetireTarget> {
533 let Ok(payload) = serde_json::from_str::<Value>(text) else {
534 return fallback_targets.to_vec();
535 };
536 let mut targets = BTreeSet::new();
537 collect_idle_retire_result_targets(&payload, fallback_targets, &mut targets);
538 if targets.is_empty() {
539 targets.extend(fallback_targets.iter().cloned());
540 }
541 targets.into_iter().collect()
542}
543
544struct DefinitionSeededRealmProfileStore {
545 inner: Arc<dyn meerkat_mob::RealmProfileStore>,
546 profiles: BTreeMap<String, Profile>,
547}
548
549impl DefinitionSeededRealmProfileStore {
550 fn new(
551 definition: &MobDefinition,
552 inner: Arc<dyn meerkat_mob::RealmProfileStore>,
553 ) -> Option<Self> {
554 let profiles = definition
555 .profiles
556 .iter()
557 .filter_map(|(name, binding)| {
558 binding
559 .as_inline()
560 .cloned()
561 .map(|profile| (name.to_string(), profile))
562 })
563 .collect::<BTreeMap<_, _>>();
564
565 (!profiles.is_empty()).then_some(Self { inner, profiles })
566 }
567
568 fn stored(&self, name: &str, profile: &Profile) -> meerkat_mob::StoredRealmProfile {
569 let now = chrono::Utc::now();
570 meerkat_mob::StoredRealmProfile {
571 name: name.to_string(),
572 profile: profile.clone(),
573 revision: 0,
574 created_at: now,
575 updated_at: now,
576 }
577 }
578}
579
580#[cfg_attr(target_arch = "wasm32", async_trait(?Send))]
581#[cfg_attr(not(target_arch = "wasm32"), async_trait)]
582impl meerkat_mob::RealmProfileStore for DefinitionSeededRealmProfileStore {
583 async fn create(
584 &self,
585 name: &str,
586 profile: &Profile,
587 ) -> Result<meerkat_mob::StoredRealmProfile, meerkat_mob::MobStoreError> {
588 if self.profiles.contains_key(name) {
589 return Err(meerkat_mob::MobStoreError::CasConflict(format!(
590 "realm profile already exists: {name}"
591 )));
592 }
593 self.inner.create(name, profile).await
594 }
595
596 async fn get(
597 &self,
598 name: &str,
599 ) -> Result<Option<meerkat_mob::StoredRealmProfile>, meerkat_mob::MobStoreError> {
600 if let Some(profile) = self.profiles.get(name) {
601 return Ok(Some(self.stored(name, profile)));
602 }
603 self.inner.get(name).await
604 }
605
606 async fn list(
607 &self,
608 ) -> Result<Vec<meerkat_mob::StoredRealmProfile>, meerkat_mob::MobStoreError> {
609 let mut merged = self.inner.list().await?;
610 merged.retain(|profile| !self.profiles.contains_key(profile.name.as_str()));
611 merged.extend(
612 self.profiles
613 .iter()
614 .map(|(name, profile)| self.stored(name, profile)),
615 );
616 merged.sort_by(|a, b| a.name.cmp(&b.name));
617 Ok(merged)
618 }
619
620 async fn update(
621 &self,
622 name: &str,
623 profile: &Profile,
624 expected_revision: u64,
625 ) -> Result<meerkat_mob::StoredRealmProfile, meerkat_mob::MobStoreError> {
626 if self.profiles.contains_key(name) {
627 return Err(meerkat_mob::MobStoreError::CasConflict(format!(
628 "realm profile '{name}' is provided by the mob definition"
629 )));
630 }
631 self.inner.update(name, profile, expected_revision).await
632 }
633
634 async fn delete(
635 &self,
636 name: &str,
637 expected_revision: u64,
638 ) -> Result<meerkat_mob::StoredRealmProfile, meerkat_mob::MobStoreError> {
639 if self.profiles.contains_key(name) {
640 return Err(meerkat_mob::MobStoreError::CasConflict(format!(
641 "realm profile '{name}' is provided by the mob definition"
642 )));
643 }
644 self.inner.delete(name, expected_revision).await
645 }
646}
647
648fn delegate_idle_retire_override_from_args(
649 tool_name: &str,
650 args: &mut Value,
651) -> Result<Option<DelegateIdleRetireOverride>, meerkat_core::ToolError> {
652 let Some(object) = args.as_object_mut() else {
653 return Ok(None);
654 };
655 let Some(value) = object.remove("idle_retire_secs") else {
656 return Ok(None);
657 };
658 if value.is_null() {
659 return Ok(Some(DelegateIdleRetireOverride::Disabled));
660 }
661 value
662 .as_u64()
663 .map(DelegateIdleRetireOverride::Seconds)
664 .map(Some)
665 .ok_or_else(|| {
666 meerkat_core::ToolError::invalid_arguments(
667 tool_name,
668 "idle_retire_secs must be a non-negative integer or null",
669 )
670 })
671}
672
673fn delegate_tool_def_with_idle_retire_secs(
674 tool: &meerkat_core::types::ToolDef,
675) -> meerkat_core::types::ToolDef {
676 let mut patched = tool.clone();
677 if !patched.description.contains("IDLE RETIREMENT:") {
678 patched.description.push_str(
679 "\n\nIDLE RETIREMENT:\n\
680 Omit idle_retire_secs to use the runtime default. Pass an integer \
681 number of seconds to override idle auto-retirement for this helper. \
682 Pass null to disable auto-retirement for this helper.",
683 );
684 }
685 if let Some(properties) = patched
686 .input_schema
687 .get_mut("properties")
688 .and_then(Value::as_object_mut)
689 {
690 properties
691 .entry("idle_retire_secs".to_string())
692 .or_insert_with(|| {
693 serde_json::json!({
694 "description": "Override idle auto-retirement for this helper. Omit to use the runtime default, use an integer number of seconds to override, or null to disable auto-retirement for this helper.",
695 "anyOf": [
696 {"type": "integer", "minimum": 0},
697 {"type": "null"}
698 ]
699 })
700 });
701 }
702 patched
703}
704
705fn mob_spawn_tool_def_with_idle_retire_secs(
706 tool: &meerkat_core::types::ToolDef,
707) -> meerkat_core::types::ToolDef {
708 let mut patched = tool.clone();
709 if !patched.description.contains("IDLE RETIREMENT:") {
710 patched.description.push_str(
711 "\n\nIDLE RETIREMENT:\n\
712 Omit idle_retire_secs to leave this spawned member out of auto-retirement. \
713 Pass an integer number of seconds to retire the member after it has been \
714 idle for that long. Pass null to explicitly disable auto-retirement.",
715 );
716 }
717 if let Some(properties) = patched
718 .input_schema
719 .get_mut("properties")
720 .and_then(Value::as_object_mut)
721 {
722 properties
723 .entry("idle_retire_secs".to_string())
724 .or_insert_with(|| {
725 serde_json::json!({
726 "description": "Opt this spawned member into idle auto-retirement. Omit to keep the member indefinitely, use an integer number of seconds to retire after that much idle time, or null to explicitly disable auto-retirement.",
727 "anyOf": [
728 {"type": "integer", "minimum": 0},
729 {"type": "null"}
730 ]
731 })
732 });
733 }
734 patched
735}
736
737fn install_agent_mob_tools(
738 definition: &MobDefinition,
739 slot: Arc<std::sync::RwLock<Option<Arc<dyn meerkat_core::service::MobToolsFactory>>>>,
740 session_service: Arc<dyn MobSessionService>,
741) -> (
742 Arc<meerkat_mob_mcp::MobMcpState>,
743 ImplicitDelegateRetirementOverrides,
744 SharedDefaultLlmClientSlot,
745) {
746 let default_llm_client_slot = Arc::new(std::sync::RwLock::new(None::<Arc<dyn LlmClient>>));
747 let default_llm_client_provider_slot = Arc::clone(&default_llm_client_slot);
748 let mut state = meerkat_mob_mcp::MobMcpState::new(session_service);
749 if let Some(base_store) = state.realm_profile_store().cloned()
750 && let Some(store) = DefinitionSeededRealmProfileStore::new(definition, base_store)
751 {
752 state = state.with_realm_profile_store(Some(Arc::new(store)));
753 }
754 state = state
755 .with_realm_skill_sources(definition.skills.clone())
756 .with_default_llm_client_provider(Some(Arc::new(move || {
757 default_llm_client_provider_slot
758 .read()
759 .unwrap_or_else(std::sync::PoisonError::into_inner)
760 .clone()
761 })));
762 let state = Arc::new(state);
763 let implicit_delegate_retirement_overrides = ImplicitDelegateRetirementOverrides::default();
764 let inner = Arc::new(meerkat_mob_mcp::AgentMobToolSurfaceFactory::new(
765 Arc::clone(&state),
766 ));
767 let factory = Arc::new(AutoWireParentMobToolsFactory {
768 inner,
769 implicit_delegate_retirement_overrides: implicit_delegate_retirement_overrides.clone(),
770 });
771 *slot
772 .write()
773 .unwrap_or_else(std::sync::PoisonError::into_inner) = Some(factory);
774 (
775 state,
776 implicit_delegate_retirement_overrides,
777 default_llm_client_slot,
778 )
779}
780
781#[cfg(test)]
782#[allow(dead_code)]
783#[derive(Debug, Clone)]
784pub(crate) struct RuntimeTurnTrace {
785 pub(crate) session_id: String,
786 pub(crate) boundary: String,
787 pub(crate) contributing_input_count: usize,
788 pub(crate) outcome: String,
789}
790
791fn is_replay_unsafe_server_tool_content(name: &str, content: &Value) -> bool {
792 name == "web_search_annotations"
793 || content
794 .get("type")
795 .and_then(Value::as_str)
796 .is_some_and(|kind| kind.starts_with("response."))
797}
798
799fn sanitize_llm_request_for_stateless_replay(request: &LlmRequest) -> LlmRequest {
800 let mut sanitized = request.clone();
801 sanitized.messages = request
802 .messages
803 .iter()
804 .cloned()
805 .map(sanitize_message_for_stateless_replay)
806 .collect();
807 sanitized
808}
809
810fn sanitize_create_session_request_llm_override(req: &mut CreateSessionRequest) {
811 let Some(build) = req.build.as_mut() else {
812 return;
813 };
814 let Some(client) = build
815 .llm_client_override
816 .as_ref()
817 .and_then(meerkat::decode_llm_client_override_from_service)
818 else {
819 return;
820 };
821 build.llm_client_override = Some(meerkat::encode_llm_client_override_for_service(
822 ReplaySanitizingLlmClient::wrap(client),
823 ));
824}
825
826fn sanitize_message_for_stateless_replay(message: Message) -> Message {
827 match message {
828 Message::BlockAssistant(mut assistant) => {
829 assistant.blocks = assistant
830 .blocks
831 .into_iter()
832 .filter_map(|block| match block {
833 AssistantBlock::ServerToolContent { name, content, .. }
834 if is_replay_unsafe_server_tool_content(&name, &content) =>
835 {
836 None
837 }
838 other => Some(other),
839 })
840 .collect();
841 Message::BlockAssistant(assistant)
842 }
843 other => other,
844 }
845}
846
847fn build_persistent_runtime_store(store_path: &Path) -> Arc<dyn meerkat_runtime::RuntimeStore> {
857 let runtime_db = store_path.join("runtime.sqlite");
858 match meerkat_runtime::store::SqliteRuntimeStore::new(&runtime_db) {
859 Ok(store) => Arc::new(store),
860 Err(err) => {
861 tracing::warn!(
862 path = %runtime_db.display(),
863 error = %err,
864 "failed to open SqliteRuntimeStore; falling back to InMemoryRuntimeStore. \
865 Sessions will not survive process restart and archive operations may fail.",
866 );
867 Arc::new(meerkat_runtime::InMemoryRuntimeStore::new())
868 }
869 }
870}
871
872struct SessionStoreBackedRuntimeStore {
880 inner: Arc<dyn meerkat_runtime::RuntimeStore>,
881}
882
883impl SessionStoreBackedRuntimeStore {
884 fn new(
885 inner: Arc<dyn meerkat_runtime::RuntimeStore>,
886 _session_store: Arc<dyn SessionStore>,
887 ) -> Self {
888 Self { inner }
889 }
890}
891
892#[async_trait]
893impl meerkat_runtime::RuntimeStore for SessionStoreBackedRuntimeStore {
894 fn auth_authority_key(&self) -> Option<String> {
895 self.inner.auth_authority_key()
896 }
897
898 fn persist_auth_oauth_flow_snapshot(
899 &self,
900 snapshot_json: &[u8],
901 ) -> Result<(), meerkat_runtime::store::RuntimeStoreError> {
902 self.inner.persist_auth_oauth_flow_snapshot(snapshot_json)
903 }
904
905 fn load_auth_oauth_flow_snapshot(
906 &self,
907 ) -> Result<Option<Vec<u8>>, meerkat_runtime::store::RuntimeStoreError> {
908 self.inner.load_auth_oauth_flow_snapshot()
909 }
910
911 fn update_auth_oauth_flow_snapshot(
912 &self,
913 update: &mut meerkat_runtime::store::AuthOAuthFlowSnapshotUpdate<'_>,
914 ) -> Result<(), meerkat_runtime::store::RuntimeStoreError> {
915 self.inner.update_auth_oauth_flow_snapshot(update)
916 }
917
918 async fn commit_session_snapshot(
919 &self,
920 runtime_id: &meerkat_runtime::LogicalRuntimeId,
921 session_delta: meerkat_runtime::store::SessionDelta,
922 ) -> Result<(), meerkat_runtime::store::RuntimeStoreError> {
923 self.inner
924 .commit_session_snapshot(runtime_id, session_delta)
925 .await
926 }
927
928 async fn commit_session_transcript_rewrite_snapshot(
929 &self,
930 runtime_id: &meerkat_runtime::LogicalRuntimeId,
931 session_delta: meerkat_runtime::store::SessionDelta,
932 commit: &meerkat_core::TranscriptRewriteCommit,
933 ) -> Result<(), meerkat_runtime::store::RuntimeStoreError> {
934 self.inner
935 .commit_session_transcript_rewrite_snapshot(runtime_id, session_delta, commit)
936 .await
937 }
938
939 async fn atomic_apply(
940 &self,
941 runtime_id: &meerkat_runtime::LogicalRuntimeId,
942 session_delta: Option<meerkat_runtime::store::SessionDelta>,
943 receipt: meerkat_core::lifecycle::RunBoundaryReceipt,
944 input_updates: Vec<StoredInputState>,
945 session_store_key: Option<meerkat_core::types::SessionId>,
946 ) -> Result<(), meerkat_runtime::store::RuntimeStoreError> {
947 self.inner
948 .atomic_apply(
949 runtime_id,
950 session_delta,
951 receipt,
952 input_updates,
953 session_store_key,
954 )
955 .await
956 }
957
958 async fn load_input_states(
959 &self,
960 runtime_id: &meerkat_runtime::LogicalRuntimeId,
961 ) -> Result<Vec<StoredInputState>, meerkat_runtime::store::RuntimeStoreError> {
962 self.inner.load_input_states(runtime_id).await
963 }
964
965 async fn load_boundary_receipt(
966 &self,
967 runtime_id: &meerkat_runtime::LogicalRuntimeId,
968 run_id: &meerkat_core::lifecycle::RunId,
969 sequence: u64,
970 ) -> Result<
971 Option<meerkat_core::lifecycle::RunBoundaryReceipt>,
972 meerkat_runtime::store::RuntimeStoreError,
973 > {
974 self.inner
975 .load_boundary_receipt(runtime_id, run_id, sequence)
976 .await
977 }
978
979 async fn load_session_snapshot(
980 &self,
981 runtime_id: &meerkat_runtime::LogicalRuntimeId,
982 ) -> Result<Option<Vec<u8>>, meerkat_runtime::store::RuntimeStoreError> {
983 self.inner.load_session_snapshot(runtime_id).await
984 }
985
986 async fn clear_session_snapshot(
987 &self,
988 runtime_id: &meerkat_runtime::LogicalRuntimeId,
989 ) -> Result<(), meerkat_runtime::store::RuntimeStoreError> {
990 self.inner.clear_session_snapshot(runtime_id).await
991 }
992
993 async fn replace_session_snapshot_if_current(
994 &self,
995 runtime_id: &meerkat_runtime::LogicalRuntimeId,
996 expected_current: &[u8],
997 replacement: Vec<u8>,
998 ) -> Result<bool, meerkat_runtime::store::RuntimeStoreError> {
999 self.inner
1000 .replace_session_snapshot_if_current(runtime_id, expected_current, replacement)
1001 .await
1002 }
1003
1004 async fn clear_session_snapshot_if_current(
1005 &self,
1006 runtime_id: &meerkat_runtime::LogicalRuntimeId,
1007 expected_current: &[u8],
1008 ) -> Result<bool, meerkat_runtime::store::RuntimeStoreError> {
1009 self.inner
1010 .clear_session_snapshot_if_current(runtime_id, expected_current)
1011 .await
1012 }
1013
1014 async fn persist_input_state(
1015 &self,
1016 runtime_id: &meerkat_runtime::LogicalRuntimeId,
1017 state: &StoredInputState,
1018 ) -> Result<(), meerkat_runtime::store::RuntimeStoreError> {
1019 self.inner.persist_input_state(runtime_id, state).await
1020 }
1021
1022 async fn load_input_state(
1023 &self,
1024 runtime_id: &meerkat_runtime::LogicalRuntimeId,
1025 input_id: &meerkat_core::lifecycle::InputId,
1026 ) -> Result<Option<StoredInputState>, meerkat_runtime::store::RuntimeStoreError> {
1027 self.inner.load_input_state(runtime_id, input_id).await
1028 }
1029
1030 async fn load_runtime_state(
1031 &self,
1032 runtime_id: &meerkat_runtime::LogicalRuntimeId,
1033 ) -> Result<Option<meerkat_runtime::RuntimeState>, meerkat_runtime::store::RuntimeStoreError>
1034 {
1035 self.inner.load_runtime_state(runtime_id).await
1036 }
1037
1038 async fn commit_machine_lifecycle(
1039 &self,
1040 runtime_id: &meerkat_runtime::LogicalRuntimeId,
1041 commit: MachineLifecycleCommit,
1042 input_states: &[StoredInputState],
1043 ) -> Result<(), meerkat_runtime::store::RuntimeStoreError> {
1044 self.inner
1045 .commit_machine_lifecycle(runtime_id, commit, input_states)
1046 .await
1047 }
1048
1049 async fn persist_ops_lifecycle(
1050 &self,
1051 runtime_id: &meerkat_runtime::LogicalRuntimeId,
1052 snapshot: &meerkat_runtime::ops_lifecycle::PersistedOpsSnapshot,
1053 ) -> Result<(), meerkat_runtime::store::RuntimeStoreError> {
1054 self.inner.persist_ops_lifecycle(runtime_id, snapshot).await
1055 }
1056
1057 async fn load_ops_lifecycle(
1058 &self,
1059 runtime_id: &meerkat_runtime::LogicalRuntimeId,
1060 ) -> Result<
1061 Option<meerkat_runtime::ops_lifecycle::PersistedOpsSnapshot>,
1062 meerkat_runtime::store::RuntimeStoreError,
1063 > {
1064 self.inner.load_ops_lifecycle(runtime_id).await
1065 }
1066}
1067
1068#[cfg(test)]
1069static RUNTIME_TURN_TRACES: OnceLock<Mutex<Vec<RuntimeTurnTrace>>> = OnceLock::new();
1070
1071#[cfg(test)]
1072fn runtime_turn_traces() -> &'static Mutex<Vec<RuntimeTurnTrace>> {
1073 RUNTIME_TURN_TRACES.get_or_init(|| Mutex::new(Vec::new()))
1074}
1075
1076#[cfg(test)]
1077#[allow(clippy::expect_used)]
1078fn record_runtime_turn_trace(trace: RuntimeTurnTrace) {
1079 runtime_turn_traces()
1080 .lock()
1081 .expect("runtime turn traces mutex")
1082 .push(trace);
1083}
1084
1085#[cfg(test)]
1086#[allow(dead_code)]
1087#[allow(clippy::expect_used)]
1088pub(crate) fn take_runtime_turn_traces() -> Vec<RuntimeTurnTrace> {
1089 std::mem::take(
1090 &mut *runtime_turn_traces()
1091 .lock()
1092 .expect("runtime turn traces mutex"),
1093 )
1094}
1095
1096#[cfg(not(test))]
1097#[allow(dead_code)]
1098fn record_runtime_turn_trace(_trace: ()) {}
1099
1100fn runtime_turn_diagnostics_enabled() -> bool {
1101 std::env::var_os("MOBKIT_TRACE_RUNTIME_TURNS").is_some()
1102}
1103
1104fn summarize_runtime_prompt(prompt: &meerkat_core::ContentInput) -> String {
1105 match prompt {
1106 meerkat_core::ContentInput::Text(text) => {
1107 text.lines().take(6).collect::<Vec<_>>().join(" ")
1108 }
1109 meerkat_core::ContentInput::Blocks(blocks) => blocks
1110 .iter()
1111 .map(|block| block.text_projection().to_string())
1112 .collect::<Vec<_>>()
1113 .join(" ")
1114 .lines()
1115 .take(6)
1116 .collect::<Vec<_>>()
1117 .join(" "),
1118 }
1119}
1120
1121pub fn mob_definition_may_use_image_generation(definition: &MobDefinition) -> bool {
1127 definition.profiles.values().any(|binding| {
1128 binding
1129 .as_inline()
1130 .is_none_or(|profile| profile.tools.image_generation)
1131 })
1132}
1133
1134fn normalize_runtime_turn_request(
1135 mut req: meerkat_core::service::StartTurnRequest,
1136) -> meerkat_core::service::StartTurnRequest {
1137 req.runtime.handling_mode = meerkat_core::types::HandlingMode::Queue;
1143 req.runtime.render_metadata = None;
1144 req
1145}
1146
1147macro_rules! delegate_mob_session_service {
1150 ($wrapper:ty) => {
1151 #[async_trait]
1152 impl meerkat_core::service::SessionService for $wrapper {
1153 async fn create_session(
1154 &self,
1155 mut req: CreateSessionRequest,
1156 ) -> Result<meerkat_core::types::RunResult, SessionError> {
1157 (self.hook)(&mut req).await?;
1158 sanitize_create_session_request_llm_override(&mut req);
1159
1160 let ctx = SessionCreatedContext {
1162 model: req.model.clone(),
1163 labels: req.labels.clone().unwrap_or_default(),
1164 system_prompt: req.system_prompt.clone(),
1165 };
1166 let result = self.inner.create_session(req).await?;
1167
1168 if let Some(ref after_hook) = self.after_create_hook {
1170 after_hook(result.session_id.clone(), ctx).await;
1171 }
1172
1173 Ok(result)
1174 }
1175 async fn start_turn(
1176 &self,
1177 id: &meerkat_core::types::SessionId,
1178 req: meerkat_core::service::StartTurnRequest,
1179 ) -> Result<meerkat_core::types::RunResult, SessionError> {
1180 self.inner.start_turn(id, req).await
1181 }
1182 async fn interrupt(
1183 &self,
1184 id: &meerkat_core::types::SessionId,
1185 ) -> Result<(), SessionError> {
1186 self.inner.interrupt(id).await
1187 }
1188 async fn cancel_after_boundary(
1189 &self,
1190 id: &meerkat_core::types::SessionId,
1191 ) -> Result<(), SessionError> {
1192 self.inner.cancel_after_boundary(id).await
1193 }
1194 async fn set_session_client(
1195 &self,
1196 id: &meerkat_core::types::SessionId,
1197 client: Arc<dyn meerkat_core::AgentLlmClient>,
1198 ) -> Result<(), SessionError> {
1199 self.inner
1200 .set_session_client(id, ReplaySanitizingAgentLlmClient::wrap(client))
1201 .await
1202 }
1203 async fn hot_swap_session_llm_identity(
1204 &self,
1205 id: &meerkat_core::types::SessionId,
1206 client: Arc<dyn meerkat_core::AgentLlmClient>,
1207 identity: meerkat_core::session::SessionLlmIdentity,
1208 request_policy: meerkat_core::SessionLlmRequestPolicy,
1209 ) -> Result<(), SessionError> {
1210 self.inner
1211 .hot_swap_session_llm_identity(
1212 id,
1213 ReplaySanitizingAgentLlmClient::wrap(client),
1214 identity,
1215 request_policy,
1216 )
1217 .await
1218 }
1219 async fn update_session_keep_alive(
1220 &self,
1221 id: &meerkat_core::types::SessionId,
1222 keep_alive: bool,
1223 ) -> Result<(), SessionError> {
1224 self.inner.update_session_keep_alive(id, keep_alive).await
1225 }
1226 async fn update_session_mob_authority_context(
1227 &self,
1228 id: &meerkat_core::types::SessionId,
1229 authority_context: Option<meerkat_core::service::MobToolAuthorityContext>,
1230 ) -> Result<(), SessionError> {
1231 self.inner
1232 .update_session_mob_authority_context(id, authority_context)
1233 .await
1234 }
1235 async fn has_live_session(
1236 &self,
1237 id: &meerkat_core::types::SessionId,
1238 ) -> Result<bool, SessionError> {
1239 self.inner.has_live_session(id).await
1240 }
1241 async fn set_session_tool_visibility_state(
1242 &self,
1243 id: &meerkat_core::types::SessionId,
1244 state: Option<meerkat_core::SessionToolVisibilityState>,
1245 ) -> Result<(), SessionError> {
1246 self.inner
1247 .set_session_tool_visibility_state(id, state)
1248 .await
1249 }
1250 async fn set_session_tool_filter(
1251 &self,
1252 id: &meerkat_core::types::SessionId,
1253 filter: meerkat_core::ToolFilter,
1254 ) -> Result<(), SessionError> {
1255 self.inner.set_session_tool_filter(id, filter).await
1256 }
1257 async fn read(
1258 &self,
1259 id: &meerkat_core::types::SessionId,
1260 ) -> Result<meerkat_core::service::SessionView, SessionError> {
1261 self.inner.read(id).await
1262 }
1263 async fn list(
1264 &self,
1265 query: meerkat_core::service::SessionQuery,
1266 ) -> Result<Vec<meerkat_core::service::SessionSummary>, SessionError> {
1267 self.inner.list(query).await
1268 }
1269 async fn archive(
1270 &self,
1271 id: &meerkat_core::types::SessionId,
1272 ) -> Result<(), SessionError> {
1273 self.inner.archive(id).await
1274 }
1275 async fn subscribe_session_events(
1276 &self,
1277 id: &meerkat_core::types::SessionId,
1278 ) -> Result<meerkat_core::comms::EventStream, meerkat_core::comms::StreamError> {
1279 meerkat_core::service::SessionService::subscribe_session_events(
1280 self.inner.as_ref(),
1281 id,
1282 )
1283 .await
1284 }
1285 }
1286
1287 #[async_trait]
1288 impl meerkat_core::service::SessionServiceCommsExt for $wrapper {
1289 async fn comms_runtime(
1290 &self,
1291 id: &meerkat_core::types::SessionId,
1292 ) -> Option<Arc<dyn meerkat_core::agent::CommsRuntime>> {
1293 self.inner.comms_runtime(id).await
1294 }
1295
1296 async fn event_injector(
1297 &self,
1298 id: &meerkat_core::types::SessionId,
1299 ) -> Option<Arc<dyn meerkat_core::EventInjector>> {
1300 self.inner.event_injector(id).await
1301 }
1302
1303 async fn interaction_event_injector(
1304 &self,
1305 id: &meerkat_core::types::SessionId,
1306 ) -> Option<Arc<dyn meerkat_core::event_injector::SubscribableInjector>> {
1307 self.inner.interaction_event_injector(id).await
1308 }
1309 }
1310
1311 #[async_trait]
1312 impl meerkat_core::service::SessionServiceControlExt for $wrapper {
1313 async fn append_system_context(
1314 &self,
1315 id: &meerkat_core::types::SessionId,
1316 req: meerkat_core::service::AppendSystemContextRequest,
1317 ) -> Result<
1318 meerkat_core::service::AppendSystemContextResult,
1319 meerkat_core::service::SessionControlError,
1320 > {
1321 self.inner.append_system_context(id, req).await
1322 }
1323 async fn stage_tool_results(
1324 &self,
1325 id: &meerkat_core::types::SessionId,
1326 req: meerkat_core::service::StageToolResultsRequest,
1327 ) -> Result<meerkat_core::service::StageToolResultsResult, SessionError> {
1328 self.inner.stage_tool_results(id, req).await
1329 }
1330 }
1331
1332 #[async_trait]
1333 impl meerkat_core::service::SessionServiceHistoryExt for $wrapper {
1334 async fn read_history(
1335 &self,
1336 id: &meerkat_core::types::SessionId,
1337 query: meerkat_core::service::SessionHistoryQuery,
1338 ) -> Result<meerkat_core::service::SessionHistoryPage, SessionError> {
1339 self.inner.read_history(id, query).await
1340 }
1341 }
1342
1343 #[async_trait]
1344 impl MobSessionService for $wrapper {
1345 fn supports_persistent_sessions(&self) -> bool {
1346 self.inner.supports_persistent_sessions()
1347 }
1348 fn runtime_adapter(&self) -> Option<Arc<meerkat_runtime::MeerkatMachine>> {
1349 self.runtime_adapter_override
1350 .clone()
1351 .or_else(|| self.inner.runtime_adapter())
1352 }
1353 async fn interrupt_with_machine_authority(
1354 &self,
1355 session_id: &meerkat_core::types::SessionId,
1356 authority: meerkat_runtime::MachineSessionControlAuthority,
1357 ) -> Result<(), SessionError> {
1358 self.inner
1359 .interrupt_with_machine_authority(session_id, authority)
1360 .await
1361 }
1362 async fn cancel_after_boundary_with_machine_authority(
1363 &self,
1364 session_id: &meerkat_core::types::SessionId,
1365 authority: meerkat_runtime::MachineSessionControlAuthority,
1366 ) -> Result<(), SessionError> {
1367 self.inner
1368 .cancel_after_boundary_with_machine_authority(session_id, authority)
1369 .await
1370 }
1371 async fn session_belongs_to_mob(
1372 &self,
1373 session_id: &meerkat_core::types::SessionId,
1374 mob_id: &meerkat_mob::MobId,
1375 ) -> bool {
1376 self.inner.session_belongs_to_mob(session_id, mob_id).await
1377 }
1378 async fn load_persisted_session(
1379 &self,
1380 session_id: &meerkat_core::types::SessionId,
1381 ) -> Result<Option<meerkat_core::session::Session>, SessionError> {
1382 self.inner.load_persisted_session(session_id).await
1383 }
1384 async fn subscribe_session_events(
1385 &self,
1386 session_id: &meerkat_core::types::SessionId,
1387 ) -> Result<meerkat_core::comms::EventStream, meerkat_core::comms::StreamError> {
1388 meerkat_mob::MobSessionService::subscribe_session_events(
1389 self.inner.as_ref(),
1390 session_id,
1391 )
1392 .await
1393 }
1394 async fn archive_with_mob_lifecycle_authority(
1395 &self,
1396 session_id: &meerkat_core::types::SessionId,
1397 ) -> Result<(), SessionError> {
1398 self.inner
1399 .archive_with_mob_lifecycle_authority(session_id)
1400 .await
1401 }
1402 async fn execution_snapshot(
1403 &self,
1404 session_id: &meerkat_core::types::SessionId,
1405 ) -> Result<Option<meerkat_core::agent::AgentExecutionSnapshot>, SessionError> {
1406 self.inner.execution_snapshot(session_id).await
1407 }
1408 async fn tool_scope_snapshot(
1409 &self,
1410 session_id: &meerkat_core::types::SessionId,
1411 ) -> Result<Option<meerkat_core::ToolScopeSnapshot>, SessionError> {
1412 self.inner.tool_scope_snapshot(session_id).await
1413 }
1414 async fn external_tool_surface_snapshot(
1415 &self,
1416 session_id: &meerkat_core::types::SessionId,
1417 ) -> Result<Option<meerkat_core::ExternalToolSurfaceSnapshot>, SessionError> {
1418 self.inner.external_tool_surface_snapshot(session_id).await
1419 }
1420 async fn peer_ingress_runtime_snapshot(
1421 &self,
1422 session_id: &meerkat_core::types::SessionId,
1423 ) -> Result<Option<meerkat_core::PeerIngressRuntimeSnapshot>, SessionError> {
1424 self.inner.peer_ingress_runtime_snapshot(session_id).await
1425 }
1426 async fn apply_runtime_turn(
1427 &self,
1428 session_id: &meerkat_core::types::SessionId,
1429 run_id: meerkat_core::lifecycle::RunId,
1430 req: meerkat_core::service::StartTurnRequest,
1431 boundary: meerkat_core::lifecycle::run_primitive::RunApplyBoundary,
1432 contributing_input_ids: Vec<meerkat_core::lifecycle::InputId>,
1433 ) -> Result<meerkat_core::lifecycle::core_executor::CoreApplyOutput, SessionError> {
1434 #[cfg(test)]
1435 let boundary_name = format!("{boundary:?}");
1436 #[cfg(test)]
1437 let contributing_count = contributing_input_ids.len();
1438 let run_id_for_log = run_id.to_string();
1439 let prompt_summary = if runtime_turn_diagnostics_enabled() {
1440 Some(summarize_runtime_prompt(&req.prompt))
1441 } else {
1442 None
1443 };
1444 if let Some(summary) = prompt_summary.as_ref() {
1445 tracing::warn!(
1446 session_id = %session_id,
1447 run_id = %run_id_for_log,
1448 boundary = ?boundary,
1449 contributing_inputs = contributing_input_ids.len(),
1450 prompt = %summary,
1451 runtime = ?req.runtime,
1452 "mobkit runtime turn start"
1453 );
1454 }
1455 let result = self
1456 .inner
1457 .apply_runtime_turn(
1458 session_id,
1459 run_id,
1460 normalize_runtime_turn_request(req),
1461 boundary,
1462 contributing_input_ids,
1463 )
1464 .await;
1465 #[cfg(test)]
1466 record_runtime_turn_trace(RuntimeTurnTrace {
1467 session_id: session_id.to_string(),
1468 boundary: boundary_name,
1469 contributing_input_count: contributing_count,
1470 outcome: match &result {
1471 Ok(_) => "ok".to_string(),
1472 Err(error) => format!("err:{error}"),
1473 },
1474 });
1475 if runtime_turn_diagnostics_enabled() {
1476 match &result {
1477 Ok(_) => tracing::warn!(
1478 session_id = %session_id,
1479 run_id = %run_id_for_log,
1480 "mobkit runtime turn ok"
1481 ),
1482 Err(error) => tracing::error!(
1483 session_id = %session_id,
1484 run_id = %run_id_for_log,
1485 error = %error,
1486 error_debug = ?error,
1487 "mobkit runtime turn error"
1488 ),
1489 }
1490 }
1491 result
1492 }
1493 async fn apply_runtime_context_appends(
1494 &self,
1495 session_id: &meerkat_core::types::SessionId,
1496 run_id: meerkat_core::lifecycle::RunId,
1497 appends: Vec<meerkat_core::session::PendingSystemContextAppend>,
1498 contributing_input_ids: Vec<meerkat_core::lifecycle::InputId>,
1499 ) -> Result<meerkat_core::lifecycle::core_executor::CoreApplyOutput, SessionError> {
1500 self.inner
1501 .apply_runtime_context_appends(
1502 session_id,
1503 run_id,
1504 appends,
1505 contributing_input_ids,
1506 )
1507 .await
1508 }
1509 async fn apply_runtime_context_appends_with_boundary(
1510 &self,
1511 session_id: &meerkat_core::types::SessionId,
1512 run_id: meerkat_core::lifecycle::RunId,
1513 appends: Vec<meerkat_core::session::PendingSystemContextAppend>,
1514 boundary: meerkat_core::lifecycle::run_primitive::RunApplyBoundary,
1515 contributing_input_ids: Vec<meerkat_core::lifecycle::InputId>,
1516 ) -> Result<meerkat_core::lifecycle::core_executor::CoreApplyOutput, SessionError> {
1517 self.inner
1518 .apply_runtime_context_appends_with_boundary(
1519 session_id,
1520 run_id,
1521 appends,
1522 boundary,
1523 contributing_input_ids,
1524 )
1525 .await
1526 }
1527 async fn apply_runtime_system_context_for_turn(
1528 &self,
1529 session_id: &meerkat_core::types::SessionId,
1530 appends: Vec<meerkat_core::session::PendingSystemContextAppend>,
1531 ) -> Result<(), SessionError> {
1532 self.inner
1533 .apply_runtime_system_context_for_turn(session_id, appends)
1534 .await
1535 }
1536 async fn stage_runtime_system_context_for_active_turn(
1537 &self,
1538 session_id: &meerkat_core::types::SessionId,
1539 expected_run_id: &meerkat_core::lifecycle::RunId,
1540 appends: Vec<meerkat_core::session::PendingSystemContextAppend>,
1541 ) -> Result<Option<Vec<u8>>, SessionError> {
1542 self.inner
1543 .stage_runtime_system_context_for_active_turn(
1544 session_id,
1545 expected_run_id,
1546 appends,
1547 )
1548 .await
1549 }
1550 async fn discard_runtime_system_context_for_active_turn(
1551 &self,
1552 session_id: &meerkat_core::types::SessionId,
1553 expected_run_id: &meerkat_core::lifecycle::RunId,
1554 idempotency_keys: Vec<String>,
1555 ) -> Result<(), SessionError> {
1556 self.inner
1557 .discard_runtime_system_context_for_active_turn(
1558 session_id,
1559 expected_run_id,
1560 idempotency_keys,
1561 )
1562 .await
1563 }
1564 async fn active_turn_system_context_boundary_available(
1565 &self,
1566 session_id: &meerkat_core::types::SessionId,
1567 ) -> Result<Option<bool>, SessionError> {
1568 self.inner
1569 .active_turn_system_context_boundary_available(session_id)
1570 .await
1571 }
1572 async fn discard_live_session(
1573 &self,
1574 session_id: &meerkat_core::types::SessionId,
1575 ) -> Result<(), SessionError> {
1576 self.inner.discard_live_session(session_id).await
1577 }
1578 async fn checkpoint_committed_runtime_session_snapshot(
1579 &self,
1580 session_id: &meerkat_core::types::SessionId,
1581 session_snapshot: &[u8],
1582 ) -> Result<(), SessionError> {
1583 self.inner
1584 .checkpoint_committed_runtime_session_snapshot(session_id, session_snapshot)
1585 .await
1586 }
1587 async fn cancel_all_checkpointers(&self) {
1588 self.inner.cancel_all_checkpointers().await;
1589 }
1590 async fn rearm_all_checkpointers(&self) {
1591 self.inner.rearm_all_checkpointers().await;
1592 }
1593 }
1594 };
1595}
1596
1597delegate_mob_session_service!(PreBuildMobSessionService);
1598
1599struct AfterCreateMobSessionService {
1604 inner: Arc<dyn MobSessionService>,
1605 after_hook: AfterCreateHook,
1606}
1607
1608#[async_trait]
1609impl meerkat_core::service::SessionService for AfterCreateMobSessionService {
1610 async fn create_session(
1611 &self,
1612 mut req: CreateSessionRequest,
1613 ) -> Result<meerkat_core::types::RunResult, SessionError> {
1614 sanitize_create_session_request_llm_override(&mut req);
1615 let ctx = SessionCreatedContext {
1625 model: req.model.clone(),
1626 labels: req.labels.clone().unwrap_or_default(),
1627 system_prompt: req.system_prompt.clone(),
1628 };
1629 let result = self.inner.create_session(req).await?;
1630 (self.after_hook)(result.session_id.clone(), ctx).await;
1631 Ok(result)
1632 }
1633 async fn start_turn(
1634 &self,
1635 id: &meerkat_core::types::SessionId,
1636 req: meerkat_core::service::StartTurnRequest,
1637 ) -> Result<meerkat_core::types::RunResult, SessionError> {
1638 self.inner.start_turn(id, req).await
1639 }
1640 async fn interrupt(&self, id: &meerkat_core::types::SessionId) -> Result<(), SessionError> {
1641 self.inner.interrupt(id).await
1642 }
1643 async fn cancel_after_boundary(
1644 &self,
1645 id: &meerkat_core::types::SessionId,
1646 ) -> Result<(), SessionError> {
1647 self.inner.cancel_after_boundary(id).await
1648 }
1649 async fn set_session_client(
1650 &self,
1651 id: &meerkat_core::types::SessionId,
1652 client: Arc<dyn meerkat_core::AgentLlmClient>,
1653 ) -> Result<(), SessionError> {
1654 self.inner
1655 .set_session_client(id, ReplaySanitizingAgentLlmClient::wrap(client))
1656 .await
1657 }
1658 async fn hot_swap_session_llm_identity(
1659 &self,
1660 id: &meerkat_core::types::SessionId,
1661 client: Arc<dyn meerkat_core::AgentLlmClient>,
1662 identity: meerkat_core::session::SessionLlmIdentity,
1663 request_policy: meerkat_core::SessionLlmRequestPolicy,
1664 ) -> Result<(), SessionError> {
1665 self.inner
1666 .hot_swap_session_llm_identity(
1667 id,
1668 ReplaySanitizingAgentLlmClient::wrap(client),
1669 identity,
1670 request_policy,
1671 )
1672 .await
1673 }
1674 async fn update_session_keep_alive(
1675 &self,
1676 id: &meerkat_core::types::SessionId,
1677 keep_alive: bool,
1678 ) -> Result<(), SessionError> {
1679 self.inner.update_session_keep_alive(id, keep_alive).await
1680 }
1681 async fn update_session_mob_authority_context(
1682 &self,
1683 id: &meerkat_core::types::SessionId,
1684 authority_context: Option<meerkat_core::service::MobToolAuthorityContext>,
1685 ) -> Result<(), SessionError> {
1686 self.inner
1687 .update_session_mob_authority_context(id, authority_context)
1688 .await
1689 }
1690 async fn has_live_session(
1691 &self,
1692 id: &meerkat_core::types::SessionId,
1693 ) -> Result<bool, SessionError> {
1694 self.inner.has_live_session(id).await
1695 }
1696 async fn set_session_tool_visibility_state(
1697 &self,
1698 id: &meerkat_core::types::SessionId,
1699 state: Option<meerkat_core::SessionToolVisibilityState>,
1700 ) -> Result<(), SessionError> {
1701 self.inner
1702 .set_session_tool_visibility_state(id, state)
1703 .await
1704 }
1705 async fn set_session_tool_filter(
1706 &self,
1707 id: &meerkat_core::types::SessionId,
1708 filter: meerkat_core::ToolFilter,
1709 ) -> Result<(), SessionError> {
1710 self.inner.set_session_tool_filter(id, filter).await
1711 }
1712 async fn read(
1713 &self,
1714 id: &meerkat_core::types::SessionId,
1715 ) -> Result<meerkat_core::service::SessionView, SessionError> {
1716 self.inner.read(id).await
1717 }
1718 async fn list(
1719 &self,
1720 query: meerkat_core::service::SessionQuery,
1721 ) -> Result<Vec<meerkat_core::service::SessionSummary>, SessionError> {
1722 self.inner.list(query).await
1723 }
1724 async fn archive(&self, id: &meerkat_core::types::SessionId) -> Result<(), SessionError> {
1725 self.inner.archive(id).await
1726 }
1727 async fn subscribe_session_events(
1728 &self,
1729 id: &meerkat_core::types::SessionId,
1730 ) -> Result<meerkat_core::comms::EventStream, meerkat_core::comms::StreamError> {
1731 meerkat_core::service::SessionService::subscribe_session_events(self.inner.as_ref(), id)
1732 .await
1733 }
1734}
1735
1736#[async_trait]
1737impl meerkat_core::service::SessionServiceCommsExt for AfterCreateMobSessionService {
1738 async fn comms_runtime(
1739 &self,
1740 id: &meerkat_core::types::SessionId,
1741 ) -> Option<Arc<dyn meerkat_core::agent::CommsRuntime>> {
1742 self.inner.comms_runtime(id).await
1743 }
1744
1745 async fn event_injector(
1746 &self,
1747 id: &meerkat_core::types::SessionId,
1748 ) -> Option<Arc<dyn meerkat_core::EventInjector>> {
1749 self.inner.event_injector(id).await
1750 }
1751
1752 async fn interaction_event_injector(
1753 &self,
1754 id: &meerkat_core::types::SessionId,
1755 ) -> Option<Arc<dyn meerkat_core::event_injector::SubscribableInjector>> {
1756 self.inner.interaction_event_injector(id).await
1757 }
1758}
1759
1760#[async_trait]
1761impl meerkat_core::service::SessionServiceControlExt for AfterCreateMobSessionService {
1762 async fn append_system_context(
1763 &self,
1764 id: &meerkat_core::types::SessionId,
1765 req: meerkat_core::service::AppendSystemContextRequest,
1766 ) -> Result<
1767 meerkat_core::service::AppendSystemContextResult,
1768 meerkat_core::service::SessionControlError,
1769 > {
1770 self.inner.append_system_context(id, req).await
1771 }
1772 async fn stage_tool_results(
1773 &self,
1774 id: &meerkat_core::types::SessionId,
1775 req: meerkat_core::service::StageToolResultsRequest,
1776 ) -> Result<meerkat_core::service::StageToolResultsResult, SessionError> {
1777 self.inner.stage_tool_results(id, req).await
1778 }
1779}
1780
1781#[async_trait]
1782impl meerkat_core::service::SessionServiceHistoryExt for AfterCreateMobSessionService {
1783 async fn read_history(
1784 &self,
1785 id: &meerkat_core::types::SessionId,
1786 query: meerkat_core::service::SessionHistoryQuery,
1787 ) -> Result<meerkat_core::service::SessionHistoryPage, SessionError> {
1788 self.inner.read_history(id, query).await
1789 }
1790}
1791
1792#[async_trait]
1793impl MobSessionService for AfterCreateMobSessionService {
1794 fn supports_persistent_sessions(&self) -> bool {
1795 self.inner.supports_persistent_sessions()
1796 }
1797 fn runtime_adapter(&self) -> Option<Arc<meerkat_runtime::MeerkatMachine>> {
1798 self.inner.runtime_adapter()
1799 }
1800 async fn interrupt_with_machine_authority(
1801 &self,
1802 session_id: &meerkat_core::types::SessionId,
1803 authority: meerkat_runtime::MachineSessionControlAuthority,
1804 ) -> Result<(), SessionError> {
1805 self.inner
1806 .interrupt_with_machine_authority(session_id, authority)
1807 .await
1808 }
1809 async fn cancel_after_boundary_with_machine_authority(
1810 &self,
1811 session_id: &meerkat_core::types::SessionId,
1812 authority: meerkat_runtime::MachineSessionControlAuthority,
1813 ) -> Result<(), SessionError> {
1814 self.inner
1815 .cancel_after_boundary_with_machine_authority(session_id, authority)
1816 .await
1817 }
1818 async fn session_belongs_to_mob(
1819 &self,
1820 session_id: &meerkat_core::types::SessionId,
1821 mob_id: &meerkat_mob::MobId,
1822 ) -> bool {
1823 self.inner.session_belongs_to_mob(session_id, mob_id).await
1824 }
1825 async fn load_persisted_session(
1826 &self,
1827 session_id: &meerkat_core::types::SessionId,
1828 ) -> Result<Option<meerkat_core::session::Session>, SessionError> {
1829 self.inner.load_persisted_session(session_id).await
1830 }
1831 async fn subscribe_session_events(
1832 &self,
1833 session_id: &meerkat_core::types::SessionId,
1834 ) -> Result<meerkat_core::comms::EventStream, meerkat_core::comms::StreamError> {
1835 meerkat_mob::MobSessionService::subscribe_session_events(self.inner.as_ref(), session_id)
1836 .await
1837 }
1838 async fn archive_with_mob_lifecycle_authority(
1839 &self,
1840 session_id: &meerkat_core::types::SessionId,
1841 ) -> Result<(), SessionError> {
1842 self.inner
1843 .archive_with_mob_lifecycle_authority(session_id)
1844 .await
1845 }
1846 async fn execution_snapshot(
1847 &self,
1848 session_id: &meerkat_core::types::SessionId,
1849 ) -> Result<Option<meerkat_core::agent::AgentExecutionSnapshot>, SessionError> {
1850 self.inner.execution_snapshot(session_id).await
1851 }
1852 async fn tool_scope_snapshot(
1853 &self,
1854 session_id: &meerkat_core::types::SessionId,
1855 ) -> Result<Option<meerkat_core::ToolScopeSnapshot>, SessionError> {
1856 self.inner.tool_scope_snapshot(session_id).await
1857 }
1858 async fn external_tool_surface_snapshot(
1859 &self,
1860 session_id: &meerkat_core::types::SessionId,
1861 ) -> Result<Option<meerkat_core::ExternalToolSurfaceSnapshot>, SessionError> {
1862 self.inner.external_tool_surface_snapshot(session_id).await
1863 }
1864 async fn peer_ingress_runtime_snapshot(
1865 &self,
1866 session_id: &meerkat_core::types::SessionId,
1867 ) -> Result<Option<meerkat_core::PeerIngressRuntimeSnapshot>, SessionError> {
1868 self.inner.peer_ingress_runtime_snapshot(session_id).await
1869 }
1870 async fn apply_runtime_turn(
1871 &self,
1872 session_id: &meerkat_core::types::SessionId,
1873 run_id: meerkat_core::lifecycle::RunId,
1874 req: meerkat_core::service::StartTurnRequest,
1875 boundary: meerkat_core::lifecycle::run_primitive::RunApplyBoundary,
1876 contributing_input_ids: Vec<meerkat_core::lifecycle::InputId>,
1877 ) -> Result<meerkat_core::lifecycle::core_executor::CoreApplyOutput, SessionError> {
1878 self.inner
1879 .apply_runtime_turn(session_id, run_id, req, boundary, contributing_input_ids)
1880 .await
1881 }
1882 async fn apply_runtime_context_appends(
1883 &self,
1884 session_id: &meerkat_core::types::SessionId,
1885 run_id: meerkat_core::lifecycle::RunId,
1886 appends: Vec<meerkat_core::session::PendingSystemContextAppend>,
1887 contributing_input_ids: Vec<meerkat_core::lifecycle::InputId>,
1888 ) -> Result<meerkat_core::lifecycle::core_executor::CoreApplyOutput, SessionError> {
1889 self.inner
1890 .apply_runtime_context_appends(session_id, run_id, appends, contributing_input_ids)
1891 .await
1892 }
1893 async fn apply_runtime_context_appends_with_boundary(
1894 &self,
1895 session_id: &meerkat_core::types::SessionId,
1896 run_id: meerkat_core::lifecycle::RunId,
1897 appends: Vec<meerkat_core::session::PendingSystemContextAppend>,
1898 boundary: meerkat_core::lifecycle::run_primitive::RunApplyBoundary,
1899 contributing_input_ids: Vec<meerkat_core::lifecycle::InputId>,
1900 ) -> Result<meerkat_core::lifecycle::core_executor::CoreApplyOutput, SessionError> {
1901 self.inner
1902 .apply_runtime_context_appends_with_boundary(
1903 session_id,
1904 run_id,
1905 appends,
1906 boundary,
1907 contributing_input_ids,
1908 )
1909 .await
1910 }
1911 async fn apply_runtime_system_context_for_turn(
1912 &self,
1913 session_id: &meerkat_core::types::SessionId,
1914 appends: Vec<meerkat_core::session::PendingSystemContextAppend>,
1915 ) -> Result<(), SessionError> {
1916 self.inner
1917 .apply_runtime_system_context_for_turn(session_id, appends)
1918 .await
1919 }
1920 async fn stage_runtime_system_context_for_active_turn(
1921 &self,
1922 session_id: &meerkat_core::types::SessionId,
1923 expected_run_id: &meerkat_core::lifecycle::RunId,
1924 appends: Vec<meerkat_core::session::PendingSystemContextAppend>,
1925 ) -> Result<Option<Vec<u8>>, SessionError> {
1926 self.inner
1927 .stage_runtime_system_context_for_active_turn(session_id, expected_run_id, appends)
1928 .await
1929 }
1930 async fn discard_runtime_system_context_for_active_turn(
1931 &self,
1932 session_id: &meerkat_core::types::SessionId,
1933 expected_run_id: &meerkat_core::lifecycle::RunId,
1934 idempotency_keys: Vec<String>,
1935 ) -> Result<(), SessionError> {
1936 self.inner
1937 .discard_runtime_system_context_for_active_turn(
1938 session_id,
1939 expected_run_id,
1940 idempotency_keys,
1941 )
1942 .await
1943 }
1944 async fn active_turn_system_context_boundary_available(
1945 &self,
1946 session_id: &meerkat_core::types::SessionId,
1947 ) -> Result<Option<bool>, SessionError> {
1948 self.inner
1949 .active_turn_system_context_boundary_available(session_id)
1950 .await
1951 }
1952 async fn discard_live_session(
1953 &self,
1954 session_id: &meerkat_core::types::SessionId,
1955 ) -> Result<(), SessionError> {
1956 self.inner.discard_live_session(session_id).await
1957 }
1958 async fn checkpoint_committed_runtime_session_snapshot(
1959 &self,
1960 session_id: &meerkat_core::types::SessionId,
1961 session_snapshot: &[u8],
1962 ) -> Result<(), SessionError> {
1963 self.inner
1964 .checkpoint_committed_runtime_session_snapshot(session_id, session_snapshot)
1965 .await
1966 }
1967 async fn cancel_all_checkpointers(&self) {
1968 self.inner.cancel_all_checkpointers().await;
1969 }
1970 async fn rearm_all_checkpointers(&self) {
1971 self.inner.rearm_all_checkpointers().await;
1972 }
1973}
1974
1975pub struct MobBootstrapSpec {
1977 pub definition: MobDefinition,
1978 pub storage: MobStorage,
1979 pub session_service: Arc<dyn MobSessionService>,
1980 pub binary_blob_store: Option<Arc<dyn BinaryBlobStore>>,
1981 pub(crate) agent_mob_mcp_state: Option<Arc<meerkat_mob_mcp::MobMcpState>>,
1982 pub(crate) implicit_delegate_retirement_overrides: Option<ImplicitDelegateRetirementOverrides>,
1983 pub(crate) agent_mob_default_llm_client_slot: Option<SharedDefaultLlmClientSlot>,
1984 pub options: MobBootstrapOptions,
1985 pub runtime_adapter: Option<Arc<meerkat_runtime::MeerkatMachine>>,
1991 pub(crate) _ephemeral_dir: Option<Arc<tempfile::TempDir>>,
1994}
1995
1996impl MobBootstrapSpec {
1997 pub fn new(
1998 definition: MobDefinition,
1999 storage: MobStorage,
2000 session_service: Arc<dyn MobSessionService>,
2001 ) -> Self {
2002 let session_service = Arc::new(PreBuildMobSessionService {
2003 inner: session_service,
2004 hook: no_op_pre_build_hook(),
2005 after_create_hook: None,
2006 runtime_adapter_override: None,
2007 }) as Arc<dyn MobSessionService>;
2008 Self {
2009 definition,
2010 storage,
2011 session_service,
2012 binary_blob_store: None,
2013 agent_mob_mcp_state: None,
2014 implicit_delegate_retirement_overrides: None,
2015 agent_mob_default_llm_client_slot: None,
2016 options: MobBootstrapOptions {
2017 allow_ephemeral_sessions: true,
2018 notify_orchestrator_on_resume: true,
2019 default_llm_client: None,
2020 },
2021 runtime_adapter: None,
2022 _ephemeral_dir: None,
2023 }
2024 }
2025
2026 pub fn with_options(mut self, options: MobBootstrapOptions) -> Self {
2027 self.options = options;
2028 self
2029 }
2030
2031 pub fn with_session_runtime_adapter(
2039 mut self,
2040 adapter: Arc<meerkat_runtime::MeerkatMachine>,
2041 ) -> Self {
2042 self.session_service = Arc::new(PreBuildMobSessionService {
2043 inner: self.session_service,
2044 hook: no_op_pre_build_hook(),
2045 after_create_hook: None,
2046 runtime_adapter_override: Some(adapter),
2047 });
2048 self
2049 }
2050
2051 pub fn with_after_create_hook(mut self, hook: AfterCreateHook) -> Self {
2057 self.session_service = Arc::new(AfterCreateMobSessionService {
2058 inner: self.session_service,
2059 after_hook: hook,
2060 });
2061 self
2062 }
2063
2064 pub fn ephemeral(
2069 definition: MobDefinition,
2070 storage: MobStorage,
2071 store_path: PathBuf,
2072 max_sessions: usize,
2073 session_store: Option<Arc<dyn AgentSessionStore>>,
2074 ) -> Self {
2075 Self::ephemeral_inner(
2076 definition,
2077 storage,
2078 store_path,
2079 max_sessions,
2080 session_store,
2081 None,
2082 CapabilityFlags::default(),
2083 None,
2084 None,
2085 )
2086 }
2087
2088 pub fn ephemeral_with_hook(
2092 definition: MobDefinition,
2093 storage: MobStorage,
2094 store_path: PathBuf,
2095 max_sessions: usize,
2096 session_store: Option<Arc<dyn AgentSessionStore>>,
2097 hook: impl Fn(
2098 &mut CreateSessionRequest,
2099 ) -> std::pin::Pin<
2100 Box<dyn std::future::Future<Output = Result<(), SessionError>> + Send + '_>,
2101 > + Send
2102 + Sync
2103 + 'static,
2104 ) -> Self {
2105 Self::ephemeral_inner(
2106 definition,
2107 storage,
2108 store_path,
2109 max_sessions,
2110 session_store,
2111 Some(Arc::new(hook)),
2112 CapabilityFlags::default(),
2113 None,
2114 None,
2115 )
2116 }
2117
2118 #[allow(clippy::too_many_arguments)]
2119 pub(crate) fn ephemeral_inner(
2120 definition: MobDefinition,
2121 storage: MobStorage,
2122 store_path: PathBuf,
2123 max_sessions: usize,
2124 session_store: Option<Arc<dyn AgentSessionStore>>,
2125 hook: Option<PreBuildHook>,
2126 mut caps: CapabilityFlags,
2127 after_create_hook: Option<AfterCreateHook>,
2128 agent_config: Option<Config>,
2129 ) -> Self {
2130 caps.image_generation |= mob_definition_may_use_image_generation(&definition);
2131 let binary_blob_store: Arc<dyn BinaryBlobStore> = Arc::new(ObjectStoreBlobStore::memory());
2132 let blob_store: Arc<dyn meerkat_core::BlobStore> =
2133 Arc::new(Base64BlobStoreAdapter::new(binary_blob_store.clone()));
2134 let runtime_adapter = if caps.image_generation {
2135 let runtime_store: Arc<dyn meerkat_runtime::RuntimeStore> =
2136 Arc::new(meerkat_runtime::InMemoryRuntimeStore::new());
2137 Some(Arc::new(meerkat_runtime::MeerkatMachine::persistent(
2138 runtime_store,
2139 Arc::clone(&blob_store),
2140 )))
2141 } else {
2142 None
2143 };
2144 let mut factory = AgentFactory::new(&store_path)
2145 .builtins(caps.builtins)
2146 .shell(caps.shell)
2147 .mob(caps.mob)
2148 .comms(caps.comms)
2149 .memory(caps.memory);
2150 if let Some(machine) = runtime_adapter.clone() {
2151 factory = factory.with_image_generation_machine(machine);
2152 }
2153 let config = agent_config.unwrap_or_default();
2154 let mut builder = FactoryAgentBuilder::new(factory, config);
2155 builder.default_blob_store = Some(blob_store);
2156 if let Some(store) = session_store {
2157 builder.default_session_store = Some(store);
2158 }
2159 let mob_tools_slot = Arc::clone(&builder.default_mob_tools);
2160 let session_service: Arc<dyn MobSessionService> = Arc::new(
2161 meerkat_session::EphemeralSessionService::new(builder, max_sessions),
2162 );
2163 let hook = hook.unwrap_or_else(no_op_pre_build_hook);
2164 let after_create_hook = if let Some(runtime_adapter) = runtime_adapter.clone() {
2165 let user_after_create_hook = after_create_hook.clone();
2166 Some(Arc::new(
2167 move |session_id: meerkat_core::types::SessionId, ctx: SessionCreatedContext| {
2168 let runtime_adapter = runtime_adapter.clone();
2169 let user_after_create_hook = user_after_create_hook.clone();
2170 Box::pin(async move {
2171 runtime_adapter.register_session(session_id.clone()).await;
2172 if let Some(user_after_create_hook) = user_after_create_hook {
2173 user_after_create_hook(session_id, ctx).await;
2174 }
2175 })
2176 as std::pin::Pin<Box<dyn std::future::Future<Output = ()> + Send>>
2177 },
2178 ) as AfterCreateHook)
2179 } else {
2180 after_create_hook
2181 };
2182 let session_service = Arc::new(PreBuildMobSessionService {
2183 inner: session_service,
2184 hook,
2185 after_create_hook,
2186 runtime_adapter_override: runtime_adapter.clone(),
2187 }) as Arc<dyn MobSessionService>;
2188 let (
2189 agent_mob_mcp_state,
2190 implicit_delegate_retirement_overrides,
2191 agent_mob_default_llm_client_slot,
2192 ) = install_agent_mob_tools(&definition, mob_tools_slot, Arc::clone(&session_service));
2193 let mut spec = Self::new(definition, storage, session_service);
2194 spec.agent_mob_mcp_state = Some(agent_mob_mcp_state);
2195 spec.implicit_delegate_retirement_overrides = Some(implicit_delegate_retirement_overrides);
2196 spec.agent_mob_default_llm_client_slot = Some(agent_mob_default_llm_client_slot);
2197 spec.runtime_adapter = runtime_adapter;
2198 spec.binary_blob_store = Some(binary_blob_store);
2199 spec
2200 }
2201
2202 pub fn persistent(
2209 definition: MobDefinition,
2210 storage: MobStorage,
2211 store_path: PathBuf,
2212 max_sessions: usize,
2213 session_store: Arc<dyn SessionStore>,
2214 ) -> Self {
2215 Self::persistent_inner(
2216 definition,
2217 storage,
2218 store_path,
2219 max_sessions,
2220 session_store,
2221 None,
2222 None,
2223 CapabilityFlags::default(),
2224 None,
2225 None,
2226 )
2227 }
2228
2229 pub fn persistent_with_hook(
2233 definition: MobDefinition,
2234 storage: MobStorage,
2235 store_path: PathBuf,
2236 max_sessions: usize,
2237 session_store: Arc<dyn SessionStore>,
2238 hook: impl Fn(
2239 &mut CreateSessionRequest,
2240 ) -> std::pin::Pin<
2241 Box<dyn std::future::Future<Output = Result<(), SessionError>> + Send + '_>,
2242 > + Send
2243 + Sync
2244 + 'static,
2245 ) -> Self {
2246 Self::persistent_inner(
2247 definition,
2248 storage,
2249 store_path,
2250 max_sessions,
2251 session_store,
2252 None,
2253 Some(Arc::new(hook)),
2254 CapabilityFlags::default(),
2255 None,
2256 None,
2257 )
2258 }
2259
2260 #[allow(clippy::too_many_arguments)]
2261 pub(crate) fn persistent_inner(
2262 definition: MobDefinition,
2263 storage: MobStorage,
2264 store_path: PathBuf,
2265 max_sessions: usize,
2266 session_store: Arc<dyn SessionStore>,
2267 custom_blob_store: Option<Arc<dyn meerkat_core::BlobStore>>,
2268 hook: Option<PreBuildHook>,
2269 mut caps: CapabilityFlags,
2270 after_create_hook: Option<AfterCreateHook>,
2271 agent_config: Option<Config>,
2272 ) -> Self {
2273 caps.image_generation |= mob_definition_may_use_image_generation(&definition);
2274 let (binary_blob_store, blob_store): (
2275 Arc<dyn BinaryBlobStore>,
2276 Arc<dyn meerkat_core::BlobStore>,
2277 ) = if let Some(blob_store) = custom_blob_store {
2278 (
2279 Arc::new(BinaryBlobStoreAdapter::new(blob_store.clone())),
2280 blob_store,
2281 )
2282 } else {
2283 let binary_blob_store: Arc<dyn BinaryBlobStore> = match ObjectStoreBlobStore::local(
2284 store_path.join("blobs"),
2285 ) {
2286 Ok(store) => Arc::new(store),
2287 Err(err) => {
2288 tracing::warn!(
2289 error = %err,
2290 "failed to initialize persistent binary blob store; falling back to in-memory blobs"
2291 );
2292 Arc::new(ObjectStoreBlobStore::memory())
2293 }
2294 };
2295 let blob_store: Arc<dyn meerkat_core::BlobStore> =
2296 Arc::new(Base64BlobStoreAdapter::new(binary_blob_store.clone()));
2297 (binary_blob_store, blob_store)
2298 };
2299 let runtime_store: Arc<dyn meerkat_runtime::RuntimeStore> =
2309 build_persistent_runtime_store(&store_path);
2310 let runtime_adapter = Arc::new(meerkat_runtime::MeerkatMachine::persistent(
2311 Arc::clone(&runtime_store),
2312 Arc::clone(&blob_store),
2313 ));
2314 let mut factory = AgentFactory::new(&store_path)
2315 .builtins(caps.builtins)
2316 .shell(caps.shell)
2317 .mob(caps.mob)
2318 .comms(caps.comms)
2319 .memory(caps.memory);
2320 if caps.image_generation {
2321 factory = factory.with_image_generation_machine(runtime_adapter.clone());
2322 }
2323 let config = agent_config.unwrap_or_default();
2324 let mut builder = FactoryAgentBuilder::new(factory, config);
2325 builder.default_session_store = Some(Arc::new(StoreAdapter::new(session_store.clone())));
2326 builder.default_blob_store = Some(blob_store.clone());
2327 let mob_tools_slot = Arc::clone(&builder.default_mob_tools);
2328 let session_service: Arc<dyn MobSessionService> =
2329 Arc::new(meerkat_session::PersistentSessionService::new(
2330 builder,
2331 max_sessions,
2332 session_store,
2333 Some(runtime_store),
2334 blob_store,
2335 ));
2336 let hook = hook.unwrap_or_else(no_op_pre_build_hook);
2337 let session_service = Arc::new(PreBuildMobSessionService {
2338 inner: session_service,
2339 hook,
2340 after_create_hook,
2341 runtime_adapter_override: None,
2342 }) as Arc<dyn MobSessionService>;
2343 let (
2344 agent_mob_mcp_state,
2345 implicit_delegate_retirement_overrides,
2346 agent_mob_default_llm_client_slot,
2347 ) = install_agent_mob_tools(&definition, mob_tools_slot, Arc::clone(&session_service));
2348 let mut spec = Self::new(definition, storage, session_service);
2349 spec.agent_mob_mcp_state = Some(agent_mob_mcp_state);
2350 spec.implicit_delegate_retirement_overrides = Some(implicit_delegate_retirement_overrides);
2351 spec.agent_mob_default_llm_client_slot = Some(agent_mob_default_llm_client_slot);
2352 spec.runtime_adapter = Some(runtime_adapter);
2353 spec.binary_blob_store = Some(binary_blob_store);
2354 spec
2355 }
2356
2357 #[allow(clippy::too_many_arguments)]
2358 pub(crate) fn ephemeral_runtime_backed_inner(
2359 definition: MobDefinition,
2360 storage: MobStorage,
2361 store_path: PathBuf,
2362 max_sessions: usize,
2363 custom_session_store: Option<Arc<dyn SessionStore>>,
2364 custom_blob_store: Option<Arc<dyn meerkat_core::BlobStore>>,
2365 hook: Option<PreBuildHook>,
2366 mut caps: CapabilityFlags,
2367 after_create_hook: Option<AfterCreateHook>,
2368 agent_config: Option<Config>,
2369 ) -> Self {
2370 caps.image_generation |= mob_definition_may_use_image_generation(&definition);
2371 let config = agent_config.unwrap_or_default();
2372 let session_store: Arc<dyn SessionStore> = custom_session_store
2373 .clone()
2374 .unwrap_or_else(|| Arc::new(meerkat_store::MemoryStore::new()));
2375 let (binary_blob_store, blob_store): (
2376 Arc<dyn BinaryBlobStore>,
2377 Arc<dyn meerkat_core::BlobStore>,
2378 ) = if let Some(blob_store) = custom_blob_store {
2379 (
2380 Arc::new(BinaryBlobStoreAdapter::new(blob_store.clone())),
2381 blob_store,
2382 )
2383 } else {
2384 let binary_blob_store: Arc<dyn BinaryBlobStore> =
2385 Arc::new(ObjectStoreBlobStore::memory());
2386 let blob_store: Arc<dyn meerkat_core::BlobStore> =
2387 Arc::new(Base64BlobStoreAdapter::new(binary_blob_store.clone()));
2388 (binary_blob_store, blob_store)
2389 };
2390 let base_runtime_store: Arc<dyn meerkat_runtime::RuntimeStore> =
2398 Arc::new(meerkat_runtime::InMemoryRuntimeStore::new());
2399 let runtime_store: Arc<dyn meerkat_runtime::RuntimeStore> =
2400 if let Some(custom_session_store) = custom_session_store.clone() {
2401 Arc::new(SessionStoreBackedRuntimeStore::new(
2402 Arc::clone(&base_runtime_store),
2403 custom_session_store,
2404 ))
2405 } else {
2406 Arc::clone(&base_runtime_store)
2407 };
2408 let runtime_adapter = Arc::new(meerkat_runtime::MeerkatMachine::persistent(
2409 Arc::clone(&runtime_store),
2410 Arc::clone(&blob_store),
2411 ));
2412 let mut factory = AgentFactory::new(&store_path)
2413 .builtins(caps.builtins)
2414 .shell(caps.shell)
2415 .mob(caps.mob)
2416 .comms(caps.comms)
2417 .memory(caps.memory);
2418 if caps.image_generation {
2419 factory = factory.with_image_generation_machine(runtime_adapter.clone());
2420 }
2421 let mut builder = FactoryAgentBuilder::new(factory, config);
2422 builder.default_session_store = Some(Arc::new(StoreAdapter::new(session_store.clone())));
2423 builder.default_blob_store = Some(blob_store.clone());
2424 let mob_tools_slot = Arc::clone(&builder.default_mob_tools);
2425 let session_service: Arc<dyn MobSessionService> =
2426 if let Some(custom_session_store) = custom_session_store {
2427 Arc::new(meerkat_session::PersistentSessionService::new(
2428 builder,
2429 max_sessions,
2430 custom_session_store,
2431 Some(runtime_store.clone()),
2432 blob_store,
2433 ))
2434 } else {
2435 Arc::new(meerkat_session::EphemeralSessionService::new(
2436 builder,
2437 max_sessions,
2438 ))
2439 };
2440 let hook = hook.unwrap_or_else(no_op_pre_build_hook);
2441 let runtime_adapter_for_after_create = runtime_adapter.clone();
2442 let combined_after_create_hook: AfterCreateHook = Arc::new(move |session_id, ctx| {
2443 let runtime_adapter = runtime_adapter_for_after_create.clone();
2444 let after_create_hook = after_create_hook.clone();
2445 Box::pin(async move {
2446 runtime_adapter.register_session(session_id.clone()).await;
2447 if let Some(after_create_hook) = after_create_hook {
2448 after_create_hook(session_id, ctx).await;
2449 }
2450 })
2451 });
2452 let session_service = Arc::new(PreBuildMobSessionService {
2453 inner: session_service,
2454 hook,
2455 after_create_hook: Some(combined_after_create_hook),
2456 runtime_adapter_override: Some(runtime_adapter.clone()),
2457 }) as Arc<dyn MobSessionService>;
2458 let (
2459 agent_mob_mcp_state,
2460 implicit_delegate_retirement_overrides,
2461 agent_mob_default_llm_client_slot,
2462 ) = install_agent_mob_tools(&definition, mob_tools_slot, Arc::clone(&session_service));
2463 let mut spec = Self::new(definition, storage, session_service);
2464 spec.agent_mob_mcp_state = Some(agent_mob_mcp_state);
2465 spec.implicit_delegate_retirement_overrides = Some(implicit_delegate_retirement_overrides);
2466 spec.agent_mob_default_llm_client_slot = Some(agent_mob_default_llm_client_slot);
2467 spec.runtime_adapter = Some(runtime_adapter);
2468 spec.binary_blob_store = Some(binary_blob_store);
2469 spec
2470 }
2471}
2472
2473#[derive(Debug)]
2475pub enum MobRuntimeError {
2476 Mob(MobError),
2477 InvalidInput(&'static str),
2478 InvalidConfig(String),
2479}
2480
2481impl std::fmt::Display for MobRuntimeError {
2482 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
2483 match self {
2484 Self::Mob(err) => write!(f, "{err}"),
2485 Self::InvalidInput(message) => write!(f, "{message}"),
2486 Self::InvalidConfig(message) => write!(f, "{message}"),
2487 }
2488 }
2489}
2490
2491impl std::error::Error for MobRuntimeError {}
2492
2493impl From<MobError> for MobRuntimeError {
2494 fn from(value: MobError) -> Self {
2495 Self::Mob(value)
2496 }
2497}
2498
2499#[derive(Clone, Debug)]
2508pub struct SessionCreatedContext {
2509 pub model: String,
2510 pub labels: std::collections::BTreeMap<String, String>,
2511 pub system_prompt: Option<String>,
2512}
2513
2514#[async_trait]
2521pub trait SessionHook: Send + Sync {
2522 async fn before_create(&self, _req: &mut CreateSessionRequest) -> Result<(), SessionError> {
2526 Ok(())
2527 }
2528
2529 async fn after_create(
2532 &self,
2533 _session_id: &meerkat_core::types::SessionId,
2534 _ctx: &SessionCreatedContext,
2535 ) {
2536 }
2537}
2538
2539#[derive(Clone, Copy, Debug)]
2541pub struct CapabilityFlags {
2542 pub builtins: bool,
2543 pub shell: bool,
2544 pub mob: bool,
2545 pub comms: bool,
2546 pub memory: bool,
2547 pub image_generation: bool,
2548}
2549
2550impl Default for CapabilityFlags {
2551 fn default() -> Self {
2552 Self {
2553 builtins: true,
2554 shell: true,
2555 mob: true,
2556 comms: true,
2557 memory: true,
2558 image_generation: false,
2559 }
2560 }
2561}
2562
2563pub type RealMobRuntime = MobRuntime;
2565
2566#[derive(Clone)]
2568pub struct MobRuntime {
2569 handle: MobHandle,
2570 session_service: Option<Arc<dyn MobSessionService>>,
2571 agent_mob_mcp_state: Option<Arc<meerkat_mob_mcp::MobMcpState>>,
2572 implicit_delegate_retirement_overrides: Option<ImplicitDelegateRetirementOverrides>,
2573 binary_blob_store: Option<Arc<dyn BinaryBlobStore>>,
2574 baseline_member_specs: Arc<tokio::sync::RwLock<Vec<SpawnMemberSpec>>>,
2575 _ephemeral_dir: Option<Arc<tempfile::TempDir>>,
2578}
2579
2580impl MobRuntime {
2581 pub async fn bootstrap(spec: MobBootstrapSpec) -> Result<Self, MobRuntimeError> {
2582 let ephemeral_dir = spec._ephemeral_dir.clone();
2583 let session_service = spec.session_service.clone();
2584 let binary_blob_store = spec.binary_blob_store.clone();
2585 let mob_id = spec.definition.id.clone();
2586 let agent_mob_mcp_state = spec.agent_mob_mcp_state.clone();
2587 let implicit_delegate_retirement_overrides =
2588 spec.implicit_delegate_retirement_overrides.clone();
2589 let default_llm_client = spec
2590 .options
2591 .default_llm_client
2592 .clone()
2593 .map(ReplaySanitizingLlmClient::wrap);
2594 if let Some(slot) = spec.agent_mob_default_llm_client_slot.as_ref() {
2595 *slot
2596 .write()
2597 .unwrap_or_else(std::sync::PoisonError::into_inner) = default_llm_client.clone();
2598 }
2599 let effective_runtime_adapter = spec
2600 .runtime_adapter
2601 .clone()
2602 .or_else(|| session_service.runtime_adapter());
2603
2604 let mut builder = MobBuilder::new(spec.definition, spec.storage);
2605
2606 if let Some(adapter) = effective_runtime_adapter {
2613 builder = builder.with_runtime_adapter(adapter);
2614 }
2615
2616 builder = builder
2617 .with_session_service(session_service.clone())
2618 .allow_ephemeral_sessions(spec.options.allow_ephemeral_sessions)
2619 .notify_orchestrator_on_resume(spec.options.notify_orchestrator_on_resume);
2620
2621 if let Some(client) = default_llm_client {
2622 builder = builder.with_default_llm_client(client);
2623 }
2624
2625 let handle = builder.create().await?;
2626 if let Some(state) = agent_mob_mcp_state.as_ref() {
2627 state.mob_insert_handle(mob_id, handle.clone()).await;
2628 }
2629 Ok(Self {
2630 handle,
2631 session_service: Some(session_service),
2632 agent_mob_mcp_state,
2633 implicit_delegate_retirement_overrides,
2634 binary_blob_store,
2635 baseline_member_specs: Arc::new(tokio::sync::RwLock::new(Vec::new())),
2636 _ephemeral_dir: ephemeral_dir,
2637 })
2638 }
2639
2640 pub fn from_handle(handle: MobHandle) -> Self {
2641 Self {
2642 handle,
2643 session_service: None,
2644 agent_mob_mcp_state: None,
2645 implicit_delegate_retirement_overrides: None,
2646 binary_blob_store: None,
2647 baseline_member_specs: Arc::new(tokio::sync::RwLock::new(Vec::new())),
2648 _ephemeral_dir: None,
2649 }
2650 }
2651
2652 pub fn handle(&self) -> MobHandle {
2653 self.handle.clone()
2654 }
2655
2656 pub fn agent_mob_mcp_state(&self) -> Option<Arc<meerkat_mob_mcp::MobMcpState>> {
2657 self.agent_mob_mcp_state.clone()
2658 }
2659
2660 pub(crate) fn implicit_delegate_retirement_overrides(
2661 &self,
2662 ) -> Option<ImplicitDelegateRetirementOverrides> {
2663 self.implicit_delegate_retirement_overrides.clone()
2664 }
2665
2666 pub async fn set_baseline_member_specs(&self, specs: Vec<SpawnMemberSpec>) {
2667 *self.baseline_member_specs.write().await = specs;
2668 }
2669
2670 pub async fn baseline_member_specs(&self) -> Vec<SpawnMemberSpec> {
2671 self.baseline_member_specs.read().await.clone()
2672 }
2673
2674 pub async fn read_session_history(
2675 &self,
2676 session_id_str: &str,
2677 offset: usize,
2678 limit: Option<usize>,
2679 ) -> Result<SessionHistoryPage, MobRuntimeError> {
2680 if session_id_str.trim().is_empty() {
2681 return Err(MobRuntimeError::InvalidInput(
2682 "session_id must not be empty",
2683 ));
2684 }
2685 let Some(session_service) = self.session_service.as_ref() else {
2686 return Err(MobRuntimeError::InvalidInput(
2687 "session history unavailable for this runtime",
2688 ));
2689 };
2690 let session_id = meerkat_core::types::SessionId::parse(session_id_str)
2691 .map_err(|_| MobRuntimeError::InvalidInput("invalid session_id format"))?;
2692 SessionServiceHistoryExt::read_history(
2693 session_service.as_ref(),
2694 &session_id,
2695 SessionHistoryQuery { offset, limit },
2696 )
2697 .await
2698 .map_err(|err| MobRuntimeError::Mob(MobError::Internal(err.to_string())))
2699 }
2700
2701 #[allow(dead_code)]
2702 pub(crate) async fn runtime_state_for_session(
2703 &self,
2704 session_id_str: &str,
2705 ) -> Result<Option<meerkat_runtime::RuntimeState>, MobRuntimeError> {
2706 if session_id_str.trim().is_empty() {
2707 return Err(MobRuntimeError::InvalidInput(
2708 "session_id must not be empty",
2709 ));
2710 }
2711 let Some(session_service) = self.session_service.as_ref() else {
2712 return Ok(None);
2713 };
2714 let Some(runtime_adapter) = session_service.runtime_adapter() else {
2715 return Ok(None);
2716 };
2717 let session_id = meerkat_core::types::SessionId::parse(session_id_str)
2718 .map_err(|_| MobRuntimeError::InvalidInput("invalid session_id format"))?;
2719 let state = meerkat_runtime::service_ext::SessionServiceRuntimeExt::runtime_state(
2720 runtime_adapter.as_ref(),
2721 &session_id,
2722 )
2723 .await
2724 .map_err(|err| MobRuntimeError::Mob(MobError::Internal(err.to_string())))?;
2725 Ok(Some(state))
2726 }
2727
2728 #[allow(dead_code)]
2729 pub(crate) async fn comms_runtime_for_session(
2730 &self,
2731 session_id_str: &str,
2732 ) -> Result<Option<Arc<dyn CommsRuntime>>, MobRuntimeError> {
2733 if session_id_str.trim().is_empty() {
2734 return Err(MobRuntimeError::InvalidInput(
2735 "session_id must not be empty",
2736 ));
2737 }
2738 let Some(session_service) = self.session_service.as_ref() else {
2739 return Ok(None);
2740 };
2741 let session_id = meerkat_core::types::SessionId::parse(session_id_str)
2742 .map_err(|_| MobRuntimeError::InvalidInput("invalid session_id format"))?;
2743 Ok(
2744 meerkat_core::service::SessionServiceCommsExt::comms_runtime(
2745 session_service.as_ref(),
2746 &session_id,
2747 )
2748 .await,
2749 )
2750 }
2751
2752 #[allow(dead_code)]
2753 pub(crate) async fn active_input_ids_for_session(
2754 &self,
2755 session_id_str: &str,
2756 ) -> Result<Option<Vec<String>>, MobRuntimeError> {
2757 if session_id_str.trim().is_empty() {
2758 return Err(MobRuntimeError::InvalidInput(
2759 "session_id must not be empty",
2760 ));
2761 }
2762 let Some(session_service) = self.session_service.as_ref() else {
2763 return Ok(None);
2764 };
2765 let Some(runtime_adapter) = session_service.runtime_adapter() else {
2766 return Ok(None);
2767 };
2768 let session_id = meerkat_core::types::SessionId::parse(session_id_str)
2769 .map_err(|_| MobRuntimeError::InvalidInput("invalid session_id format"))?;
2770 let input_ids = meerkat_runtime::service_ext::SessionServiceRuntimeExt::list_active_inputs(
2771 runtime_adapter.as_ref(),
2772 &session_id,
2773 )
2774 .await
2775 .map_err(|err| MobRuntimeError::Mob(MobError::Internal(err.to_string())))?;
2776 Ok(Some(
2777 input_ids.into_iter().map(|id| id.to_string()).collect(),
2778 ))
2779 }
2780
2781 #[allow(dead_code)]
2782 pub(crate) async fn ensure_comms_drain_for_session(
2783 &self,
2784 session_id_str: &str,
2785 ) -> Result<Option<bool>, MobRuntimeError> {
2786 if session_id_str.trim().is_empty() {
2787 return Err(MobRuntimeError::InvalidInput(
2788 "session_id must not be empty",
2789 ));
2790 }
2791 let Some(session_service) = self.session_service.as_ref() else {
2792 return Ok(None);
2793 };
2794 let Some(runtime_adapter) = session_service.runtime_adapter() else {
2795 return Ok(None);
2796 };
2797 let session_id = meerkat_core::types::SessionId::parse(session_id_str)
2798 .map_err(|_| MobRuntimeError::InvalidInput("invalid session_id format"))?;
2799 let comms_runtime = meerkat_core::service::SessionServiceCommsExt::comms_runtime(
2800 session_service.as_ref(),
2801 &session_id,
2802 )
2803 .await;
2804 if let Some(comms) = comms_runtime {
2805 let _handle = meerkat_runtime::comms_drain::spawn_comms_drain(
2806 runtime_adapter.clone(),
2807 session_id,
2808 comms,
2809 None,
2810 );
2811 Ok(Some(true))
2812 } else {
2813 Ok(Some(false))
2814 }
2815 }
2816
2817 pub fn session_service(&self) -> Option<&Arc<dyn MobSessionService>> {
2823 self.session_service.as_ref()
2824 }
2825
2826 pub fn binary_blob_store(&self) -> Option<Arc<dyn BinaryBlobStore>> {
2827 self.binary_blob_store.clone()
2828 }
2829}
2830
2831pub fn member_entry_to_json(entry: &meerkat_mob::runtime::MobMemberListEntry) -> serde_json::Value {
2838 serde_json::to_value(entry).unwrap_or(serde_json::Value::Null)
2839}
2840
2841pub fn content_input_has_images(content: &meerkat_core::ContentInput) -> bool {
2842 match content {
2843 meerkat_core::ContentInput::Text(_) => false,
2844 meerkat_core::ContentInput::Blocks(blocks) => blocks
2845 .iter()
2846 .any(|block| matches!(block, meerkat_core::ContentBlock::Image { .. })),
2847 }
2848}
2849
2850pub fn model_capabilities_for_model(
2851 provider: Provider,
2852 model: &str,
2853) -> crate::runtime::ConsoleModelCapabilities {
2854 let image_input = meerkat_core::model_profile::profile_for(provider, model)
2855 .map(|profile| profile.vision)
2856 .unwrap_or(false);
2857 crate::runtime::ConsoleModelCapabilities { image_input }
2858}
2859
2860pub fn model_capabilities_for_profile(
2861 profile: &Profile,
2862) -> crate::runtime::ConsoleModelCapabilities {
2863 let image_input = Provider::infer_from_model(&profile.model)
2864 .and_then(|provider| meerkat_core::model_profile::profile_for(provider, &profile.model))
2865 .map(|profile| profile.vision)
2866 .unwrap_or(false);
2867 crate::runtime::ConsoleModelCapabilities { image_input }
2868}
2869
2870pub fn model_capabilities_for_role(
2871 definition: &MobDefinition,
2872 role: &str,
2873) -> crate::runtime::ConsoleModelCapabilities {
2874 let profile_name = ProfileName::from(role);
2875 definition
2876 .resolve_inline_profile(&profile_name)
2877 .map(model_capabilities_for_profile)
2878 .unwrap_or(crate::runtime::ConsoleModelCapabilities { image_input: false })
2879}
2880
2881pub fn model_capabilities_for_member_entry(
2882 definition: &MobDefinition,
2883 entry: &meerkat_mob::runtime::MobMemberListEntry,
2884) -> crate::runtime::ConsoleModelCapabilities {
2885 model_capabilities_for_role(definition, entry.role.as_str())
2886}
2887
2888pub async fn model_capabilities_for_member(
2889 handle: &MobHandle,
2890 session_service: Option<&Arc<dyn MobSessionService>>,
2891 member_id: &meerkat_mob::ids::MeerkatId,
2892) -> crate::runtime::ConsoleModelCapabilities {
2893 if let Some(service) = session_service
2894 && let Some(session_id) = handle.resolve_bridge_session_id(member_id).await
2895 && let Ok(view) = service.read(&session_id).await
2896 {
2897 return model_capabilities_for_model(view.state.provider, &view.state.model);
2898 }
2899
2900 handle
2901 .get_member(member_id)
2902 .await
2903 .map(|member| model_capabilities_for_role(handle.definition(), member.role.as_str()))
2904 .unwrap_or(crate::runtime::ConsoleModelCapabilities { image_input: false })
2905}
2906
2907pub async fn assert_member_accepts_images(
2908 handle: &MobHandle,
2909 session_service: Option<&Arc<dyn MobSessionService>>,
2910 member_id: &str,
2911 content: &meerkat_core::ContentInput,
2912) -> Result<(), MobRuntimeError> {
2913 if !content_input_has_images(content) {
2914 return Ok(());
2915 }
2916 let mid = meerkat_mob::ids::MeerkatId::from(member_id);
2917 let Some(member) = handle.get_member(&mid).await else {
2918 return Err(MobRuntimeError::InvalidInput("member not found"));
2919 };
2920 let caps = model_capabilities_for_member(handle, session_service, &member.agent_identity).await;
2921 if caps.image_input {
2922 Ok(())
2923 } else {
2924 Err(MobRuntimeError::InvalidInput(
2925 "target member model cannot accept image input",
2926 ))
2927 }
2928}
2929
2930pub async fn send_message_on_mob(
2940 handle: &MobHandle,
2941 member_id: &str,
2942 content: impl Into<meerkat_core::ContentInput>,
2943) -> Result<String, MobRuntimeError> {
2944 send_message_on_mob_with_mode(
2945 handle,
2946 member_id,
2947 content,
2948 meerkat_core::types::HandlingMode::Queue,
2949 )
2950 .await
2951}
2952
2953pub async fn send_message_on_mob_with_mode(
2956 handle: &MobHandle,
2957 member_id: &str,
2958 content: impl Into<meerkat_core::ContentInput>,
2959 handling_mode: meerkat_core::types::HandlingMode,
2960) -> Result<String, MobRuntimeError> {
2961 if member_id.trim().is_empty() {
2962 return Err(MobRuntimeError::InvalidInput("member_id must not be empty"));
2963 }
2964 let content = content.into();
2965 let is_empty = match &content {
2966 meerkat_core::ContentInput::Text(s) => s.trim().is_empty(),
2967 meerkat_core::ContentInput::Blocks(blocks) => blocks.is_empty(),
2968 };
2969 if is_empty {
2970 return Err(MobRuntimeError::InvalidInput("content must not be empty"));
2971 }
2972 let mid = meerkat_mob::ids::MeerkatId::from(member_id);
2973 let _receipt = handle
2974 .member(&mid)
2975 .await?
2976 .send(content, handling_mode)
2977 .await?;
2978 let session_id = handle
2979 .resolve_bridge_session_id(&mid)
2980 .await
2981 .ok_or_else(|| {
2982 MobRuntimeError::Mob(MobError::Internal(
2983 "member has no bridge session after send".to_string(),
2984 ))
2985 })?;
2986 Ok(session_id.to_string())
2987}
2988
2989#[cfg(test)]
2990#[allow(clippy::unwrap_used, clippy::expect_used, clippy::panic)]
2991mod tests {
2992 use super::*;
2993
2994 struct EmptyDispatcher;
2995
2996 #[async_trait::async_trait]
2997 impl meerkat_core::AgentToolDispatcher for EmptyDispatcher {
2998 fn tools(&self) -> Arc<[Arc<meerkat_core::types::ToolDef>]> {
2999 Vec::<Arc<meerkat_core::types::ToolDef>>::new().into()
3000 }
3001
3002 async fn dispatch(
3003 &self,
3004 call: meerkat_core::types::ToolCallView<'_>,
3005 ) -> Result<meerkat_core::ToolDispatchOutcome, meerkat_core::ToolError> {
3006 Err(meerkat_core::ToolError::not_found(call.name))
3007 }
3008
3009 fn capabilities(&self) -> meerkat_core::agent::DispatcherCapabilities {
3010 meerkat_core::agent::DispatcherCapabilities::default()
3011 }
3012 }
3013
3014 fn wrapper_with_overrides(
3015 overrides: ImplicitDelegateRetirementOverrides,
3016 ) -> AutoWireParentMobToolDispatcher {
3017 AutoWireParentMobToolDispatcher {
3018 inner: Arc::new(EmptyDispatcher),
3019 implicit_delegate_retirement_overrides: overrides,
3020 }
3021 }
3022
3023 #[test]
3024 fn delegate_tool_schema_exposes_idle_retire_secs() {
3025 let tool = meerkat_core::types::ToolDef::new(
3026 "delegate",
3027 "Delegate work",
3028 serde_json::json!({
3029 "type": "object",
3030 "properties": {
3031 "task": {"type": "string"}
3032 },
3033 "required": ["task"]
3034 }),
3035 );
3036
3037 let patched = delegate_tool_def_with_idle_retire_secs(&tool);
3038 let idle_retire_secs = &patched.input_schema["properties"]["idle_retire_secs"];
3039
3040 assert!(patched.description.contains("IDLE RETIREMENT:"));
3041 assert_eq!(idle_retire_secs["anyOf"][0]["type"], "integer");
3042 assert_eq!(idle_retire_secs["anyOf"][0]["minimum"], 0);
3043 assert_eq!(idle_retire_secs["anyOf"][1]["type"], "null");
3044 }
3045
3046 #[test]
3047 fn mob_spawn_tool_schema_exposes_opt_in_idle_retire_secs() {
3048 let tool = meerkat_core::types::ToolDef::new(
3049 "mob_spawn_member",
3050 "Spawn member",
3051 serde_json::json!({
3052 "type": "object",
3053 "properties": {
3054 "profile": {"type": "string"},
3055 "member_id": {"type": "string"}
3056 },
3057 "required": ["profile", "member_id"]
3058 }),
3059 );
3060
3061 let patched = mob_spawn_tool_def_with_idle_retire_secs(&tool);
3062 let idle_retire_secs = &patched.input_schema["properties"]["idle_retire_secs"];
3063
3064 assert!(
3065 patched
3066 .description
3067 .contains("Omit idle_retire_secs to leave this spawned member out")
3068 );
3069 assert_eq!(idle_retire_secs["anyOf"][0]["type"], "integer");
3070 assert_eq!(idle_retire_secs["anyOf"][0]["minimum"], 0);
3071 assert_eq!(idle_retire_secs["anyOf"][1]["type"], "null");
3072 }
3073
3074 #[tokio::test]
3075 async fn auto_wire_wrapper_preserves_ops_lifecycle_binding() {
3076 use meerkat_core::AgentToolDispatcher;
3077 use std::sync::atomic::{AtomicBool, Ordering};
3078
3079 struct BindAwareDispatcher {
3080 bound: Arc<AtomicBool>,
3081 }
3082
3083 #[async_trait::async_trait]
3084 impl meerkat_core::AgentToolDispatcher for BindAwareDispatcher {
3085 fn tools(&self) -> Arc<[Arc<meerkat_core::types::ToolDef>]> {
3086 Vec::<Arc<meerkat_core::types::ToolDef>>::new().into()
3087 }
3088
3089 async fn dispatch(
3090 &self,
3091 call: meerkat_core::types::ToolCallView<'_>,
3092 ) -> Result<meerkat_core::ToolDispatchOutcome, meerkat_core::ToolError> {
3093 Err(meerkat_core::ToolError::not_found(call.name))
3094 }
3095
3096 fn capabilities(&self) -> meerkat_core::agent::DispatcherCapabilities {
3097 meerkat_core::agent::DispatcherCapabilities {
3098 ops_lifecycle: true,
3099 }
3100 }
3101
3102 fn bind_ops_lifecycle(
3103 self: Arc<Self>,
3104 _registry: Arc<dyn meerkat_core::ops_lifecycle::OpsLifecycleRegistry>,
3105 _owner_bridge_session_id: meerkat_core::types::SessionId,
3106 ) -> Result<meerkat_core::agent::BindOutcome, meerkat_core::agent::OpsLifecycleBindError>
3107 {
3108 self.bound.store(true, Ordering::SeqCst);
3109 Ok(meerkat_core::agent::BindOutcome::Bound(self))
3110 }
3111 }
3112
3113 let bound = Arc::new(AtomicBool::new(false));
3114 let dispatcher = Arc::new(AutoWireParentMobToolDispatcher {
3115 inner: Arc::new(BindAwareDispatcher {
3116 bound: Arc::clone(&bound),
3117 }),
3118 implicit_delegate_retirement_overrides: ImplicitDelegateRetirementOverrides::default(),
3119 });
3120
3121 assert!(dispatcher.capabilities().ops_lifecycle);
3122 let outcome = dispatcher
3123 .bind_ops_lifecycle(
3124 Arc::new(meerkat_runtime::ops_lifecycle::RuntimeOpsLifecycleRegistry::new()),
3125 meerkat_core::types::SessionId::new(),
3126 )
3127 .expect("wrapper should delegate ops lifecycle binding");
3128
3129 assert!(outcome.was_bound());
3130 assert!(bound.load(Ordering::SeqCst));
3131 assert!(outcome.into_dispatcher().capabilities().ops_lifecycle);
3132 }
3133
3134 #[test]
3135 fn delegate_idle_retire_secs_arg_is_stripped_and_parsed() {
3136 let mut args = serde_json::json!({
3137 "task": "inspect",
3138 "idle_retire_secs": 42
3139 });
3140
3141 let parsed = delegate_idle_retire_override_from_args("delegate", &mut args)
3142 .expect("valid idle retire arg");
3143
3144 assert_eq!(parsed, Some(DelegateIdleRetireOverride::Seconds(42)));
3145 assert!(args.get("idle_retire_secs").is_none());
3146 }
3147
3148 #[test]
3149 fn delegate_idle_retire_secs_null_disables_member_retirement() {
3150 let mut args = serde_json::json!({
3151 "task": "inspect",
3152 "idle_retire_secs": null
3153 });
3154
3155 let parsed = delegate_idle_retire_override_from_args("delegate", &mut args)
3156 .expect("valid idle retire arg");
3157
3158 assert_eq!(parsed, Some(DelegateIdleRetireOverride::Disabled));
3159 assert!(args.get("idle_retire_secs").is_none());
3160 }
3161
3162 #[test]
3163 fn delegate_idle_retire_secs_omitted_inherits_runtime_default() {
3164 let mut args = serde_json::json!({"task": "inspect"});
3165
3166 let parsed = delegate_idle_retire_override_from_args("delegate", &mut args)
3167 .expect("omitted idle retire arg");
3168
3169 assert_eq!(parsed, None);
3170 assert_eq!(args, serde_json::json!({"task": "inspect"}));
3171 }
3172
3173 #[test]
3174 fn delegate_idle_retire_secs_rejects_negative_or_fractional_values() {
3175 let mut negative = serde_json::json!({"task": "inspect", "idle_retire_secs": -1});
3176 let mut fractional = serde_json::json!({"task": "inspect", "idle_retire_secs": 1.5});
3177
3178 assert!(delegate_idle_retire_override_from_args("delegate", &mut negative).is_err());
3179 assert!(delegate_idle_retire_override_from_args("delegate", &mut fractional).is_err());
3180 }
3181
3182 #[test]
3183 fn mob_spawn_idle_retire_targets_use_args_when_result_omits_mob_id() {
3184 let args = serde_json::json!({
3185 "mob_id": "ob3",
3186 "profile": "review-worker",
3187 "member_id": "review-worker-vibe-forward",
3188 });
3189 let fallback_targets = idle_retire_targets_from_spawn_args(&args);
3190
3191 assert_eq!(
3192 fallback_targets,
3193 vec![IdleRetireTarget {
3194 mob_id: "ob3".to_string(),
3195 member_id: "review-worker-vibe-forward".to_string(),
3196 }]
3197 );
3198 assert_eq!(
3199 idle_retire_targets_from_outcome_text(
3200 r#"{"agent_identity":"review-worker-vibe-forward","member_ref":"opaque"}"#,
3201 &fallback_targets,
3202 ),
3203 fallback_targets
3204 );
3205 }
3206
3207 #[test]
3208 fn mob_spawn_idle_retire_targets_support_canonical_specs_shape() {
3209 let args = serde_json::json!({
3210 "mob_id": "ob3",
3211 "specs": [
3212 {"profile": "person-worker", "agent_identity": "person-worker-a"},
3213 {"profile": "person-worker", "member_id": "person-worker-b", "mob_id": "other"}
3214 ]
3215 });
3216 let fallback_targets = idle_retire_targets_from_spawn_args(&args);
3217
3218 assert_eq!(
3219 fallback_targets,
3220 vec![
3221 IdleRetireTarget {
3222 mob_id: "ob3".to_string(),
3223 member_id: "person-worker-a".to_string(),
3224 },
3225 IdleRetireTarget {
3226 mob_id: "other".to_string(),
3227 member_id: "person-worker-b".to_string(),
3228 },
3229 ]
3230 );
3231 assert_eq!(
3232 idle_retire_targets_from_outcome_text(
3233 r#"{"members":[{"agent_identity":"person-worker-a"},{"agent_identity":"person-worker-b","mob_id":"other"}]}"#,
3234 &fallback_targets,
3235 ),
3236 fallback_targets
3237 );
3238 }
3239
3240 #[tokio::test]
3241 async fn implicit_delegate_retirement_overrides_round_trip_per_member() {
3242 let overrides = ImplicitDelegateRetirementOverrides::default();
3243
3244 overrides
3245 .set("mob-a", "worker-1", DelegateIdleRetireOverride::Seconds(12))
3246 .await;
3247 overrides
3248 .set("mob-a", "worker-2", DelegateIdleRetireOverride::Disabled)
3249 .await;
3250
3251 assert_eq!(
3252 overrides.get("mob-a", "worker-1").await,
3253 Some(DelegateIdleRetireOverride::Seconds(12))
3254 );
3255 assert_eq!(
3256 overrides.get("mob-a", "worker-2").await,
3257 Some(DelegateIdleRetireOverride::Disabled)
3258 );
3259 assert_eq!(overrides.get("mob-a", "worker-3").await, None);
3260 }
3261
3262 #[tokio::test]
3263 async fn mob_spawn_idle_retire_registration_uses_spawn_args_when_result_omits_mob_id() {
3264 let overrides = ImplicitDelegateRetirementOverrides::default();
3265 let dispatcher = wrapper_with_overrides(overrides.clone());
3266 let fallback_targets = idle_retire_targets_from_spawn_args(&serde_json::json!({
3267 "mob_id": "ob3",
3268 "member_id": "review-worker-vibe-forward",
3269 }));
3270 let outcome =
3271 meerkat_core::ToolDispatchOutcome::sync_result(meerkat_core::types::ToolResult::new(
3272 "spawn-1".to_string(),
3273 r#"{"agent_identity":"review-worker-vibe-forward","member_ref":"opaque"}"#
3274 .to_string(),
3275 false,
3276 ));
3277
3278 dispatcher
3279 .register_idle_retire_override_from_outcome(
3280 &outcome,
3281 Some(DelegateIdleRetireOverride::Seconds(900)),
3282 &fallback_targets,
3283 )
3284 .await;
3285
3286 assert_eq!(
3287 overrides.get("ob3", "review-worker-vibe-forward").await,
3288 Some(DelegateIdleRetireOverride::Seconds(900))
3289 );
3290 }
3291
3292 #[test]
3293 fn image_generation_substrate_defaults_off_for_inline_profiles() {
3294 let definition = meerkat_mob::MobDefinition::from_toml(
3295 r#"
3296[mob]
3297id = "test"
3298
3299[profiles.worker]
3300model = "gpt-5.5"
3301
3302[profiles.worker.tools]
3303builtins = true
3304"#,
3305 )
3306 .unwrap_or_else(|e| panic!("{e}"));
3307
3308 assert!(
3309 !mob_definition_may_use_image_generation(&definition),
3310 "inline profiles should not wire the image substrate unless a profile opts in"
3311 );
3312 }
3313
3314 #[test]
3315 fn image_generation_substrate_follows_profile_tool_config() {
3316 let definition = meerkat_mob::MobDefinition::from_toml(
3317 r#"
3318[mob]
3319id = "test"
3320
3321[profiles.commander]
3322model = "gpt-5.5"
3323
3324[profiles.commander.tools]
3325builtins = true
3326image_generation = true
3327
3328[profiles.investigator]
3329model = "gpt-5.5"
3330
3331[profiles.investigator.tools]
3332builtins = true
3333image_generation = false
3334"#,
3335 )
3336 .unwrap_or_else(|e| panic!("{e}"));
3337
3338 let commander = definition.profiles["commander"].as_inline().unwrap();
3339 let investigator = definition.profiles["investigator"].as_inline().unwrap();
3340 assert!(commander.tools.image_generation);
3341 assert!(!investigator.tools.image_generation);
3342 assert!(
3343 mob_definition_may_use_image_generation(&definition),
3344 "one opt-in profile is enough to wire substrate; Meerkat gates visibility per profile"
3345 );
3346 }
3347
3348 #[test]
3349 fn image_generation_profiles_can_disable_builtins_with_meerkat_062() {
3350 let definition = meerkat_mob::MobDefinition::from_toml(
3351 r#"
3352[mob]
3353id = "test"
3354
3355[profiles.commander]
3356model = "gpt-5.5"
3357
3358[profiles.commander.tools]
3359builtins = false
3360image_generation = true
3361"#,
3362 )
3363 .unwrap_or_else(|e| panic!("{e}"));
3364
3365 let commander = definition.profiles["commander"].as_inline().unwrap();
3366 assert!(!commander.tools.builtins);
3367 assert!(commander.tools.image_generation);
3368 assert!(
3369 mob_definition_may_use_image_generation(&definition),
3370 "image generation now has its own Meerkat tool gate"
3371 );
3372 }
3373
3374 #[test]
3375 fn image_generation_substrate_is_conservative_for_realm_profile_refs() {
3376 let definition = meerkat_mob::MobDefinition::from_toml(
3377 r#"
3378[mob]
3379id = "test"
3380
3381[profiles.worker]
3382realm_profile = "worker-v2"
3383"#,
3384 )
3385 .unwrap_or_else(|e| panic!("{e}"));
3386
3387 assert!(
3388 mob_definition_may_use_image_generation(&definition),
3389 "realm profiles resolve at spawn time, so MobKit wires substrate and lets Meerkat enforce profile policy"
3390 );
3391 }
3392
3393 #[test]
3394 fn sanitize_llm_request_drops_replay_unsafe_server_tool_blocks() {
3395 let request = meerkat_client::LlmRequest::new(
3396 "gpt-5.5",
3397 vec![meerkat_core::Message::BlockAssistant(
3398 meerkat_core::BlockAssistantMessage::new(
3399 vec![
3400 meerkat_core::AssistantBlock::Text {
3401 text: "done".to_string(),
3402 meta: None,
3403 },
3404 meerkat_core::AssistantBlock::ServerToolContent {
3405 id: Some("ws-stream".to_string()),
3406 name: "web_search".to_string(),
3407 content: serde_json::json!({
3408 "type": "response.web_search_call.searching",
3409 "item_id": "ws_123"
3410 }),
3411 meta: None,
3412 },
3413 meerkat_core::AssistantBlock::ServerToolContent {
3414 id: Some("ws_123".to_string()),
3415 name: "web_search_call".to_string(),
3416 content: serde_json::json!({
3417 "type": "web_search_call",
3418 "id": "ws_123",
3419 "status": "completed"
3420 }),
3421 meta: None,
3422 },
3423 meerkat_core::AssistantBlock::ServerToolContent {
3424 id: None,
3425 name: "web_search_annotations".to_string(),
3426 content: serde_json::json!({
3427 "type": "message_annotations",
3428 "annotations": []
3429 }),
3430 meta: None,
3431 },
3432 ],
3433 meerkat_core::StopReason::EndTurn,
3434 ),
3435 )],
3436 );
3437
3438 let sanitized = sanitize_llm_request_for_stateless_replay(&request);
3439 let meerkat_core::Message::BlockAssistant(assistant) = &sanitized.messages[0] else {
3440 panic!("expected block assistant");
3441 };
3442
3443 assert_eq!(assistant.blocks.len(), 2);
3444 assert!(matches!(
3445 assistant.blocks[0],
3446 meerkat_core::AssistantBlock::Text { .. }
3447 ));
3448 assert!(matches!(
3449 assistant.blocks[1],
3450 meerkat_core::AssistantBlock::ServerToolContent { ref name, .. }
3451 if name == "web_search_call"
3452 ));
3453 }
3454
3455 #[test]
3456 fn sanitize_llm_request_preserves_generated_images_for_meerkat_062() {
3457 let request = meerkat_client::LlmRequest::new(
3458 "gpt-5.5",
3459 vec![meerkat_core::Message::BlockAssistant(
3460 meerkat_core::BlockAssistantMessage::new(
3461 vec![
3462 meerkat_core::AssistantBlock::Text {
3463 text: "visible".to_string(),
3464 meta: None,
3465 },
3466 generated_image_block_for_test(),
3467 ],
3468 meerkat_core::StopReason::EndTurn,
3469 ),
3470 )],
3471 );
3472
3473 let sanitized = sanitize_llm_request_for_stateless_replay(&request);
3474
3475 let meerkat_core::Message::BlockAssistant(original_assistant) = &request.messages[0] else {
3476 panic!("expected original block assistant");
3477 };
3478 assert!(
3479 original_assistant
3480 .blocks
3481 .iter()
3482 .any(|block| matches!(block, meerkat_core::AssistantBlock::Image { .. })),
3483 "request-view sanitization must not rewrite canonical caller-owned messages"
3484 );
3485
3486 let meerkat_core::Message::BlockAssistant(sanitized_assistant) = &sanitized.messages[0]
3487 else {
3488 panic!("expected sanitized block assistant");
3489 };
3490 assert!(
3491 sanitized_assistant
3492 .blocks
3493 .iter()
3494 .any(|block| matches!(block, meerkat_core::AssistantBlock::Image { .. })),
3495 "Meerkat 0.6.2 owns provider replay projection for generated images"
3496 );
3497 }
3498
3499 #[derive(Default)]
3500 struct CapturingLlmClient {
3501 projected_messages: std::sync::Mutex<Vec<meerkat_core::Message>>,
3502 }
3503
3504 #[async_trait]
3505 impl LlmClient for CapturingLlmClient {
3506 fn project_replay_messages(
3507 &self,
3508 messages: &[meerkat_core::Message],
3509 ) -> Result<Vec<meerkat_core::Message>, meerkat_client::LlmError> {
3510 *self
3511 .projected_messages
3512 .lock()
3513 .unwrap_or_else(std::sync::PoisonError::into_inner) = messages.to_vec();
3514 Ok(messages.to_vec())
3515 }
3516
3517 fn stream<'a>(&'a self, _request: &'a LlmRequest) -> LlmStream<'a> {
3518 Box::pin(futures::stream::iter([Ok(
3519 meerkat_client::LlmEvent::Done {
3520 outcome: meerkat_client::LlmDoneOutcome::Success {
3521 stop_reason: meerkat_core::StopReason::EndTurn,
3522 },
3523 },
3524 )]))
3525 }
3526
3527 fn provider(&self) -> &'static str {
3528 "openai"
3529 }
3530
3531 async fn health_check(&self) -> Result<(), meerkat_client::LlmError> {
3532 Ok(())
3533 }
3534 }
3535
3536 #[test]
3537 fn replay_sanitizing_llm_client_delegates_provider_projection() {
3538 let capture = Arc::new(CapturingLlmClient::default());
3539 let inner: Arc<dyn LlmClient> = capture.clone();
3540 let wrapped = ReplaySanitizingLlmClient::new(inner);
3541 let messages = vec![meerkat_core::Message::BlockAssistant(
3542 meerkat_core::BlockAssistantMessage::new(
3543 vec![
3544 meerkat_core::AssistantBlock::Text {
3545 text: "visible".to_string(),
3546 meta: None,
3547 },
3548 meerkat_core::AssistantBlock::ServerToolContent {
3549 id: Some("ws-stream".to_string()),
3550 name: "web_search".to_string(),
3551 content: serde_json::json!({
3552 "type": "response.web_search_call.searching",
3553 "item_id": "ws_123"
3554 }),
3555 meta: None,
3556 },
3557 ],
3558 meerkat_core::StopReason::EndTurn,
3559 ),
3560 )];
3561
3562 let projected = wrapped
3563 .project_replay_messages(&messages)
3564 .expect("wrapped client should delegate provider projection");
3565
3566 let seen = capture
3567 .projected_messages
3568 .lock()
3569 .unwrap_or_else(std::sync::PoisonError::into_inner)
3570 .clone();
3571 let meerkat_core::Message::BlockAssistant(assistant) = &seen[0] else {
3572 panic!("expected block assistant");
3573 };
3574 assert_eq!(
3575 assistant.blocks.len(),
3576 1,
3577 "MobKit sanitization must happen before Meerkat provider projection"
3578 );
3579 assert!(matches!(
3580 assistant.blocks[0],
3581 meerkat_core::AssistantBlock::Text { .. }
3582 ));
3583 assert_eq!(
3584 serde_json::to_value(&projected).expect("projected messages serialize"),
3585 serde_json::to_value(&seen).expect("seen messages serialize")
3586 );
3587 }
3588
3589 #[derive(Default)]
3590 struct CapturingAgentLlmClient {
3591 seen_messages: std::sync::Mutex<Vec<meerkat_core::Message>>,
3592 }
3593
3594 #[async_trait]
3595 impl meerkat_core::AgentLlmClient for CapturingAgentLlmClient {
3596 async fn stream_response(
3597 &self,
3598 messages: &[meerkat_core::Message],
3599 _tools: &[Arc<meerkat_core::ToolDef>],
3600 _max_tokens: u32,
3601 _temperature: Option<f32>,
3602 _provider_params: Option<
3603 &meerkat_core::lifecycle::run_primitive::ProviderParamsOverride,
3604 >,
3605 ) -> Result<meerkat_core::agent::LlmStreamResult, meerkat_core::AgentError> {
3606 *self
3607 .seen_messages
3608 .lock()
3609 .unwrap_or_else(std::sync::PoisonError::into_inner) = messages.to_vec();
3610 Ok(meerkat_core::agent::LlmStreamResult::new(
3611 Vec::new(),
3612 meerkat_core::StopReason::EndTurn,
3613 meerkat_core::Usage::default(),
3614 ))
3615 }
3616
3617 fn provider(&self) -> &'static str {
3618 "openai"
3619 }
3620
3621 fn model(&self) -> &'static str {
3622 "gpt-5.5"
3623 }
3624 }
3625
3626 #[tokio::test]
3627 async fn sanitize_agent_llm_client_drops_replay_unsafe_server_tool_blocks() {
3628 let capture = Arc::new(CapturingAgentLlmClient::default());
3629 let inner: Arc<dyn meerkat_core::AgentLlmClient> = capture.clone();
3630 let wrapped = ReplaySanitizingAgentLlmClient::wrap(inner);
3631 let messages = vec![meerkat_core::Message::BlockAssistant(
3632 meerkat_core::BlockAssistantMessage::new(
3633 vec![
3634 meerkat_core::AssistantBlock::Text {
3635 text: "visible".to_string(),
3636 meta: None,
3637 },
3638 meerkat_core::AssistantBlock::ServerToolContent {
3639 id: Some("ws-stream".to_string()),
3640 name: "web_search".to_string(),
3641 content: serde_json::json!({
3642 "type": "response.web_search_call.searching",
3643 "item_id": "ws_123"
3644 }),
3645 meta: None,
3646 },
3647 meerkat_core::AssistantBlock::ServerToolContent {
3648 id: Some("ok".to_string()),
3649 name: "web_search_call".to_string(),
3650 content: serde_json::json!({
3651 "type": "web_search_call",
3652 "id": "ws_123",
3653 "status": "completed"
3654 }),
3655 meta: None,
3656 },
3657 ],
3658 meerkat_core::StopReason::EndTurn,
3659 ),
3660 )];
3661 let tools: Vec<Arc<meerkat_core::ToolDef>> = Vec::new();
3662
3663 wrapped
3664 .stream_response(&messages, &tools, 512, None, None)
3665 .await
3666 .expect("wrapped client should delegate");
3667
3668 let seen = capture
3669 .seen_messages
3670 .lock()
3671 .unwrap_or_else(std::sync::PoisonError::into_inner)
3672 .clone();
3673 let meerkat_core::Message::BlockAssistant(assistant) = &seen[0] else {
3674 panic!("expected block assistant");
3675 };
3676 assert_eq!(assistant.blocks.len(), 2);
3677 assert!(matches!(
3678 assistant.blocks[0],
3679 meerkat_core::AssistantBlock::Text { .. }
3680 ));
3681 assert!(matches!(
3682 assistant.blocks[1],
3683 meerkat_core::AssistantBlock::ServerToolContent { ref name, .. }
3684 if name == "web_search_call"
3685 ));
3686 }
3687
3688 fn generated_image_block_for_test() -> meerkat_core::AssistantBlock {
3689 serde_json::from_value(serde_json::json!({
3690 "block_type": "image",
3691 "data": {
3692 "image_id": "00000000-0000-0000-0000-000000000051",
3693 "blob_ref": {
3694 "blob_id": "sha256:test-generated-image",
3695 "media_type": "image/png"
3696 },
3697 "media_type": "image/png",
3698 "width": 1024,
3699 "height": 1024,
3700 "revised_prompt": { "disposition": "not_requested" },
3701 "meta": { "provider": "not_emitted" }
3702 }
3703 }))
3704 .expect("test image block should deserialize")
3705 }
3706
3707 #[test]
3708 fn sanitize_message_preserves_assistant_image_blocks() {
3709 let message =
3710 meerkat_core::Message::BlockAssistant(meerkat_core::BlockAssistantMessage::new(
3711 vec![
3712 meerkat_core::AssistantBlock::Text {
3713 text: "Here is the image.".to_string(),
3714 meta: None,
3715 },
3716 generated_image_block_for_test(),
3717 ],
3718 meerkat_core::StopReason::EndTurn,
3719 ));
3720
3721 let sanitized = sanitize_message_for_stateless_replay(message);
3722 let meerkat_core::Message::BlockAssistant(assistant) = sanitized else {
3723 panic!("expected block assistant");
3724 };
3725
3726 assert_eq!(assistant.blocks.len(), 2);
3727 assert!(matches!(
3728 assistant.blocks[0],
3729 meerkat_core::AssistantBlock::Text { .. }
3730 ));
3731 assert!(
3732 matches!(
3733 assistant.blocks[1],
3734 meerkat_core::AssistantBlock::Image { .. }
3735 ),
3736 "generated image blocks should reach Meerkat's provider projection"
3737 );
3738 }
3739
3740 #[test]
3743 fn persistent_with_hook_wraps_session_service() {
3744 let dir = tempfile::tempdir().unwrap_or_else(|e| panic!("{e}"));
3745 let store_path = dir.path().to_path_buf();
3746 let Ok(sqlite) = meerkat_store::SqliteSessionStore::open(store_path.join("sessions.db"))
3747 else {
3748 panic!("failed to open sqlite session store");
3749 };
3750 let session_store: Arc<dyn SessionStore> = Arc::new(sqlite);
3751 let Ok(definition) = meerkat_mob::MobDefinition::from_toml("[mob]\nid = \"test\"\n") else {
3752 panic!("failed to parse minimal mob definition");
3753 };
3754
3755 let hook_called = Arc::new(std::sync::atomic::AtomicBool::new(false));
3756 let hook_called_clone = hook_called.clone();
3757
3758 let spec = MobBootstrapSpec::persistent_with_hook(
3759 definition,
3760 meerkat_mob::MobStorage::in_memory(),
3761 store_path.clone(),
3762 4,
3763 session_store,
3764 move |_req: &mut CreateSessionRequest| {
3765 hook_called_clone.store(true, std::sync::atomic::Ordering::Relaxed);
3766 Box::pin(async { Ok(()) })
3767 },
3768 );
3769
3770 assert!(
3778 spec.runtime_adapter.is_some(),
3779 "persistent_with_hook must provide a runtime adapter via spec.runtime_adapter"
3780 );
3781 assert!(
3782 spec.session_service.runtime_adapter().is_some(),
3783 "session service must own a runtime_store so archive/retire don't \
3784 hit the store-only-projection rejection in meerkat-session"
3785 );
3786 assert!(
3787 store_path.join("runtime.sqlite").exists(),
3788 "persistent_inner must open a SqliteRuntimeStore at <store_path>/runtime.sqlite"
3789 );
3790
3791 assert!(
3796 !hook_called.load(std::sync::atomic::Ordering::Relaxed),
3797 "hook must not be called before create_session"
3798 );
3799 }
3800
3801 #[test]
3803 fn ephemeral_with_hook_creates_spec() {
3804 let dir = tempfile::tempdir().unwrap_or_else(|e| panic!("{e}"));
3805 let store_path = dir.path().to_path_buf();
3806 let Ok(definition) = meerkat_mob::MobDefinition::from_toml("[mob]\nid = \"test\"\n") else {
3807 panic!("failed to parse minimal mob definition");
3808 };
3809
3810 let hook_called = Arc::new(std::sync::atomic::AtomicBool::new(false));
3811 let hook_called_clone = hook_called.clone();
3812
3813 let spec = MobBootstrapSpec::ephemeral_with_hook(
3814 definition,
3815 meerkat_mob::MobStorage::in_memory(),
3816 store_path,
3817 4,
3818 None,
3819 move |_req: &mut CreateSessionRequest| {
3820 hook_called_clone.store(true, std::sync::atomic::Ordering::Relaxed);
3821 Box::pin(async { Ok(()) })
3822 },
3823 );
3824
3825 assert!(spec.runtime_adapter.is_none());
3827
3828 assert!(
3830 !hook_called.load(std::sync::atomic::Ordering::Relaxed),
3831 "hook must not be called before create_session"
3832 );
3833 }
3834
3835 #[tokio::test]
3839 async fn pre_build_hook_mutates_create_session_request() {
3840 use std::sync::Mutex;
3841
3842 let captured = Arc::new(Mutex::new(None::<(String, Option<String>)>));
3843 let captured_clone = captured.clone();
3844
3845 let dir = tempfile::tempdir().unwrap_or_else(|e| panic!("{e}"));
3847 let factory = AgentFactory::new(dir.path()).builtins(true);
3848 let config = Config::default();
3849 let builder = FactoryAgentBuilder::new(factory, config);
3850 let inner: Arc<dyn MobSessionService> =
3851 Arc::new(meerkat_session::EphemeralSessionService::new(builder, 4));
3852
3853 let hook: PreBuildHook = Arc::new(move |req: &mut CreateSessionRequest| {
3855 req.model = "hooked-model".to_string();
3856 req.system_prompt = Some("injected-prompt".to_string());
3857 let labels = req.labels.get_or_insert_with(Default::default);
3858 labels.insert("hook_label".to_string(), "hook_value".to_string());
3859 let mut lock = captured_clone
3861 .lock()
3862 .unwrap_or_else(std::sync::PoisonError::into_inner);
3863 *lock = Some((req.model.clone(), req.system_prompt.clone()));
3864 Box::pin(async { Ok(()) })
3865 });
3866 let wrapped = PreBuildMobSessionService {
3867 inner,
3868 hook,
3869 after_create_hook: None,
3870 runtime_adapter_override: None,
3871 };
3872
3873 let req = CreateSessionRequest {
3874 model: "original-model".to_string(),
3875 prompt: meerkat_core::ContentInput::Text("test".to_string()),
3876 render_metadata: None,
3877 system_prompt: None,
3878 max_tokens: None,
3879 event_tx: None,
3880 skill_references: None,
3881 initial_turn: meerkat_core::service::InitialTurnPolicy::Defer,
3882 build: None,
3883 labels: None,
3884 deferred_prompt_policy: meerkat_core::service::DeferredPromptPolicy::default(),
3885 };
3886
3887 let _ = meerkat_core::service::SessionService::create_session(&wrapped, req).await;
3889
3890 let (model, prompt) = captured
3891 .lock()
3892 .unwrap_or_else(std::sync::PoisonError::into_inner)
3893 .clone()
3894 .expect("hook must have been called");
3895 assert_eq!(model, "hooked-model", "hook must mutate the model");
3896 assert_eq!(
3897 prompt.as_deref(),
3898 Some("injected-prompt"),
3899 "hook must set the system prompt"
3900 );
3901 }
3902
3903 #[derive(Default)]
3904 struct ForwardingProbe {
3905 calls: Mutex<Vec<&'static str>>,
3906 }
3907
3908 impl ForwardingProbe {
3909 fn record(&self, call: &'static str) {
3910 self.calls
3911 .lock()
3912 .unwrap_or_else(std::sync::PoisonError::into_inner)
3913 .push(call);
3914 }
3915
3916 fn calls(&self) -> Vec<&'static str> {
3917 self.calls
3918 .lock()
3919 .unwrap_or_else(std::sync::PoisonError::into_inner)
3920 .clone()
3921 }
3922 }
3923
3924 #[async_trait]
3925 impl meerkat_core::service::SessionService for ForwardingProbe {
3926 async fn create_session(
3927 &self,
3928 _req: CreateSessionRequest,
3929 ) -> Result<meerkat_core::types::RunResult, SessionError> {
3930 Err(SessionError::Unsupported("create_session".to_string()))
3931 }
3932
3933 async fn start_turn(
3934 &self,
3935 _id: &meerkat_core::types::SessionId,
3936 _req: meerkat_core::service::StartTurnRequest,
3937 ) -> Result<meerkat_core::types::RunResult, SessionError> {
3938 Err(SessionError::Unsupported("start_turn".to_string()))
3939 }
3940
3941 async fn interrupt(
3942 &self,
3943 _id: &meerkat_core::types::SessionId,
3944 ) -> Result<(), SessionError> {
3945 self.record("interrupt");
3946 Ok(())
3947 }
3948
3949 async fn read(
3950 &self,
3951 id: &meerkat_core::types::SessionId,
3952 ) -> Result<meerkat_core::service::SessionView, SessionError> {
3953 Err(SessionError::NotFound { id: id.clone() })
3954 }
3955
3956 async fn list(
3957 &self,
3958 _query: meerkat_core::service::SessionQuery,
3959 ) -> Result<Vec<meerkat_core::service::SessionSummary>, SessionError> {
3960 Ok(Vec::new())
3961 }
3962
3963 async fn archive(&self, _id: &meerkat_core::types::SessionId) -> Result<(), SessionError> {
3964 self.record("archive");
3965 Ok(())
3966 }
3967 }
3968
3969 #[async_trait]
3970 impl meerkat_core::service::SessionServiceCommsExt for ForwardingProbe {}
3971
3972 #[async_trait]
3973 impl meerkat_core::service::SessionServiceControlExt for ForwardingProbe {
3974 async fn append_system_context(
3975 &self,
3976 _id: &meerkat_core::types::SessionId,
3977 _req: meerkat_core::service::AppendSystemContextRequest,
3978 ) -> Result<
3979 meerkat_core::service::AppendSystemContextResult,
3980 meerkat_core::service::SessionControlError,
3981 > {
3982 self.record("append_system_context");
3983 Ok(meerkat_core::service::AppendSystemContextResult {
3984 status: meerkat_core::service::AppendSystemContextStatus::Applied,
3985 })
3986 }
3987
3988 async fn stage_tool_results(
3989 &self,
3990 _id: &meerkat_core::types::SessionId,
3991 _req: meerkat_core::service::StageToolResultsRequest,
3992 ) -> Result<meerkat_core::service::StageToolResultsResult, SessionError> {
3993 self.record("stage_tool_results");
3994 Ok(meerkat_core::service::StageToolResultsResult {
3995 accepted_result_count: 7,
3996 })
3997 }
3998 }
3999
4000 #[async_trait]
4001 impl meerkat_core::service::SessionServiceHistoryExt for ForwardingProbe {
4002 async fn read_history(
4003 &self,
4004 id: &meerkat_core::types::SessionId,
4005 _query: meerkat_core::service::SessionHistoryQuery,
4006 ) -> Result<meerkat_core::service::SessionHistoryPage, SessionError> {
4007 Err(SessionError::NotFound { id: id.clone() })
4008 }
4009 }
4010
4011 #[async_trait]
4012 impl MobSessionService for ForwardingProbe {
4013 fn supports_persistent_sessions(&self) -> bool {
4014 true
4015 }
4016
4017 fn runtime_adapter(&self) -> Option<Arc<meerkat_runtime::MeerkatMachine>> {
4018 Some(Arc::new(meerkat_runtime::MeerkatMachine::ephemeral()))
4019 }
4020
4021 async fn archive_with_mob_lifecycle_authority(
4022 &self,
4023 _session_id: &meerkat_core::types::SessionId,
4024 ) -> Result<(), SessionError> {
4025 self.record("archive_with_mob_lifecycle_authority");
4026 Ok(())
4027 }
4028
4029 async fn stage_runtime_system_context_for_active_turn(
4030 &self,
4031 _session_id: &meerkat_core::types::SessionId,
4032 _expected_run_id: &meerkat_core::lifecycle::RunId,
4033 _appends: Vec<meerkat_core::session::PendingSystemContextAppend>,
4034 ) -> Result<Option<Vec<u8>>, SessionError> {
4035 self.record("stage_runtime_system_context_for_active_turn");
4036 Ok(Some(b"snapshot".to_vec()))
4037 }
4038
4039 async fn discard_runtime_system_context_for_active_turn(
4040 &self,
4041 _session_id: &meerkat_core::types::SessionId,
4042 _expected_run_id: &meerkat_core::lifecycle::RunId,
4043 _idempotency_keys: Vec<String>,
4044 ) -> Result<(), SessionError> {
4045 self.record("discard_runtime_system_context_for_active_turn");
4046 Ok(())
4047 }
4048
4049 async fn active_turn_system_context_boundary_available(
4050 &self,
4051 _session_id: &meerkat_core::types::SessionId,
4052 ) -> Result<Option<bool>, SessionError> {
4053 self.record("active_turn_system_context_boundary_available");
4054 Ok(Some(true))
4055 }
4056 }
4057
4058 #[tokio::test]
4059 async fn pre_build_wrapper_forwards_mob_authority_and_control_extensions() {
4060 let probe = Arc::new(ForwardingProbe::default());
4061 let inner: Arc<dyn MobSessionService> = probe.clone();
4062 let wrapped = PreBuildMobSessionService {
4063 inner,
4064 hook: no_op_pre_build_hook(),
4065 after_create_hook: None,
4066 runtime_adapter_override: Some(Arc::new(meerkat_runtime::MeerkatMachine::ephemeral())),
4067 };
4068 let session_id = meerkat_core::types::SessionId::new();
4069
4070 MobSessionService::archive_with_mob_lifecycle_authority(&wrapped, &session_id)
4071 .await
4072 .expect("archive_with_mob_lifecycle_authority should forward to inner service");
4073 let staged = meerkat_core::service::SessionServiceControlExt::stage_tool_results(
4074 &wrapped,
4075 &session_id,
4076 meerkat_core::service::StageToolResultsRequest {
4077 results: Vec::new(),
4078 },
4079 )
4080 .await
4081 .expect("stage_tool_results should forward to inner service");
4082
4083 assert_eq!(staged.accepted_result_count, 7);
4084 let boundary_available = wrapped
4085 .active_turn_system_context_boundary_available(&session_id)
4086 .await
4087 .expect("active-turn boundary probe should forward");
4088 assert_eq!(boundary_available, Some(true));
4089 let snapshot = wrapped
4090 .stage_runtime_system_context_for_active_turn(
4091 &session_id,
4092 &meerkat_core::lifecycle::RunId::new(),
4093 vec![meerkat_core::session::PendingSystemContextAppend {
4094 text: "steer".to_string(),
4095 source: Some("test".to_string()),
4096 idempotency_key: Some("test".to_string()),
4097 accepted_at: meerkat_core::time_compat::SystemTime::now(),
4098 }],
4099 )
4100 .await
4101 .expect("active-turn staging should forward");
4102 assert_eq!(snapshot.as_deref(), Some(&b"snapshot"[..]));
4103 wrapped
4104 .discard_runtime_system_context_for_active_turn(
4105 &session_id,
4106 &meerkat_core::lifecycle::RunId::new(),
4107 vec!["test".to_string()],
4108 )
4109 .await
4110 .expect("active-turn rollback should forward");
4111 assert_eq!(
4112 probe.calls(),
4113 vec![
4114 "archive_with_mob_lifecycle_authority",
4115 "stage_tool_results",
4116 "active_turn_system_context_boundary_available",
4117 "stage_runtime_system_context_for_active_turn",
4118 "discard_runtime_system_context_for_active_turn",
4119 ]
4120 );
4121 }
4122
4123 #[test]
4143 fn persistent_bootstrap_uses_sqlite_runtime_store() {
4144 let dir = tempfile::tempdir().unwrap_or_else(|e| panic!("{e}"));
4145 let store_path = dir.path().to_path_buf();
4146 let Ok(sqlite) = meerkat_store::SqliteSessionStore::open(store_path.join("sessions.db"))
4147 else {
4148 panic!("failed to open sqlite session store");
4149 };
4150 let session_store: Arc<dyn SessionStore> = Arc::new(sqlite);
4151 let Ok(definition) = meerkat_mob::MobDefinition::from_toml("[mob]\nid = \"test\"\n") else {
4152 panic!("failed to parse minimal mob definition");
4153 };
4154 let spec = MobBootstrapSpec::persistent(
4155 definition,
4156 meerkat_mob::MobStorage::in_memory(),
4157 store_path.clone(),
4158 4,
4159 session_store,
4160 );
4161 assert!(
4162 spec.runtime_adapter.is_some(),
4163 "persistent bootstrap must provide its own runtime adapter via spec.runtime_adapter"
4164 );
4165 assert!(
4166 spec.session_service.runtime_adapter().is_some(),
4167 "session service must own a runtime_store so archive/retire don't \
4168 hit the store-only-projection rejection"
4169 );
4170 assert!(
4171 store_path.join("runtime.sqlite").exists(),
4172 "persistent_inner must open a SqliteRuntimeStore at <store_path>/runtime.sqlite"
4173 );
4174 }
4175
4176 #[test]
4180 fn ephemeral_runtime_backed_uses_session_service_runtime_adapter() {
4181 let dir = tempfile::tempdir().unwrap_or_else(|e| panic!("{e}"));
4182 let store_path = dir.path().to_path_buf();
4183 let Ok(definition) = meerkat_mob::MobDefinition::from_toml("[mob]\nid = \"test\"\n") else {
4184 panic!("failed to parse minimal mob definition");
4185 };
4186 let spec = MobBootstrapSpec::ephemeral_runtime_backed_inner(
4187 definition,
4188 meerkat_mob::MobStorage::in_memory(),
4189 store_path,
4190 4,
4191 None,
4192 None,
4193 None,
4194 CapabilityFlags::default(),
4195 None,
4196 None,
4197 );
4198 assert!(
4199 spec.runtime_adapter.is_some(),
4200 "ephemeral_runtime_backed_inner must expose the shared runtime authority"
4201 );
4202 assert!(
4203 spec.session_service.runtime_adapter().is_some(),
4204 "session service must still expose a runtime adapter so autonomous-host comms can wire"
4205 );
4206 }
4207
4208 #[tokio::test]
4209 async fn agent_mob_tools_expose_definition_profiles_as_realm_profiles() {
4210 let dir = tempfile::tempdir().unwrap_or_else(|e| panic!("{e}"));
4211 let store_path = dir.path().to_path_buf();
4212 let Ok(definition) = meerkat_mob::MobDefinition::from_toml(
4213 "[mob]\nid = \"test\"\n\n[profiles.investigation-worker]\nmodel = \"gpt-5.5\"\n[profiles.investigation-worker.tools]\ncomms = true\nmob = true\n\n[profiles.person-worker]\nmodel = \"gpt-5.5\"\n[profiles.person-worker.tools]\ncomms = true\n",
4214 ) else {
4215 panic!("failed to parse mob definition with worker profiles");
4216 };
4217
4218 let spec = MobBootstrapSpec::ephemeral_runtime_backed_inner(
4219 definition,
4220 meerkat_mob::MobStorage::in_memory(),
4221 store_path,
4222 4,
4223 None,
4224 None,
4225 None,
4226 CapabilityFlags::default(),
4227 None,
4228 None,
4229 );
4230 let state = spec
4231 .agent_mob_mcp_state
4232 .expect("agent mob MCP state should be installed");
4233
4234 let profiles = state
4235 .realm_profile_list()
4236 .await
4237 .expect("definition profiles should list through agent mob tools");
4238 let names = profiles
4239 .iter()
4240 .map(|profile| profile.name.as_str())
4241 .collect::<Vec<_>>();
4242 assert!(
4243 names.contains(&"investigation-worker"),
4244 "definition profiles must be visible to mob_profile_list so agents can create mobs that reference them"
4245 );
4246 assert!(names.contains(&"person-worker"));
4247
4248 let worker = state
4249 .realm_profile_get("investigation-worker")
4250 .await
4251 .expect("definition profile lookup should succeed")
4252 .expect("definition profile should exist");
4253 assert_eq!(worker.profile.model, "gpt-5.5");
4254 assert_eq!(
4255 worker.revision, 0,
4256 "definition-backed profiles are immutable runtime seeds, not persisted realm revisions"
4257 );
4258 }
4259
4260 #[tokio::test]
4261 async fn agent_created_mobs_can_spawn_definition_seeded_realm_profiles() {
4262 let dir = tempfile::tempdir().unwrap_or_else(|e| panic!("{e}"));
4263 let store_path = dir.path().to_path_buf();
4264 let Ok(parent_definition) = meerkat_mob::MobDefinition::from_toml(
4265 "[mob]\nid = \"parent\"\n\n[profiles.investigation-worker]\nmodel = \"gpt-5.5\"\n[profiles.investigation-worker.tools]\ncomms = true\nmob = true\n",
4266 ) else {
4267 panic!("failed to parse parent mob definition");
4268 };
4269
4270 let mut spec = MobBootstrapSpec::ephemeral_runtime_backed_inner(
4271 parent_definition,
4272 meerkat_mob::MobStorage::in_memory(),
4273 store_path,
4274 4,
4275 None,
4276 None,
4277 None,
4278 CapabilityFlags::default(),
4279 None,
4280 None,
4281 );
4282 spec.options.default_llm_client = Some(Arc::new(meerkat_client::TestClient::default()));
4283 let runtime = MobRuntime::bootstrap(spec)
4284 .await
4285 .unwrap_or_else(|e| panic!("{e}"));
4286 let state = runtime
4287 .agent_mob_mcp_state
4288 .clone()
4289 .expect("agent mob MCP state should be installed");
4290 let Ok(child_definition) = meerkat_mob::MobDefinition::from_toml(
4291 "[mob]\nid = \"child\"\n\n[profiles.investigation-worker]\nrealm_profile = \"investigation-worker\"\n",
4292 ) else {
4293 panic!("failed to parse child mob definition");
4294 };
4295
4296 let mob_id = state
4297 .mob_create_definition(child_definition)
4298 .await
4299 .expect("child mob should be created");
4300 state
4301 .mob_spawn_spec(
4302 &mob_id,
4303 SpawnMemberSpec::new(
4304 ProfileName::from("investigation-worker"),
4305 meerkat_mob::AgentIdentity::from("investigation-worker:one"),
4306 ),
4307 )
4308 .await
4309 .expect("created mob should resolve definition-seeded realm profile at spawn time");
4310 }
4311
4312 #[tokio::test]
4313 async fn ephemeral_runtime_backed_custom_session_store_persists_created_session() {
4314 let dir = tempfile::tempdir().unwrap_or_else(|e| panic!("{e}"));
4315 let store_path = dir.path().to_path_buf();
4316 let Ok(definition) = meerkat_mob::MobDefinition::from_toml(
4317 "[mob]\nid = \"test\"\n\n[profiles.worker]\nmodel = \"gpt-5.5\"\n[profiles.worker.tools]\ncomms = true\n",
4318 ) else {
4319 panic!("failed to parse minimal mob definition");
4320 };
4321 let custom_store: Arc<dyn SessionStore> = Arc::new(meerkat_store::MemoryStore::new());
4322 let mut spec = MobBootstrapSpec::ephemeral_runtime_backed_inner(
4323 definition,
4324 meerkat_mob::MobStorage::in_memory(),
4325 store_path,
4326 4,
4327 Some(custom_store.clone()),
4328 None,
4329 None,
4330 CapabilityFlags::default(),
4331 None,
4332 None,
4333 );
4334 spec.options.default_llm_client = Some(Arc::new(meerkat_client::TestClient::default()));
4335
4336 let runtime = MobRuntime::bootstrap(spec)
4337 .await
4338 .unwrap_or_else(|e| panic!("{e}"));
4339 let mid = meerkat_mob::ids::MeerkatId::from("worker:one");
4340 runtime
4341 .handle
4342 .spawn_spec(SpawnMemberSpec::new(
4343 ProfileName::from("worker"),
4344 mid.clone(),
4345 ))
4346 .await
4347 .unwrap_or_else(|e| panic!("{e}"));
4348 let session_id = runtime
4349 .handle
4350 .resolve_bridge_session_id(&mid)
4351 .await
4352 .unwrap_or_else(|| panic!("spawned worker has no bridge session id"));
4353
4354 let stored = custom_store
4355 .load(&session_id)
4356 .await
4357 .unwrap_or_else(|e| panic!("{e}"));
4358 assert!(
4359 stored.is_some(),
4360 "ephemeral runtime-backed builds with a custom store must persist through that store"
4361 );
4362 }
4363
4364 #[tokio::test]
4365 async fn ephemeral_runtime_backed_custom_session_store_resumes_after_runtime_restart() {
4366 let dir = tempfile::tempdir().unwrap_or_else(|e| panic!("{e}"));
4367 let store_path = dir.path().to_path_buf();
4368 let definition_toml = "[mob]\nid = \"test\"\n\n[profiles.worker]\nmodel = \"gpt-5.5\"\n[profiles.worker.tools]\ncomms = true\n";
4369 let Ok(definition) = meerkat_mob::MobDefinition::from_toml(definition_toml) else {
4370 panic!("failed to parse minimal mob definition");
4371 };
4372 let custom_store: Arc<dyn SessionStore> = Arc::new(meerkat_store::MemoryStore::new());
4373 let mut spec = MobBootstrapSpec::ephemeral_runtime_backed_inner(
4374 definition,
4375 meerkat_mob::MobStorage::in_memory(),
4376 store_path.clone(),
4377 4,
4378 Some(custom_store.clone()),
4379 None,
4380 None,
4381 CapabilityFlags::default(),
4382 None,
4383 None,
4384 );
4385 spec.options.default_llm_client = Some(Arc::new(meerkat_client::TestClient::default()));
4386
4387 let runtime = MobRuntime::bootstrap(spec)
4388 .await
4389 .unwrap_or_else(|e| panic!("{e}"));
4390 let mid = meerkat_mob::ids::MeerkatId::from("worker:one");
4391 runtime
4392 .handle
4393 .spawn_spec(SpawnMemberSpec::new(
4394 ProfileName::from("worker"),
4395 mid.clone(),
4396 ))
4397 .await
4398 .unwrap_or_else(|e| panic!("{e}"));
4399 let session_id = runtime
4400 .handle
4401 .resolve_bridge_session_id(&mid)
4402 .await
4403 .unwrap_or_else(|| panic!("spawned worker has no bridge session id"));
4404 drop(runtime);
4405
4406 let Ok(definition) = meerkat_mob::MobDefinition::from_toml(definition_toml) else {
4407 panic!("failed to parse minimal mob definition");
4408 };
4409 let mut restarted_spec = MobBootstrapSpec::ephemeral_runtime_backed_inner(
4410 definition,
4411 meerkat_mob::MobStorage::in_memory(),
4412 store_path,
4413 4,
4414 Some(custom_store),
4415 None,
4416 None,
4417 CapabilityFlags::default(),
4418 None,
4419 None,
4420 );
4421 restarted_spec.options.default_llm_client =
4422 Some(Arc::new(meerkat_client::TestClient::default()));
4423
4424 let restarted = MobRuntime::bootstrap(restarted_spec)
4425 .await
4426 .unwrap_or_else(|e| panic!("{e}"));
4427 let mut resume_spec = SpawnMemberSpec::new(ProfileName::from("worker"), mid.clone());
4428 resume_spec.launch_mode = meerkat_mob::MemberLaunchMode::Resume {
4429 bridge_session_id: session_id.clone(),
4430 };
4431 restarted
4432 .handle
4433 .spawn_spec(resume_spec)
4434 .await
4435 .unwrap_or_else(|e| panic!("resume should load the external session snapshot: {e}"));
4436
4437 let resumed_session_id = restarted
4438 .handle
4439 .resolve_bridge_session_id(&mid)
4440 .await
4441 .unwrap_or_else(|| panic!("resumed worker has no bridge session id"));
4442 assert_eq!(resumed_session_id, session_id);
4443 }
4444
4445 #[test]
4448 fn ephemeral_bootstrap_without_image_generation_stays_direct() {
4449 let dir = tempfile::tempdir().unwrap_or_else(|e| panic!("{e}"));
4450 let store_path = dir.path().to_path_buf();
4451 let Ok(definition) = meerkat_mob::MobDefinition::from_toml(
4452 "[mob]\nid = \"test\"\n\n[profiles.worker]\nmodel = \"gpt-5.5\"\nruntime_mode = \"autonomous_host\"\n[profiles.worker.tools]\ncomms = true\n",
4453 ) else {
4454 panic!("failed to parse definition");
4455 };
4456 let spec = MobBootstrapSpec::ephemeral_inner(
4457 definition,
4458 meerkat_mob::MobStorage::in_memory(),
4459 store_path,
4460 4,
4461 None,
4462 None,
4463 CapabilityFlags::default(),
4464 None,
4465 None,
4466 );
4467 assert!(
4468 spec.runtime_adapter.is_none(),
4469 "public ephemeral builds only need a runtime adapter when the definition may use image generation"
4470 );
4471 }
4472
4473 #[test]
4478 fn ephemeral_bootstrap_with_image_generation_shares_runtime_adapter() {
4479 let dir = tempfile::tempdir().unwrap_or_else(|e| panic!("{e}"));
4480 let store_path = dir.path().to_path_buf();
4481 let Ok(definition) = meerkat_mob::MobDefinition::from_toml(
4482 r#"
4483[mob]
4484id = "test"
4485
4486[profiles.commander]
4487model = "gpt-5.5"
4488
4489[profiles.commander.tools]
4490builtins = true
4491image_generation = true
4492"#,
4493 ) else {
4494 panic!("failed to parse image-generation definition");
4495 };
4496 let spec = MobBootstrapSpec::ephemeral(
4497 definition,
4498 meerkat_mob::MobStorage::in_memory(),
4499 store_path,
4500 4,
4501 None,
4502 );
4503 let spec_adapter = spec
4504 .runtime_adapter
4505 .as_ref()
4506 .expect("image-generation ephemeral builds must expose a runtime adapter");
4507 let service_adapter = spec
4508 .session_service
4509 .runtime_adapter()
4510 .expect("session service must expose the same runtime adapter");
4511 assert!(
4512 spec_adapter.shares_runtime_persistence_with(&service_adapter),
4513 "image-generation tool state and session state must share one runtime authority"
4514 );
4515 }
4516
4517 #[test]
4520 fn normalize_runtime_turn_request_strips_runtime_owned_semantics() {
4521 let req = meerkat_core::service::StartTurnRequest {
4522 prompt: meerkat_core::ContentInput::Text("checkpoint".to_string()),
4523 system_prompt: Some("system".to_string()),
4524 event_tx: None,
4525 runtime: meerkat_core::service::StartTurnRuntimeSemantics {
4526 render_metadata: Some(meerkat_core::types::RenderMetadata {
4527 class: meerkat_core::types::RenderClass::OpsProgress,
4528 salience: meerkat_core::types::RenderSalience::Urgent,
4529 }),
4530 handling_mode: meerkat_core::types::HandlingMode::Steer,
4531 skill_references: None,
4532 flow_tool_overlay: None,
4533 pre_turn_context_appends: Vec::new(),
4534 typed_turn_appends: Vec::new(),
4535 turn_metadata: None,
4536 },
4537 };
4538
4539 let expected_prompt = req.prompt.clone();
4540 let expected_system_prompt = req.system_prompt.clone();
4541
4542 let normalized = normalize_runtime_turn_request(req);
4543
4544 assert_eq!(
4545 normalized.runtime.handling_mode,
4546 meerkat_core::types::HandlingMode::Queue,
4547 "runtime-applied turns must downgrade Steer before reaching direct session services"
4548 );
4549 assert!(
4550 normalized.runtime.render_metadata.is_none(),
4551 "runtime-owned render metadata must not be forwarded through the direct agent path"
4552 );
4553 assert_eq!(normalized.prompt, expected_prompt);
4554 assert_eq!(normalized.system_prompt, expected_system_prompt);
4555 }
4556
4557 #[test]
4559 fn session_created_context_fields() {
4560 let ctx = SessionCreatedContext {
4561 model: "claude-sonnet-4-5".to_string(),
4562 labels: std::collections::BTreeMap::from([(
4563 "agent_type".to_string(),
4564 "lead".to_string(),
4565 )]),
4566 system_prompt: Some("You are a lead agent.".to_string()),
4567 };
4568 assert_eq!(ctx.model, "claude-sonnet-4-5");
4569 assert_eq!(ctx.labels["agent_type"], "lead");
4570 assert_eq!(ctx.system_prompt.as_deref(), Some("You are a lead agent."));
4571 }
4572
4573 #[tokio::test]
4575 async fn session_hook_default_impls_are_noop() {
4576 struct EmptyHook;
4577 #[async_trait]
4578 impl SessionHook for EmptyHook {}
4579
4580 let hook = EmptyHook;
4581 let mut req = CreateSessionRequest {
4582 model: "test".to_string(),
4583 prompt: meerkat_core::ContentInput::Text("test".to_string()),
4584 render_metadata: None,
4585 system_prompt: None,
4586 max_tokens: None,
4587 event_tx: None,
4588 skill_references: None,
4589 initial_turn: meerkat_core::service::InitialTurnPolicy::Defer,
4590 build: None,
4591 labels: None,
4592 deferred_prompt_policy: meerkat_core::service::DeferredPromptPolicy::default(),
4593 };
4594 hook.before_create(&mut req).await.unwrap();
4596 let ctx = SessionCreatedContext {
4598 model: "test".to_string(),
4599 labels: Default::default(),
4600 system_prompt: None,
4601 };
4602 hook.after_create(&meerkat_core::types::SessionId::new(), &ctx)
4603 .await;
4604 }
4605
4606 #[tokio::test]
4608 async fn session_hook_before_create_can_abort() {
4609 struct AbortHook;
4610 #[async_trait]
4611 impl SessionHook for AbortHook {
4612 async fn before_create(
4613 &self,
4614 _req: &mut CreateSessionRequest,
4615 ) -> Result<(), SessionError> {
4616 Err(SessionError::Unsupported("hook abort".into()))
4617 }
4618 }
4619
4620 let hook = AbortHook;
4621 let mut req = CreateSessionRequest {
4622 model: "test".to_string(),
4623 prompt: meerkat_core::ContentInput::Text("test".to_string()),
4624 render_metadata: None,
4625 system_prompt: None,
4626 max_tokens: None,
4627 event_tx: None,
4628 skill_references: None,
4629 initial_turn: meerkat_core::service::InitialTurnPolicy::Defer,
4630 build: None,
4631 labels: None,
4632 deferred_prompt_policy: meerkat_core::service::DeferredPromptPolicy::default(),
4633 };
4634 let result = hook.before_create(&mut req).await;
4635 assert!(result.is_err());
4636 }
4637
4638 #[tokio::test]
4640 async fn session_hook_before_create_mutates_request() {
4641 struct MutatingHook;
4642 #[async_trait]
4643 impl SessionHook for MutatingHook {
4644 async fn before_create(
4645 &self,
4646 req: &mut CreateSessionRequest,
4647 ) -> Result<(), SessionError> {
4648 req.model = "hook-overridden".to_string();
4649 req.system_prompt = Some("injected by hook".to_string());
4650 Ok(())
4651 }
4652 }
4653
4654 let hook = MutatingHook;
4655 let mut req = CreateSessionRequest {
4656 model: "original".to_string(),
4657 prompt: meerkat_core::ContentInput::Text("test".to_string()),
4658 render_metadata: None,
4659 system_prompt: None,
4660 max_tokens: None,
4661 event_tx: None,
4662 skill_references: None,
4663 initial_turn: meerkat_core::service::InitialTurnPolicy::Defer,
4664 build: None,
4665 labels: None,
4666 deferred_prompt_policy: meerkat_core::service::DeferredPromptPolicy::default(),
4667 };
4668 hook.before_create(&mut req).await.unwrap();
4669 assert_eq!(req.model, "hook-overridden");
4670 assert_eq!(req.system_prompt.as_deref(), Some("injected by hook"));
4671 }
4672}