1use std::collections::{BTreeMap, BTreeSet};
4use std::path::{Path, PathBuf};
5use std::sync::Arc;
6
7use async_trait::async_trait;
8use futures::StreamExt;
9use meerkat::{AgentFactory, Config, FactoryAgentBuilder, SessionStore};
10use meerkat_client::types::LlmStream;
11use meerkat_client::{LlmClient, LlmRequest};
12use meerkat_core::agent::CommsRuntime;
13use meerkat_core::service::{
14 CreateSessionRequest, SessionError, SessionHistoryPage, SessionHistoryQuery,
15 SessionServiceHistoryExt,
16};
17use meerkat_core::{AgentSessionStore, AssistantBlock, Message, Provider};
18use meerkat_mob::{
19 MobBuilder, MobDefinition, MobError, MobHandle, MobSessionService, MobStorage, Profile,
20 ProfileName, SpawnMemberSpec,
21};
22use meerkat_runtime::input_state::StoredInputState;
23use meerkat_runtime::store::MachineLifecycleCommit;
24use meerkat_store::StoreAdapter;
25use serde_json::Value;
26
27use crate::blob_store::{
28 Base64BlobStoreAdapter, BinaryBlobStore, BinaryBlobStoreAdapter, ObjectStoreBlobStore,
29};
30
/// Label key string `"implicit_delegate_idle_retire_secs"`.
///
/// NOTE(review): presumably the label under which an implicit delegate's idle-retire
/// setting is recorded; the code that reads it is outside this excerpt — confirm.
pub(crate) const DELEGATE_IDLE_RETIRE_SECS_LABEL: &str = "implicit_delegate_idle_retire_secs";
/// Label value string `"disabled"`.
///
/// NOTE(review): presumably stored under [`DELEGATE_IDLE_RETIRE_SECS_LABEL`] when idle
/// retirement is switched off — confirm against the consumer.
pub(crate) const DELEGATE_IDLE_RETIRE_DISABLED_LABEL: &str = "disabled";
33
/// Returns `true` when `error` contains the "previous member cleanup ambiguous"
/// marker text (including its trailing space).
pub(crate) fn is_previous_member_cleanup_ambiguous_error(error: &str) -> bool {
    const AMBIGUOUS_CLEANUP_MARKER: &str = "previous member cleanup ambiguous for member ";
    error.contains(AMBIGUOUS_CLEANUP_MARKER)
}
37
38pub(crate) fn is_recoverable_lifecycle_cleanup_error(error: &str) -> bool {
39 is_previous_member_cleanup_ambiguous_error(error)
40 || (error.contains("disposal completed but ArchiveSession failed")
41 && error.contains("cancel-before-retire failed")
42 && error.contains("Runtime not ready: running"))
43}
44
45#[cfg(test)]
46use std::sync::{Mutex, OnceLock};
47
/// String form of the "active" member state.
pub const MEMBER_STATE_ACTIVE: &str = "active";
/// String form of the "retiring" member state.
pub const MEMBER_STATE_RETIRING: &str = "retiring";
52
/// Options bundle for mob bootstrap. `Default` yields both flags `false` and no client.
///
/// NOTE(review): the code that consumes these fields is outside this excerpt; the
/// per-field notes below are inferred from names and must be confirmed.
#[derive(Clone, Default)]
pub struct MobBootstrapOptions {
    /// Presumably permits sessions that are not persisted — TODO confirm.
    pub allow_ephemeral_sessions: bool,
    /// Presumably asks for the orchestrator to be notified on resume — TODO confirm.
    pub notify_orchestrator_on_resume: bool,
    /// Optional shared LLM client; presumably used when nothing more specific is
    /// configured — TODO confirm.
    pub default_llm_client: Option<Arc<dyn LlmClient>>,
}
60
/// [`LlmClient`] wrapper that passes messages through
/// `sanitize_message_for_stateless_replay` before handing them to the wrapped client.
pub struct ReplaySanitizingLlmClient {
    /// The wrapped client every call is ultimately forwarded to.
    inner: Arc<dyn LlmClient>,
}
67
/// Shared, lock-protected slot holding an optional default LLM client.
type SharedDefaultLlmClientSlot = Arc<std::sync::RwLock<Option<Arc<dyn LlmClient>>>>;
69
70impl ReplaySanitizingLlmClient {
71 pub fn new(inner: Arc<dyn LlmClient>) -> Self {
72 Self { inner }
73 }
74
75 pub fn wrap(inner: Arc<dyn LlmClient>) -> Arc<dyn LlmClient> {
76 Arc::new(Self::new(inner))
77 }
78}
79
/// [`meerkat_core::AgentLlmClient`] wrapper that passes messages through
/// `sanitize_message_for_stateless_replay` before streaming a response from the
/// wrapped client.
pub struct ReplaySanitizingAgentLlmClient {
    /// The wrapped agent-level client every call is forwarded to.
    inner: Arc<dyn meerkat_core::AgentLlmClient>,
}
90
91impl ReplaySanitizingAgentLlmClient {
92 pub fn new(inner: Arc<dyn meerkat_core::AgentLlmClient>) -> Self {
93 Self { inner }
94 }
95
96 pub fn wrap(
97 inner: Arc<dyn meerkat_core::AgentLlmClient>,
98 ) -> Arc<dyn meerkat_core::AgentLlmClient> {
99 Arc::new(Self::new(inner))
100 }
101}
102
103#[async_trait]
104impl meerkat_core::AgentLlmClient for ReplaySanitizingAgentLlmClient {
105 async fn stream_response(
106 &self,
107 messages: &[Message],
108 tools: &[Arc<meerkat_core::ToolDef>],
109 max_tokens: u32,
110 temperature: Option<f32>,
111 provider_params: Option<&meerkat_core::lifecycle::run_primitive::ProviderParamsOverride>,
112 ) -> Result<meerkat_core::agent::LlmStreamResult, meerkat_core::AgentError> {
113 let sanitized: Vec<Message> = messages
114 .iter()
115 .cloned()
116 .map(sanitize_message_for_stateless_replay)
117 .collect();
118 self.inner
119 .stream_response(&sanitized, tools, max_tokens, temperature, provider_params)
120 .await
121 }
122
123 fn provider(&self) -> &'static str {
124 self.inner.provider()
125 }
126
127 fn model(&self) -> &str {
128 self.inner.model()
129 }
130
131 fn compile_schema(
132 &self,
133 output_schema: &meerkat_core::OutputSchema,
134 ) -> Result<meerkat_core::schema::CompiledSchema, meerkat_core::schema::SchemaError> {
135 self.inner.compile_schema(output_schema)
136 }
137}
138
139#[async_trait]
140impl LlmClient for ReplaySanitizingLlmClient {
141 fn project_replay_messages(
142 &self,
143 messages: &[Message],
144 ) -> Result<Vec<Message>, meerkat_client::LlmError> {
145 let sanitized: Vec<Message> = messages
146 .iter()
147 .cloned()
148 .map(sanitize_message_for_stateless_replay)
149 .collect();
150 self.inner.project_replay_messages(&sanitized)
151 }
152
153 fn stream<'a>(&'a self, request: &'a LlmRequest) -> LlmStream<'a> {
154 let inner = Arc::clone(&self.inner);
155 let sanitized = sanitize_llm_request_for_stateless_replay(request);
156 Box::pin(async_stream::stream! {
157 let mut stream = inner.stream(&sanitized);
158 while let Some(event) = stream.next().await {
159 if runtime_turn_diagnostics_enabled()
160 && let Err(error) = &event
161 {
162 tracing::error!(
163 error = %error,
164 error_debug = ?error,
165 "mobkit llm client stream error"
166 );
167 }
168 yield event;
169 }
170 })
171 }
172
173 fn provider(&self) -> &'static str {
174 self.inner.provider()
175 }
176
177 async fn health_check(&self) -> Result<(), meerkat_client::LlmError> {
178 self.inner.health_check().await
179 }
180
181 fn compile_schema(
182 &self,
183 output_schema: &meerkat_core::OutputSchema,
184 ) -> Result<meerkat_core::schema::CompiledSchema, meerkat_core::schema::SchemaError> {
185 self.inner.compile_schema(output_schema)
186 }
187}
188
/// Shared async callback that may mutate a [`CreateSessionRequest`].
///
/// The callback borrows the request mutably and returns a boxed `Send` future that
/// resolves to `Ok(())` or a [`SessionError`]. The `delegate_mob_session_service!`
/// expansion awaits it at the start of `create_session`.
pub(crate) type PreBuildHook = Arc<
    dyn Fn(
            &mut CreateSessionRequest,
        ) -> std::pin::Pin<
            Box<dyn std::future::Future<Output = Result<(), SessionError>> + Send + '_>,
        > + Send
        + Sync,
>;
224
/// Shared async callback that receives a session id plus a `SessionCreatedContext`
/// and returns a boxed `Send` future with no output.
///
/// NOTE(review): presumably invoked after a session has been created; the call site
/// is outside this excerpt — confirm.
pub type AfterCreateHook = Arc<
    dyn Fn(
            meerkat_core::types::SessionId,
            SessionCreatedContext,
        ) -> std::pin::Pin<Box<dyn std::future::Future<Output = ()> + Send>>
        + Send
        + Sync,
>;
234
/// Session-service wrapper that carries a pre-build hook alongside the wrapped service.
///
/// NOTE(review): how `after_create_hook` and `runtime_adapter_override` are used is not
/// visible in this excerpt — confirm against the trait implementations.
struct PreBuildMobSessionService {
    /// The wrapped session service.
    inner: Arc<dyn MobSessionService>,
    /// Hook that may mutate a create-session request.
    hook: PreBuildHook,
    /// Optional callback; presumably run after session creation — TODO confirm.
    after_create_hook: Option<AfterCreateHook>,
    /// Optional machine handle; presumably replaces the default runtime adapter — TODO confirm.
    runtime_adapter_override: Option<Arc<meerkat_runtime::MeerkatMachine>>,
}
247
/// Returns a [`PreBuildHook`] that leaves the request untouched and resolves to `Ok(())`.
fn no_op_pre_build_hook() -> PreBuildHook {
    // The request argument is ignored; the returned future completes immediately.
    Arc::new(|_req: &mut CreateSessionRequest| Box::pin(async { Ok(()) }))
}
251
/// Per-member idle-retirement override parsed from a tool call's `idle_retire_secs`
/// argument.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub(crate) enum DelegateIdleRetireOverride {
    /// Produced when `idle_retire_secs` is JSON `null`.
    Disabled,
    /// Produced when `idle_retire_secs` is a non-negative integer (seconds).
    Seconds(u64),
}
257
/// Cloneable handle to a shared map of idle-retirement overrides keyed by
/// `(mob_id, member_id)`. Clones share the same underlying map.
#[derive(Clone, Default)]
pub(crate) struct ImplicitDelegateRetirementOverrides {
    /// Async read/write-locked map from `(mob_id, member_id)` to the override.
    inner: Arc<tokio::sync::RwLock<BTreeMap<(String, String), DelegateIdleRetireOverride>>>,
}
262
263impl ImplicitDelegateRetirementOverrides {
264 pub(crate) async fn set(
265 &self,
266 mob_id: impl Into<String>,
267 member_id: impl Into<String>,
268 override_policy: DelegateIdleRetireOverride,
269 ) {
270 self.inner
271 .write()
272 .await
273 .insert((mob_id.into(), member_id.into()), override_policy);
274 }
275
276 pub(crate) async fn get(
277 &self,
278 mob_id: &str,
279 member_id: &str,
280 ) -> Option<DelegateIdleRetireOverride> {
281 self.inner
282 .read()
283 .await
284 .get(&(mob_id.to_string(), member_id.to_string()))
285 .copied()
286 }
287}
288
/// [`meerkat_core::service::MobToolsFactory`] wrapper whose dispatchers are wrapped in
/// `AutoWireParentMobToolDispatcher`.
struct AutoWireParentMobToolsFactory {
    /// The wrapped factory that builds the underlying dispatcher.
    inner: Arc<dyn meerkat_core::service::MobToolsFactory>,
    /// Shared override map handed to every dispatcher this factory builds.
    implicit_delegate_retirement_overrides: ImplicitDelegateRetirementOverrides,
}
293
294#[cfg_attr(target_arch = "wasm32", async_trait(?Send))]
295#[cfg_attr(not(target_arch = "wasm32"), async_trait)]
296impl meerkat_core::service::MobToolsFactory for AutoWireParentMobToolsFactory {
297 async fn build_mob_tools(
298 &self,
299 args: meerkat_core::service::MobToolsBuildArgs,
300 ) -> Result<Arc<dyn meerkat_core::AgentToolDispatcher>, Box<dyn std::error::Error + Send + Sync>>
301 {
302 let inner = self.inner.build_mob_tools(args).await?;
303 Ok(Arc::new(AutoWireParentMobToolDispatcher {
304 inner,
305 implicit_delegate_retirement_overrides: self
306 .implicit_delegate_retirement_overrides
307 .clone(),
308 }))
309 }
310}
311
/// Tool dispatcher wrapper that intercepts the `delegate` and `mob_spawn_member` tools.
///
/// It adds an `idle_retire_secs` argument to both tool definitions and records the
/// requested override after a successful call. For `mob_spawn_member` it also defaults
/// `auto_wire_parent` to `true`.
struct AutoWireParentMobToolDispatcher {
    /// The wrapped dispatcher that actually executes tool calls.
    inner: Arc<dyn meerkat_core::AgentToolDispatcher>,
    /// Shared map where per-member idle-retirement overrides are recorded.
    implicit_delegate_retirement_overrides: ImplicitDelegateRetirementOverrides,
}
316
317#[cfg_attr(target_arch = "wasm32", async_trait(?Send))]
318#[cfg_attr(not(target_arch = "wasm32"), async_trait)]
319impl meerkat_core::AgentToolDispatcher for AutoWireParentMobToolDispatcher {
320 fn tools(&self) -> Arc<[Arc<meerkat_core::types::ToolDef>]> {
321 self.inner
322 .tools()
323 .iter()
324 .map(|tool| {
325 if tool.name == "delegate" {
326 Arc::new(delegate_tool_def_with_idle_retire_secs(tool))
327 } else if tool.name == "mob_spawn_member" {
328 Arc::new(mob_spawn_tool_def_with_idle_retire_secs(tool))
329 } else {
330 Arc::clone(tool)
331 }
332 })
333 .collect::<Vec<_>>()
334 .into()
335 }
336
337 async fn dispatch(
338 &self,
339 call: meerkat_core::types::ToolCallView<'_>,
340 ) -> Result<meerkat_core::ToolDispatchOutcome, meerkat_core::ToolError> {
341 if call.name == "delegate" {
342 return self.dispatch_delegate(call).await;
343 }
344 if call.name != "mob_spawn_member" {
345 return self.inner.dispatch(call).await;
346 }
347 self.dispatch_mob_spawn_member(call).await
348 }
349
350 fn capabilities(&self) -> meerkat_core::agent::DispatcherCapabilities {
351 self.inner.capabilities()
352 }
353
354 fn bind_ops_lifecycle(
355 self: Arc<Self>,
356 registry: Arc<dyn meerkat_core::ops_lifecycle::OpsLifecycleRegistry>,
357 owner_bridge_session_id: meerkat_core::types::SessionId,
358 ) -> Result<meerkat_core::agent::BindOutcome, meerkat_core::agent::OpsLifecycleBindError> {
359 let owned = Arc::try_unwrap(self)
360 .map_err(|_| meerkat_core::agent::OpsLifecycleBindError::SharedOwnership)?;
361 let outcome = owned
362 .inner
363 .bind_ops_lifecycle(registry, owner_bridge_session_id)?;
364 let was_bound = outcome.was_bound();
365 let dispatcher = Arc::new(Self {
366 inner: outcome.into_dispatcher(),
367 implicit_delegate_retirement_overrides: owned.implicit_delegate_retirement_overrides,
368 });
369 Ok(if was_bound {
370 meerkat_core::agent::BindOutcome::Bound(dispatcher)
371 } else {
372 meerkat_core::agent::BindOutcome::Skipped(dispatcher)
373 })
374 }
375}
376
377impl AutoWireParentMobToolDispatcher {
378 async fn dispatch_mob_spawn_member(
379 &self,
380 call: meerkat_core::types::ToolCallView<'_>,
381 ) -> Result<meerkat_core::ToolDispatchOutcome, meerkat_core::ToolError> {
382 let mut args = serde_json::from_str::<Value>(call.args.get()).map_err(|error| {
383 meerkat_core::ToolError::invalid_arguments(call.name, error.to_string())
384 })?;
385 let idle_retire_override = delegate_idle_retire_override_from_args(call.name, &mut args)?;
386 let idle_retire_targets = idle_retire_targets_from_spawn_args(&args);
387 if let Some(object) = args.as_object_mut() {
388 object
389 .entry("auto_wire_parent".to_string())
390 .or_insert(Value::Bool(true));
391 }
392 let args = serde_json::value::RawValue::from_string(args.to_string()).map_err(|error| {
393 meerkat_core::ToolError::invalid_arguments(call.name, error.to_string())
394 })?;
395 let call = meerkat_core::types::ToolCallView {
396 id: call.id,
397 name: call.name,
398 args: &args,
399 };
400 let outcome = self.inner.dispatch(call).await?;
401 self.register_idle_retire_override_from_outcome(
402 &outcome,
403 idle_retire_override,
404 &idle_retire_targets,
405 )
406 .await;
407
408 Ok(outcome)
409 }
410
411 async fn dispatch_delegate(
412 &self,
413 call: meerkat_core::types::ToolCallView<'_>,
414 ) -> Result<meerkat_core::ToolDispatchOutcome, meerkat_core::ToolError> {
415 let mut args = serde_json::from_str::<Value>(call.args.get()).map_err(|error| {
416 meerkat_core::ToolError::invalid_arguments(call.name, error.to_string())
417 })?;
418 let idle_retire_override = delegate_idle_retire_override_from_args(call.name, &mut args)?;
419 let args = serde_json::value::RawValue::from_string(args.to_string()).map_err(|error| {
420 meerkat_core::ToolError::invalid_arguments(call.name, error.to_string())
421 })?;
422 let call = meerkat_core::types::ToolCallView {
423 id: call.id,
424 name: call.name,
425 args: &args,
426 };
427 let outcome = self.inner.dispatch(call).await?;
428
429 self.register_idle_retire_override_from_outcome(&outcome, idle_retire_override, &[])
430 .await;
431
432 Ok(outcome)
433 }
434
435 async fn register_idle_retire_override_from_outcome(
436 &self,
437 outcome: &meerkat_core::ToolDispatchOutcome,
438 idle_retire_override: Option<DelegateIdleRetireOverride>,
439 fallback_targets: &[IdleRetireTarget],
440 ) {
441 if outcome.result.is_error {
442 return;
443 }
444 let Some(override_policy) = idle_retire_override else {
445 return;
446 };
447 for target in
448 idle_retire_targets_from_outcome_text(&outcome.result.text_content(), fallback_targets)
449 {
450 self.implicit_delegate_retirement_overrides
451 .set(&target.mob_id, &target.member_id, override_policy)
452 .await;
453 }
454 }
455}
456
/// A `(mob_id, member_id)` pair identifying a member that an idle-retirement override
/// applies to. Ordered so targets can be de-duplicated in a `BTreeSet`.
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord)]
struct IdleRetireTarget {
    /// Identifier of the mob the member belongs to.
    mob_id: String,
    /// Identifier of the member within that mob.
    member_id: String,
}
462
463fn text_field<'a>(value: &'a Value, key: &str) -> Option<&'a str> {
464 value
465 .get(key)
466 .and_then(Value::as_str)
467 .filter(|text| !text.is_empty())
468}
469
470fn member_identity_field(value: &Value) -> Option<&str> {
471 text_field(value, "agent_identity")
472 .or_else(|| text_field(value, "member_id"))
473 .or_else(|| text_field(value, "identity"))
474}
475
476fn target_from_value(value: &Value, default_mob_id: Option<&str>) -> Option<IdleRetireTarget> {
477 let mob_id = text_field(value, "mob_id").or(default_mob_id)?;
478 let member_id = member_identity_field(value)?;
479 Some(IdleRetireTarget {
480 mob_id: mob_id.to_string(),
481 member_id: member_id.to_string(),
482 })
483}
484
485fn idle_retire_targets_from_spawn_args(args: &Value) -> Vec<IdleRetireTarget> {
486 let default_mob_id = text_field(args, "mob_id");
487 let mut targets = BTreeSet::new();
488 if let Some(target) = target_from_value(args, default_mob_id) {
489 targets.insert(target);
490 }
491 for key in ["specs", "members"] {
492 let Some(values) = args.get(key).and_then(Value::as_array) else {
493 continue;
494 };
495 for value in values {
496 if let Some(target) = target_from_value(value, default_mob_id) {
497 targets.insert(target);
498 }
499 }
500 }
501 targets.into_iter().collect()
502}
503
504fn target_from_result_value(
505 value: &Value,
506 fallback_targets: &[IdleRetireTarget],
507) -> Option<IdleRetireTarget> {
508 if let Some(target) = target_from_value(value, None) {
509 return Some(target);
510 }
511 let member_id = member_identity_field(value)?;
512 let mut matches = fallback_targets
513 .iter()
514 .filter(|target| target.member_id == member_id);
515 let target = matches.next()?;
516 if matches.next().is_some() {
517 return None;
518 }
519 Some(target.clone())
520}
521
522fn collect_idle_retire_result_targets(
523 value: &Value,
524 fallback_targets: &[IdleRetireTarget],
525 targets: &mut BTreeSet<IdleRetireTarget>,
526) {
527 if let Some(target) = target_from_result_value(value, fallback_targets) {
528 targets.insert(target);
529 }
530 for key in ["members", "specs", "spawned", "results"] {
531 let Some(values) = value.get(key).and_then(Value::as_array) else {
532 continue;
533 };
534 for value in values {
535 collect_idle_retire_result_targets(value, fallback_targets, targets);
536 }
537 }
538}
539
540fn idle_retire_targets_from_outcome_text(
541 text: &str,
542 fallback_targets: &[IdleRetireTarget],
543) -> Vec<IdleRetireTarget> {
544 let Ok(payload) = serde_json::from_str::<Value>(text) else {
545 return fallback_targets.to_vec();
546 };
547 let mut targets = BTreeSet::new();
548 collect_idle_retire_result_targets(&payload, fallback_targets, &mut targets);
549 if targets.is_empty() {
550 targets.extend(fallback_targets.iter().cloned());
551 }
552 targets.into_iter().collect()
553}
554
/// Realm profile store that overlays profiles declared inline in a mob definition on
/// top of another store. Definition-provided names shadow the wrapped store and
/// cannot be created, updated or deleted through this wrapper.
struct DefinitionSeededRealmProfileStore {
    /// The wrapped store used for every name the definition does not provide.
    inner: Arc<dyn meerkat_mob::RealmProfileStore>,
    /// Inline profiles taken from the mob definition, keyed by profile name.
    profiles: BTreeMap<String, Profile>,
}
559
560impl DefinitionSeededRealmProfileStore {
561 fn new(
562 definition: &MobDefinition,
563 inner: Arc<dyn meerkat_mob::RealmProfileStore>,
564 ) -> Option<Self> {
565 let profiles = definition
566 .profiles
567 .iter()
568 .filter_map(|(name, binding)| {
569 binding
570 .as_inline()
571 .cloned()
572 .map(|profile| (name.to_string(), profile))
573 })
574 .collect::<BTreeMap<_, _>>();
575
576 (!profiles.is_empty()).then_some(Self { inner, profiles })
577 }
578
579 fn stored(&self, name: &str, profile: &Profile) -> meerkat_mob::StoredRealmProfile {
580 let now = chrono::Utc::now();
581 meerkat_mob::StoredRealmProfile {
582 name: name.to_string(),
583 profile: profile.clone(),
584 revision: 0,
585 created_at: now,
586 updated_at: now,
587 }
588 }
589}
590
591#[cfg_attr(target_arch = "wasm32", async_trait(?Send))]
592#[cfg_attr(not(target_arch = "wasm32"), async_trait)]
593impl meerkat_mob::RealmProfileStore for DefinitionSeededRealmProfileStore {
594 async fn create(
595 &self,
596 name: &str,
597 profile: &Profile,
598 ) -> Result<meerkat_mob::StoredRealmProfile, meerkat_mob::MobStoreError> {
599 if self.profiles.contains_key(name) {
600 return Err(meerkat_mob::MobStoreError::CasConflict(format!(
601 "realm profile already exists: {name}"
602 )));
603 }
604 self.inner.create(name, profile).await
605 }
606
607 async fn get(
608 &self,
609 name: &str,
610 ) -> Result<Option<meerkat_mob::StoredRealmProfile>, meerkat_mob::MobStoreError> {
611 if let Some(profile) = self.profiles.get(name) {
612 return Ok(Some(self.stored(name, profile)));
613 }
614 self.inner.get(name).await
615 }
616
617 async fn list(
618 &self,
619 ) -> Result<Vec<meerkat_mob::StoredRealmProfile>, meerkat_mob::MobStoreError> {
620 let mut merged = self.inner.list().await?;
621 merged.retain(|profile| !self.profiles.contains_key(profile.name.as_str()));
622 merged.extend(
623 self.profiles
624 .iter()
625 .map(|(name, profile)| self.stored(name, profile)),
626 );
627 merged.sort_by(|a, b| a.name.cmp(&b.name));
628 Ok(merged)
629 }
630
631 async fn update(
632 &self,
633 name: &str,
634 profile: &Profile,
635 expected_revision: u64,
636 ) -> Result<meerkat_mob::StoredRealmProfile, meerkat_mob::MobStoreError> {
637 if self.profiles.contains_key(name) {
638 return Err(meerkat_mob::MobStoreError::CasConflict(format!(
639 "realm profile '{name}' is provided by the mob definition"
640 )));
641 }
642 self.inner.update(name, profile, expected_revision).await
643 }
644
645 async fn delete(
646 &self,
647 name: &str,
648 expected_revision: u64,
649 ) -> Result<meerkat_mob::StoredRealmProfile, meerkat_mob::MobStoreError> {
650 if self.profiles.contains_key(name) {
651 return Err(meerkat_mob::MobStoreError::CasConflict(format!(
652 "realm profile '{name}' is provided by the mob definition"
653 )));
654 }
655 self.inner.delete(name, expected_revision).await
656 }
657}
658
659fn delegate_idle_retire_override_from_args(
660 tool_name: &str,
661 args: &mut Value,
662) -> Result<Option<DelegateIdleRetireOverride>, meerkat_core::ToolError> {
663 let Some(object) = args.as_object_mut() else {
664 return Ok(None);
665 };
666 let Some(value) = object.remove("idle_retire_secs") else {
667 return Ok(None);
668 };
669 if value.is_null() {
670 return Ok(Some(DelegateIdleRetireOverride::Disabled));
671 }
672 value
673 .as_u64()
674 .map(DelegateIdleRetireOverride::Seconds)
675 .map(Some)
676 .ok_or_else(|| {
677 meerkat_core::ToolError::invalid_arguments(
678 tool_name,
679 "idle_retire_secs must be a non-negative integer or null",
680 )
681 })
682}
683
684fn delegate_tool_def_with_idle_retire_secs(
685 tool: &meerkat_core::types::ToolDef,
686) -> meerkat_core::types::ToolDef {
687 let mut patched = tool.clone();
688 if !patched.description.contains("IDLE RETIREMENT:") {
689 patched.description.push_str(
690 "\n\nIDLE RETIREMENT:\n\
691 Omit idle_retire_secs to use the runtime default. Pass an integer \
692 number of seconds to override idle auto-retirement for this helper. \
693 Pass null to disable auto-retirement for this helper.",
694 );
695 }
696 if let Some(properties) = patched
697 .input_schema
698 .get_mut("properties")
699 .and_then(Value::as_object_mut)
700 {
701 properties
702 .entry("idle_retire_secs".to_string())
703 .or_insert_with(|| {
704 serde_json::json!({
705 "description": "Override idle auto-retirement for this helper. Omit to use the runtime default, use an integer number of seconds to override, or null to disable auto-retirement for this helper.",
706 "anyOf": [
707 {"type": "integer", "minimum": 0},
708 {"type": "null"}
709 ]
710 })
711 });
712 }
713 patched
714}
715
716fn mob_spawn_tool_def_with_idle_retire_secs(
717 tool: &meerkat_core::types::ToolDef,
718) -> meerkat_core::types::ToolDef {
719 let mut patched = tool.clone();
720 if !patched.description.contains("IDLE RETIREMENT:") {
721 patched.description.push_str(
722 "\n\nIDLE RETIREMENT:\n\
723 Omit idle_retire_secs to leave this spawned member out of auto-retirement. \
724 Pass an integer number of seconds to retire the member after it has been \
725 idle for that long. Pass null to explicitly disable auto-retirement.",
726 );
727 }
728 if let Some(properties) = patched
729 .input_schema
730 .get_mut("properties")
731 .and_then(Value::as_object_mut)
732 {
733 properties
734 .entry("idle_retire_secs".to_string())
735 .or_insert_with(|| {
736 serde_json::json!({
737 "description": "Opt this spawned member into idle auto-retirement. Omit to keep the member indefinitely, use an integer number of seconds to retire after that much idle time, or null to explicitly disable auto-retirement.",
738 "anyOf": [
739 {"type": "integer", "minimum": 0},
740 {"type": "null"}
741 ]
742 })
743 });
744 }
745 patched
746}
747
748fn install_agent_mob_tools(
749 definition: &MobDefinition,
750 slot: Arc<std::sync::RwLock<Option<Arc<dyn meerkat_core::service::MobToolsFactory>>>>,
751 session_service: Arc<dyn MobSessionService>,
752) -> (
753 Arc<meerkat_mob_mcp::MobMcpState>,
754 ImplicitDelegateRetirementOverrides,
755 SharedDefaultLlmClientSlot,
756) {
757 let default_llm_client_slot = Arc::new(std::sync::RwLock::new(None::<Arc<dyn LlmClient>>));
758 let default_llm_client_provider_slot = Arc::clone(&default_llm_client_slot);
759 let mut state = meerkat_mob_mcp::MobMcpState::new(session_service);
760 if let Some(base_store) = state.realm_profile_store().cloned()
761 && let Some(store) = DefinitionSeededRealmProfileStore::new(definition, base_store)
762 {
763 state = state.with_realm_profile_store(Some(Arc::new(store)));
764 }
765 state = state
766 .with_realm_skill_sources(definition.skills.clone())
767 .with_default_llm_client_provider(Some(Arc::new(move || {
768 default_llm_client_provider_slot
769 .read()
770 .unwrap_or_else(std::sync::PoisonError::into_inner)
771 .clone()
772 })));
773 let state = Arc::new(state);
774 let implicit_delegate_retirement_overrides = ImplicitDelegateRetirementOverrides::default();
775 let inner = Arc::new(meerkat_mob_mcp::AgentMobToolSurfaceFactory::new(
776 Arc::clone(&state),
777 ));
778 let factory = Arc::new(AutoWireParentMobToolsFactory {
779 inner,
780 implicit_delegate_retirement_overrides: implicit_delegate_retirement_overrides.clone(),
781 });
782 *slot
783 .write()
784 .unwrap_or_else(std::sync::PoisonError::into_inner) = Some(factory);
785 (
786 state,
787 implicit_delegate_retirement_overrides,
788 default_llm_client_slot,
789 )
790}
791
/// Test-only record of a single runtime turn, collected in `RUNTIME_TURN_TRACES`.
///
/// NOTE(review): the code that fills these fields is outside this excerpt; the field
/// notes below are inferred from names — confirm.
#[cfg(test)]
#[allow(dead_code)]
#[derive(Debug, Clone)]
pub(crate) struct RuntimeTurnTrace {
    /// Session the turn belongs to, as a string.
    pub(crate) session_id: String,
    /// Presumably the name of the run boundary the turn reached — TODO confirm.
    pub(crate) boundary: String,
    /// Presumably how many inputs contributed to the turn — TODO confirm.
    pub(crate) contributing_input_count: usize,
    /// Presumably a textual description of the turn's outcome — TODO confirm.
    pub(crate) outcome: String,
}
801
802fn is_replay_unsafe_server_tool_content(name: &str, content: &Value) -> bool {
803 name == "web_search_annotations"
804 || content
805 .get("type")
806 .and_then(Value::as_str)
807 .is_some_and(|kind| kind.starts_with("response."))
808}
809
810fn sanitize_llm_request_for_stateless_replay(request: &LlmRequest) -> LlmRequest {
811 let mut sanitized = request.clone();
812 sanitized.messages = request
813 .messages
814 .iter()
815 .cloned()
816 .map(sanitize_message_for_stateless_replay)
817 .collect();
818 sanitized
819}
820
821fn sanitize_create_session_request_llm_override(req: &mut CreateSessionRequest) {
822 let Some(build) = req.build.as_mut() else {
823 return;
824 };
825 let Some(client) = build
826 .llm_client_override
827 .as_ref()
828 .and_then(meerkat::decode_llm_client_override_from_service)
829 else {
830 return;
831 };
832 build.llm_client_override = Some(meerkat::encode_llm_client_override_for_service(
833 ReplaySanitizingLlmClient::wrap(client),
834 ));
835}
836
837fn sanitize_message_for_stateless_replay(message: Message) -> Message {
838 match message {
839 Message::BlockAssistant(mut assistant) => {
840 assistant.blocks = assistant
841 .blocks
842 .into_iter()
843 .filter_map(|block| match block {
844 AssistantBlock::ServerToolContent { name, content, .. }
845 if is_replay_unsafe_server_tool_content(&name, &content) =>
846 {
847 None
848 }
849 other => Some(other),
850 })
851 .collect();
852 Message::BlockAssistant(assistant)
853 }
854 other => other,
855 }
856}
857
858fn build_persistent_runtime_store(store_path: &Path) -> Arc<dyn meerkat_runtime::RuntimeStore> {
868 let runtime_db = store_path.join("runtime.sqlite");
869 match meerkat_runtime::store::SqliteRuntimeStore::new(&runtime_db) {
870 Ok(store) => Arc::new(store),
871 Err(err) => {
872 tracing::warn!(
873 path = %runtime_db.display(),
874 error = %err,
875 "failed to open SqliteRuntimeStore; falling back to InMemoryRuntimeStore. \
876 Sessions will not survive process restart and archive operations may fail.",
877 );
878 Arc::new(meerkat_runtime::InMemoryRuntimeStore::new())
879 }
880 }
881}
882
/// Runtime store wrapper whose `RuntimeStore` implementation forwards every call to
/// the wrapped store.
struct SessionStoreBackedRuntimeStore {
    /// The wrapped runtime store that performs all persistence.
    inner: Arc<dyn meerkat_runtime::RuntimeStore>,
}
893
894impl SessionStoreBackedRuntimeStore {
895 fn new(
896 inner: Arc<dyn meerkat_runtime::RuntimeStore>,
897 _session_store: Arc<dyn SessionStore>,
898 ) -> Self {
899 Self { inner }
900 }
901}
902
// Pure pass-through: every method forwards its arguments unchanged to `self.inner`
// and returns the wrapped store's result.
#[async_trait]
impl meerkat_runtime::RuntimeStore for SessionStoreBackedRuntimeStore {
    // --- Auth / OAuth flow snapshot (synchronous) ---

    /// Forwards to the wrapped store.
    fn auth_authority_key(&self) -> Option<String> {
        self.inner.auth_authority_key()
    }

    /// Forwards to the wrapped store.
    fn persist_auth_oauth_flow_snapshot(
        &self,
        snapshot_json: &[u8],
    ) -> Result<(), meerkat_runtime::store::RuntimeStoreError> {
        self.inner.persist_auth_oauth_flow_snapshot(snapshot_json)
    }

    /// Forwards to the wrapped store.
    fn load_auth_oauth_flow_snapshot(
        &self,
    ) -> Result<Option<Vec<u8>>, meerkat_runtime::store::RuntimeStoreError> {
        self.inner.load_auth_oauth_flow_snapshot()
    }

    /// Forwards to the wrapped store.
    fn update_auth_oauth_flow_snapshot(
        &self,
        update: &mut meerkat_runtime::store::AuthOAuthFlowSnapshotUpdate<'_>,
    ) -> Result<(), meerkat_runtime::store::RuntimeStoreError> {
        self.inner.update_auth_oauth_flow_snapshot(update)
    }

    // --- Session snapshot commits ---

    /// Forwards to the wrapped store.
    async fn commit_session_snapshot(
        &self,
        runtime_id: &meerkat_runtime::LogicalRuntimeId,
        session_delta: meerkat_runtime::store::SessionDelta,
    ) -> Result<(), meerkat_runtime::store::RuntimeStoreError> {
        self.inner
            .commit_session_snapshot(runtime_id, session_delta)
            .await
    }

    /// Forwards to the wrapped store.
    async fn commit_session_transcript_rewrite_snapshot(
        &self,
        runtime_id: &meerkat_runtime::LogicalRuntimeId,
        session_delta: meerkat_runtime::store::SessionDelta,
        commit: &meerkat_core::TranscriptRewriteCommit,
    ) -> Result<(), meerkat_runtime::store::RuntimeStoreError> {
        self.inner
            .commit_session_transcript_rewrite_snapshot(runtime_id, session_delta, commit)
            .await
    }

    /// Forwards to the wrapped store.
    async fn atomic_apply(
        &self,
        runtime_id: &meerkat_runtime::LogicalRuntimeId,
        session_delta: Option<meerkat_runtime::store::SessionDelta>,
        receipt: meerkat_core::lifecycle::RunBoundaryReceipt,
        input_updates: Vec<StoredInputState>,
        session_store_key: Option<meerkat_core::types::SessionId>,
    ) -> Result<(), meerkat_runtime::store::RuntimeStoreError> {
        self.inner
            .atomic_apply(
                runtime_id,
                session_delta,
                receipt,
                input_updates,
                session_store_key,
            )
            .await
    }

    // --- Loads and snapshot maintenance ---

    /// Forwards to the wrapped store.
    async fn load_input_states(
        &self,
        runtime_id: &meerkat_runtime::LogicalRuntimeId,
    ) -> Result<Vec<StoredInputState>, meerkat_runtime::store::RuntimeStoreError> {
        self.inner.load_input_states(runtime_id).await
    }

    /// Forwards to the wrapped store.
    async fn load_boundary_receipt(
        &self,
        runtime_id: &meerkat_runtime::LogicalRuntimeId,
        run_id: &meerkat_core::lifecycle::RunId,
        sequence: u64,
    ) -> Result<
        Option<meerkat_core::lifecycle::RunBoundaryReceipt>,
        meerkat_runtime::store::RuntimeStoreError,
    > {
        self.inner
            .load_boundary_receipt(runtime_id, run_id, sequence)
            .await
    }

    /// Forwards to the wrapped store.
    async fn load_session_snapshot(
        &self,
        runtime_id: &meerkat_runtime::LogicalRuntimeId,
    ) -> Result<Option<Vec<u8>>, meerkat_runtime::store::RuntimeStoreError> {
        self.inner.load_session_snapshot(runtime_id).await
    }

    /// Forwards to the wrapped store.
    async fn clear_session_snapshot(
        &self,
        runtime_id: &meerkat_runtime::LogicalRuntimeId,
    ) -> Result<(), meerkat_runtime::store::RuntimeStoreError> {
        self.inner.clear_session_snapshot(runtime_id).await
    }

    /// Forwards to the wrapped store, returning its `bool` result unchanged.
    async fn replace_session_snapshot_if_current(
        &self,
        runtime_id: &meerkat_runtime::LogicalRuntimeId,
        expected_current: &[u8],
        replacement: Vec<u8>,
    ) -> Result<bool, meerkat_runtime::store::RuntimeStoreError> {
        self.inner
            .replace_session_snapshot_if_current(runtime_id, expected_current, replacement)
            .await
    }

    /// Forwards to the wrapped store, returning its `bool` result unchanged.
    async fn clear_session_snapshot_if_current(
        &self,
        runtime_id: &meerkat_runtime::LogicalRuntimeId,
        expected_current: &[u8],
    ) -> Result<bool, meerkat_runtime::store::RuntimeStoreError> {
        self.inner
            .clear_session_snapshot_if_current(runtime_id, expected_current)
            .await
    }

    // --- Input state, runtime state and lifecycle ---

    /// Forwards to the wrapped store.
    async fn persist_input_state(
        &self,
        runtime_id: &meerkat_runtime::LogicalRuntimeId,
        state: &StoredInputState,
    ) -> Result<(), meerkat_runtime::store::RuntimeStoreError> {
        self.inner.persist_input_state(runtime_id, state).await
    }

    /// Forwards to the wrapped store.
    async fn load_input_state(
        &self,
        runtime_id: &meerkat_runtime::LogicalRuntimeId,
        input_id: &meerkat_core::lifecycle::InputId,
    ) -> Result<Option<StoredInputState>, meerkat_runtime::store::RuntimeStoreError> {
        self.inner.load_input_state(runtime_id, input_id).await
    }

    /// Forwards to the wrapped store.
    async fn load_runtime_state(
        &self,
        runtime_id: &meerkat_runtime::LogicalRuntimeId,
    ) -> Result<Option<meerkat_runtime::RuntimeState>, meerkat_runtime::store::RuntimeStoreError>
    {
        self.inner.load_runtime_state(runtime_id).await
    }

    /// Forwards to the wrapped store.
    async fn commit_machine_lifecycle(
        &self,
        runtime_id: &meerkat_runtime::LogicalRuntimeId,
        commit: MachineLifecycleCommit,
        input_states: &[StoredInputState],
    ) -> Result<(), meerkat_runtime::store::RuntimeStoreError> {
        self.inner
            .commit_machine_lifecycle(runtime_id, commit, input_states)
            .await
    }

    /// Forwards to the wrapped store.
    async fn persist_ops_lifecycle(
        &self,
        runtime_id: &meerkat_runtime::LogicalRuntimeId,
        snapshot: &meerkat_runtime::ops_lifecycle::PersistedOpsSnapshot,
    ) -> Result<(), meerkat_runtime::store::RuntimeStoreError> {
        self.inner.persist_ops_lifecycle(runtime_id, snapshot).await
    }

    /// Forwards to the wrapped store.
    async fn load_ops_lifecycle(
        &self,
        runtime_id: &meerkat_runtime::LogicalRuntimeId,
    ) -> Result<
        Option<meerkat_runtime::ops_lifecycle::PersistedOpsSnapshot>,
        meerkat_runtime::store::RuntimeStoreError,
    > {
        self.inner.load_ops_lifecycle(runtime_id).await
    }
}
1078
/// Test-only, lazily initialised global list of recorded runtime turn traces.
#[cfg(test)]
static RUNTIME_TURN_TRACES: OnceLock<Mutex<Vec<RuntimeTurnTrace>>> = OnceLock::new();
1081
/// Returns the global trace list, creating it empty on first use.
#[cfg(test)]
fn runtime_turn_traces() -> &'static Mutex<Vec<RuntimeTurnTrace>> {
    RUNTIME_TURN_TRACES.get_or_init(Mutex::default)
}
1086
/// Appends `trace` to the global test trace list.
///
/// A poisoned mutex is recovered rather than panicking, matching how the other locks
/// in this file are handled, so one panicking test cannot cascade into every later
/// test that records a trace.
#[cfg(test)]
fn record_runtime_turn_trace(trace: RuntimeTurnTrace) {
    runtime_turn_traces()
        .lock()
        .unwrap_or_else(std::sync::PoisonError::into_inner)
        .push(trace);
}
1095
/// Drains the shared test-only trace buffer, returning everything recorded so far and
/// leaving an empty buffer behind.
///
/// # Panics
/// Panics if the buffer mutex has been poisoned.
#[cfg(test)]
#[allow(dead_code)]
#[allow(clippy::expect_used)]
pub(crate) fn take_runtime_turn_traces() -> Vec<RuntimeTurnTrace> {
    let mut traces = runtime_turn_traces()
        .lock()
        .expect("runtime turn traces mutex");
    std::mem::take(&mut *traces)
}
1106
// Non-test stand-in that shares its name with the test-only recorder: it accepts a unit
// value and does nothing. `dead_code` is allowed because nothing in SOURCE calls it outside
// `#[cfg(test)]` sections.
#[cfg(not(test))]
#[allow(dead_code)]
fn record_runtime_turn_trace(_trace: ()) {}
1110
/// Reports whether runtime turn diagnostics are switched on.
///
/// The switch is the mere presence of the `MOBKIT_TRACE_RUNTIME_TURNS` environment
/// variable; its value is never inspected, so any value (even `0`) enables diagnostics.
fn runtime_turn_diagnostics_enabled() -> bool {
    let switch = std::env::var_os("MOBKIT_TRACE_RUNTIME_TURNS");
    switch.is_some()
}
1114
1115fn summarize_runtime_prompt(prompt: &meerkat_core::ContentInput) -> String {
1116 match prompt {
1117 meerkat_core::ContentInput::Text(text) => {
1118 text.lines().take(6).collect::<Vec<_>>().join(" ")
1119 }
1120 meerkat_core::ContentInput::Blocks(blocks) => blocks
1121 .iter()
1122 .map(|block| block.text_projection().to_string())
1123 .collect::<Vec<_>>()
1124 .join(" ")
1125 .lines()
1126 .take(6)
1127 .collect::<Vec<_>>()
1128 .join(" "),
1129 }
1130}
1131
1132pub fn mob_definition_may_use_image_generation(definition: &MobDefinition) -> bool {
1138 definition.profiles.values().any(|binding| {
1139 binding
1140 .as_inline()
1141 .is_none_or(|profile| profile.tools.image_generation)
1142 })
1143}
1144
1145fn normalize_runtime_turn_request(
1146 mut req: meerkat_core::service::StartTurnRequest,
1147) -> meerkat_core::service::StartTurnRequest {
1148 req.runtime.handling_mode = meerkat_core::types::HandlingMode::Queue;
1154 req.runtime.render_metadata = None;
1155 req
1156}
1157
1158macro_rules! delegate_mob_session_service {
1161 ($wrapper:ty) => {
1162 #[async_trait]
1163 impl meerkat_core::service::SessionService for $wrapper {
1164 async fn create_session(
1165 &self,
1166 mut req: CreateSessionRequest,
1167 ) -> Result<meerkat_core::types::RunResult, SessionError> {
1168 (self.hook)(&mut req).await?;
1169 sanitize_create_session_request_llm_override(&mut req);
1170
1171 let ctx = SessionCreatedContext {
1173 model: req.model.clone(),
1174 labels: req.labels.clone().unwrap_or_default(),
1175 system_prompt: req.system_prompt.clone(),
1176 };
1177 let result = self.inner.create_session(req).await?;
1178
1179 if let Some(ref after_hook) = self.after_create_hook {
1181 after_hook(result.session_id.clone(), ctx).await;
1182 }
1183
1184 Ok(result)
1185 }
1186 async fn start_turn(
1187 &self,
1188 id: &meerkat_core::types::SessionId,
1189 req: meerkat_core::service::StartTurnRequest,
1190 ) -> Result<meerkat_core::types::RunResult, SessionError> {
1191 self.inner.start_turn(id, req).await
1192 }
1193 async fn interrupt(
1194 &self,
1195 id: &meerkat_core::types::SessionId,
1196 ) -> Result<(), SessionError> {
1197 self.inner.interrupt(id).await
1198 }
1199 async fn cancel_after_boundary(
1200 &self,
1201 id: &meerkat_core::types::SessionId,
1202 ) -> Result<(), SessionError> {
1203 self.inner.cancel_after_boundary(id).await
1204 }
1205 async fn set_session_client(
1206 &self,
1207 id: &meerkat_core::types::SessionId,
1208 client: Arc<dyn meerkat_core::AgentLlmClient>,
1209 ) -> Result<(), SessionError> {
1210 self.inner
1211 .set_session_client(id, ReplaySanitizingAgentLlmClient::wrap(client))
1212 .await
1213 }
1214 async fn hot_swap_session_llm_identity(
1215 &self,
1216 id: &meerkat_core::types::SessionId,
1217 client: Arc<dyn meerkat_core::AgentLlmClient>,
1218 identity: meerkat_core::session::SessionLlmIdentity,
1219 request_policy: meerkat_core::SessionLlmRequestPolicy,
1220 ) -> Result<(), SessionError> {
1221 self.inner
1222 .hot_swap_session_llm_identity(
1223 id,
1224 ReplaySanitizingAgentLlmClient::wrap(client),
1225 identity,
1226 request_policy,
1227 )
1228 .await
1229 }
1230 async fn update_session_keep_alive(
1231 &self,
1232 id: &meerkat_core::types::SessionId,
1233 keep_alive: bool,
1234 ) -> Result<(), SessionError> {
1235 self.inner.update_session_keep_alive(id, keep_alive).await
1236 }
1237 async fn update_session_mob_authority_context(
1238 &self,
1239 id: &meerkat_core::types::SessionId,
1240 authority_context: Option<meerkat_core::service::MobToolAuthorityContext>,
1241 ) -> Result<(), SessionError> {
1242 self.inner
1243 .update_session_mob_authority_context(id, authority_context)
1244 .await
1245 }
1246 async fn has_live_session(
1247 &self,
1248 id: &meerkat_core::types::SessionId,
1249 ) -> Result<bool, SessionError> {
1250 self.inner.has_live_session(id).await
1251 }
1252 async fn set_session_tool_visibility_state(
1253 &self,
1254 id: &meerkat_core::types::SessionId,
1255 state: Option<meerkat_core::SessionToolVisibilityState>,
1256 ) -> Result<(), SessionError> {
1257 self.inner
1258 .set_session_tool_visibility_state(id, state)
1259 .await
1260 }
1261 async fn set_session_tool_filter(
1262 &self,
1263 id: &meerkat_core::types::SessionId,
1264 filter: meerkat_core::ToolFilter,
1265 ) -> Result<(), SessionError> {
1266 self.inner.set_session_tool_filter(id, filter).await
1267 }
1268 async fn read(
1269 &self,
1270 id: &meerkat_core::types::SessionId,
1271 ) -> Result<meerkat_core::service::SessionView, SessionError> {
1272 self.inner.read(id).await
1273 }
1274 async fn list(
1275 &self,
1276 query: meerkat_core::service::SessionQuery,
1277 ) -> Result<Vec<meerkat_core::service::SessionSummary>, SessionError> {
1278 self.inner.list(query).await
1279 }
1280 async fn archive(
1281 &self,
1282 id: &meerkat_core::types::SessionId,
1283 ) -> Result<(), SessionError> {
1284 self.inner.archive(id).await
1285 }
1286 async fn subscribe_session_events(
1287 &self,
1288 id: &meerkat_core::types::SessionId,
1289 ) -> Result<meerkat_core::comms::EventStream, meerkat_core::comms::StreamError> {
1290 meerkat_core::service::SessionService::subscribe_session_events(
1291 self.inner.as_ref(),
1292 id,
1293 )
1294 .await
1295 }
1296 }
1297
1298 #[async_trait]
1299 impl meerkat_core::service::SessionServiceCommsExt for $wrapper {
1300 async fn comms_runtime(
1301 &self,
1302 id: &meerkat_core::types::SessionId,
1303 ) -> Option<Arc<dyn meerkat_core::agent::CommsRuntime>> {
1304 self.inner.comms_runtime(id).await
1305 }
1306
1307 async fn event_injector(
1308 &self,
1309 id: &meerkat_core::types::SessionId,
1310 ) -> Option<Arc<dyn meerkat_core::EventInjector>> {
1311 self.inner.event_injector(id).await
1312 }
1313
1314 async fn interaction_event_injector(
1315 &self,
1316 id: &meerkat_core::types::SessionId,
1317 ) -> Option<Arc<dyn meerkat_core::event_injector::SubscribableInjector>> {
1318 self.inner.interaction_event_injector(id).await
1319 }
1320 }
1321
1322 #[async_trait]
1323 impl meerkat_core::service::SessionServiceControlExt for $wrapper {
1324 async fn append_system_context(
1325 &self,
1326 id: &meerkat_core::types::SessionId,
1327 req: meerkat_core::service::AppendSystemContextRequest,
1328 ) -> Result<
1329 meerkat_core::service::AppendSystemContextResult,
1330 meerkat_core::service::SessionControlError,
1331 > {
1332 self.inner.append_system_context(id, req).await
1333 }
1334 async fn stage_tool_results(
1335 &self,
1336 id: &meerkat_core::types::SessionId,
1337 req: meerkat_core::service::StageToolResultsRequest,
1338 ) -> Result<meerkat_core::service::StageToolResultsResult, SessionError> {
1339 self.inner.stage_tool_results(id, req).await
1340 }
1341 }
1342
1343 #[async_trait]
1344 impl meerkat_core::service::SessionServiceHistoryExt for $wrapper {
1345 async fn read_history(
1346 &self,
1347 id: &meerkat_core::types::SessionId,
1348 query: meerkat_core::service::SessionHistoryQuery,
1349 ) -> Result<meerkat_core::service::SessionHistoryPage, SessionError> {
1350 self.inner.read_history(id, query).await
1351 }
1352 }
1353
1354 #[async_trait]
1355 impl MobSessionService for $wrapper {
1356 fn supports_persistent_sessions(&self) -> bool {
1357 self.inner.supports_persistent_sessions()
1358 }
1359 fn runtime_adapter(&self) -> Option<Arc<meerkat_runtime::MeerkatMachine>> {
1360 self.runtime_adapter_override
1361 .clone()
1362 .or_else(|| self.inner.runtime_adapter())
1363 }
1364 async fn interrupt_with_machine_authority(
1365 &self,
1366 session_id: &meerkat_core::types::SessionId,
1367 authority: meerkat_runtime::MachineSessionControlAuthority,
1368 ) -> Result<(), SessionError> {
1369 self.inner
1370 .interrupt_with_machine_authority(session_id, authority)
1371 .await
1372 }
1373 async fn cancel_after_boundary_with_machine_authority(
1374 &self,
1375 session_id: &meerkat_core::types::SessionId,
1376 authority: meerkat_runtime::MachineSessionControlAuthority,
1377 ) -> Result<(), SessionError> {
1378 self.inner
1379 .cancel_after_boundary_with_machine_authority(session_id, authority)
1380 .await
1381 }
1382 async fn session_belongs_to_mob(
1383 &self,
1384 session_id: &meerkat_core::types::SessionId,
1385 mob_id: &meerkat_mob::MobId,
1386 ) -> bool {
1387 self.inner.session_belongs_to_mob(session_id, mob_id).await
1388 }
1389 async fn load_persisted_session(
1390 &self,
1391 session_id: &meerkat_core::types::SessionId,
1392 ) -> Result<Option<meerkat_core::session::Session>, SessionError> {
1393 self.inner.load_persisted_session(session_id).await
1394 }
1395 async fn subscribe_session_events(
1396 &self,
1397 session_id: &meerkat_core::types::SessionId,
1398 ) -> Result<meerkat_core::comms::EventStream, meerkat_core::comms::StreamError> {
1399 meerkat_mob::MobSessionService::subscribe_session_events(
1400 self.inner.as_ref(),
1401 session_id,
1402 )
1403 .await
1404 }
1405 async fn archive_with_mob_lifecycle_authority(
1406 &self,
1407 session_id: &meerkat_core::types::SessionId,
1408 ) -> Result<(), SessionError> {
1409 self.inner
1410 .archive_with_mob_lifecycle_authority(session_id)
1411 .await
1412 }
1413 async fn execution_snapshot(
1414 &self,
1415 session_id: &meerkat_core::types::SessionId,
1416 ) -> Result<Option<meerkat_core::agent::AgentExecutionSnapshot>, SessionError> {
1417 self.inner.execution_snapshot(session_id).await
1418 }
1419 async fn tool_scope_snapshot(
1420 &self,
1421 session_id: &meerkat_core::types::SessionId,
1422 ) -> Result<Option<meerkat_core::ToolScopeSnapshot>, SessionError> {
1423 self.inner.tool_scope_snapshot(session_id).await
1424 }
1425 async fn external_tool_surface_snapshot(
1426 &self,
1427 session_id: &meerkat_core::types::SessionId,
1428 ) -> Result<Option<meerkat_core::ExternalToolSurfaceSnapshot>, SessionError> {
1429 self.inner.external_tool_surface_snapshot(session_id).await
1430 }
1431 async fn peer_ingress_runtime_snapshot(
1432 &self,
1433 session_id: &meerkat_core::types::SessionId,
1434 ) -> Result<Option<meerkat_core::PeerIngressRuntimeSnapshot>, SessionError> {
1435 self.inner.peer_ingress_runtime_snapshot(session_id).await
1436 }
1437 async fn apply_runtime_turn(
1438 &self,
1439 session_id: &meerkat_core::types::SessionId,
1440 run_id: meerkat_core::lifecycle::RunId,
1441 req: meerkat_core::service::StartTurnRequest,
1442 boundary: meerkat_core::lifecycle::run_primitive::RunApplyBoundary,
1443 contributing_input_ids: Vec<meerkat_core::lifecycle::InputId>,
1444 ) -> Result<meerkat_core::lifecycle::core_executor::CoreApplyOutput, SessionError> {
1445 #[cfg(test)]
1446 let boundary_name = format!("{boundary:?}");
1447 #[cfg(test)]
1448 let contributing_count = contributing_input_ids.len();
1449 let run_id_for_log = run_id.to_string();
1450 let prompt_summary = if runtime_turn_diagnostics_enabled() {
1451 Some(summarize_runtime_prompt(&req.prompt))
1452 } else {
1453 None
1454 };
1455 if let Some(summary) = prompt_summary.as_ref() {
1456 tracing::warn!(
1457 session_id = %session_id,
1458 run_id = %run_id_for_log,
1459 boundary = ?boundary,
1460 contributing_inputs = contributing_input_ids.len(),
1461 prompt = %summary,
1462 runtime = ?req.runtime,
1463 "mobkit runtime turn start"
1464 );
1465 }
1466 let result = self
1467 .inner
1468 .apply_runtime_turn(
1469 session_id,
1470 run_id,
1471 normalize_runtime_turn_request(req),
1472 boundary,
1473 contributing_input_ids,
1474 )
1475 .await;
1476 #[cfg(test)]
1477 record_runtime_turn_trace(RuntimeTurnTrace {
1478 session_id: session_id.to_string(),
1479 boundary: boundary_name,
1480 contributing_input_count: contributing_count,
1481 outcome: match &result {
1482 Ok(_) => "ok".to_string(),
1483 Err(error) => format!("err:{error}"),
1484 },
1485 });
1486 if runtime_turn_diagnostics_enabled() {
1487 match &result {
1488 Ok(_) => tracing::warn!(
1489 session_id = %session_id,
1490 run_id = %run_id_for_log,
1491 "mobkit runtime turn ok"
1492 ),
1493 Err(error) => tracing::error!(
1494 session_id = %session_id,
1495 run_id = %run_id_for_log,
1496 error = %error,
1497 error_debug = ?error,
1498 "mobkit runtime turn error"
1499 ),
1500 }
1501 }
1502 result
1503 }
1504 async fn apply_runtime_context_appends(
1505 &self,
1506 session_id: &meerkat_core::types::SessionId,
1507 run_id: meerkat_core::lifecycle::RunId,
1508 appends: Vec<meerkat_core::session::PendingSystemContextAppend>,
1509 contributing_input_ids: Vec<meerkat_core::lifecycle::InputId>,
1510 ) -> Result<meerkat_core::lifecycle::core_executor::CoreApplyOutput, SessionError> {
1511 self.inner
1512 .apply_runtime_context_appends(
1513 session_id,
1514 run_id,
1515 appends,
1516 contributing_input_ids,
1517 )
1518 .await
1519 }
1520 async fn apply_runtime_context_appends_with_boundary(
1521 &self,
1522 session_id: &meerkat_core::types::SessionId,
1523 run_id: meerkat_core::lifecycle::RunId,
1524 appends: Vec<meerkat_core::session::PendingSystemContextAppend>,
1525 boundary: meerkat_core::lifecycle::run_primitive::RunApplyBoundary,
1526 contributing_input_ids: Vec<meerkat_core::lifecycle::InputId>,
1527 ) -> Result<meerkat_core::lifecycle::core_executor::CoreApplyOutput, SessionError> {
1528 self.inner
1529 .apply_runtime_context_appends_with_boundary(
1530 session_id,
1531 run_id,
1532 appends,
1533 boundary,
1534 contributing_input_ids,
1535 )
1536 .await
1537 }
1538 async fn apply_runtime_system_context_for_turn(
1539 &self,
1540 session_id: &meerkat_core::types::SessionId,
1541 appends: Vec<meerkat_core::session::PendingSystemContextAppend>,
1542 ) -> Result<(), SessionError> {
1543 self.inner
1544 .apply_runtime_system_context_for_turn(session_id, appends)
1545 .await
1546 }
1547 async fn stage_runtime_system_context_for_active_turn(
1548 &self,
1549 session_id: &meerkat_core::types::SessionId,
1550 expected_run_id: &meerkat_core::lifecycle::RunId,
1551 appends: Vec<meerkat_core::session::PendingSystemContextAppend>,
1552 ) -> Result<Option<Vec<u8>>, SessionError> {
1553 self.inner
1554 .stage_runtime_system_context_for_active_turn(
1555 session_id,
1556 expected_run_id,
1557 appends,
1558 )
1559 .await
1560 }
1561 async fn discard_runtime_system_context_for_active_turn(
1562 &self,
1563 session_id: &meerkat_core::types::SessionId,
1564 expected_run_id: &meerkat_core::lifecycle::RunId,
1565 idempotency_keys: Vec<String>,
1566 ) -> Result<(), SessionError> {
1567 self.inner
1568 .discard_runtime_system_context_for_active_turn(
1569 session_id,
1570 expected_run_id,
1571 idempotency_keys,
1572 )
1573 .await
1574 }
1575 async fn active_turn_system_context_boundary_available(
1576 &self,
1577 session_id: &meerkat_core::types::SessionId,
1578 ) -> Result<Option<bool>, SessionError> {
1579 self.inner
1580 .active_turn_system_context_boundary_available(session_id)
1581 .await
1582 }
1583 async fn discard_live_session(
1584 &self,
1585 session_id: &meerkat_core::types::SessionId,
1586 ) -> Result<(), SessionError> {
1587 self.inner.discard_live_session(session_id).await
1588 }
1589 async fn checkpoint_committed_runtime_session_snapshot(
1590 &self,
1591 session_id: &meerkat_core::types::SessionId,
1592 session_snapshot: &[u8],
1593 ) -> Result<(), SessionError> {
1594 self.inner
1595 .checkpoint_committed_runtime_session_snapshot(session_id, session_snapshot)
1596 .await
1597 }
1598 async fn cancel_all_checkpointers(&self) {
1599 self.inner.cancel_all_checkpointers().await;
1600 }
1601 async fn rearm_all_checkpointers(&self) {
1602 self.inner.rearm_all_checkpointers().await;
1603 }
1604 }
1605 };
1606}
1607
1608delegate_mob_session_service!(PreBuildMobSessionService);
1609
/// Session-service wrapper that forwards to `inner` and invokes `after_hook` once a
/// session has been created successfully (see its `create_session` implementation).
struct AfterCreateMobSessionService {
    /// Wrapped service that receives every call.
    inner: Arc<dyn MobSessionService>,
    /// Callback awaited with the new session id and a `SessionCreatedContext` after
    /// `inner.create_session` succeeds.
    after_hook: AfterCreateHook,
}
1618
// `SessionService` for the after-create wrapper. Only `create_session` adds behavior;
// the two LLM-client setters wrap the client; everything else delegates to `self.inner`.
#[async_trait]
impl meerkat_core::service::SessionService for AfterCreateMobSessionService {
    /// Creates the session through the inner service, then awaits `after_hook`.
    ///
    /// The hook is skipped when the inner call fails (the `?` returns early). Its
    /// context is captured after sanitization and before `req` is moved into the call.
    async fn create_session(
        &self,
        mut req: CreateSessionRequest,
    ) -> Result<meerkat_core::types::RunResult, SessionError> {
        sanitize_create_session_request_llm_override(&mut req);
        // Snapshot the fields the hook needs while `req` is still owned here.
        let ctx = SessionCreatedContext {
            model: req.model.clone(),
            labels: req.labels.clone().unwrap_or_default(),
            system_prompt: req.system_prompt.clone(),
        };
        let result = self.inner.create_session(req).await?;
        (self.after_hook)(result.session_id.clone(), ctx).await;
        Ok(result)
    }
    async fn start_turn(
        &self,
        id: &meerkat_core::types::SessionId,
        req: meerkat_core::service::StartTurnRequest,
    ) -> Result<meerkat_core::types::RunResult, SessionError> {
        self.inner.start_turn(id, req).await
    }
    async fn interrupt(&self, id: &meerkat_core::types::SessionId) -> Result<(), SessionError> {
        self.inner.interrupt(id).await
    }
    async fn cancel_after_boundary(
        &self,
        id: &meerkat_core::types::SessionId,
    ) -> Result<(), SessionError> {
        self.inner.cancel_after_boundary(id).await
    }
    async fn set_session_client(
        &self,
        id: &meerkat_core::types::SessionId,
        client: Arc<dyn meerkat_core::AgentLlmClient>,
    ) -> Result<(), SessionError> {
        // The client is wrapped in `ReplaySanitizingAgentLlmClient` before forwarding.
        self.inner
            .set_session_client(id, ReplaySanitizingAgentLlmClient::wrap(client))
            .await
    }
    async fn hot_swap_session_llm_identity(
        &self,
        id: &meerkat_core::types::SessionId,
        client: Arc<dyn meerkat_core::AgentLlmClient>,
        identity: meerkat_core::session::SessionLlmIdentity,
        request_policy: meerkat_core::SessionLlmRequestPolicy,
    ) -> Result<(), SessionError> {
        // Same client wrapping as `set_session_client`.
        self.inner
            .hot_swap_session_llm_identity(
                id,
                ReplaySanitizingAgentLlmClient::wrap(client),
                identity,
                request_policy,
            )
            .await
    }
    async fn update_session_keep_alive(
        &self,
        id: &meerkat_core::types::SessionId,
        keep_alive: bool,
    ) -> Result<(), SessionError> {
        self.inner.update_session_keep_alive(id, keep_alive).await
    }
    async fn update_session_mob_authority_context(
        &self,
        id: &meerkat_core::types::SessionId,
        authority_context: Option<meerkat_core::service::MobToolAuthorityContext>,
    ) -> Result<(), SessionError> {
        self.inner
            .update_session_mob_authority_context(id, authority_context)
            .await
    }
    async fn has_live_session(
        &self,
        id: &meerkat_core::types::SessionId,
    ) -> Result<bool, SessionError> {
        self.inner.has_live_session(id).await
    }
    async fn set_session_tool_visibility_state(
        &self,
        id: &meerkat_core::types::SessionId,
        state: Option<meerkat_core::SessionToolVisibilityState>,
    ) -> Result<(), SessionError> {
        self.inner
            .set_session_tool_visibility_state(id, state)
            .await
    }
    async fn set_session_tool_filter(
        &self,
        id: &meerkat_core::types::SessionId,
        filter: meerkat_core::ToolFilter,
    ) -> Result<(), SessionError> {
        self.inner.set_session_tool_filter(id, filter).await
    }
    async fn read(
        &self,
        id: &meerkat_core::types::SessionId,
    ) -> Result<meerkat_core::service::SessionView, SessionError> {
        self.inner.read(id).await
    }
    async fn list(
        &self,
        query: meerkat_core::service::SessionQuery,
    ) -> Result<Vec<meerkat_core::service::SessionSummary>, SessionError> {
        self.inner.list(query).await
    }
    async fn archive(&self, id: &meerkat_core::types::SessionId) -> Result<(), SessionError> {
        self.inner.archive(id).await
    }
    async fn subscribe_session_events(
        &self,
        id: &meerkat_core::types::SessionId,
    ) -> Result<meerkat_core::comms::EventStream, meerkat_core::comms::StreamError> {
        // Fully qualified because `MobSessionService` has a method with the same name.
        meerkat_core::service::SessionService::subscribe_session_events(self.inner.as_ref(), id)
            .await
    }
}
1746
// Comms extension for the after-create wrapper: every accessor returns whatever the
// inner service returns for the given session id.
#[async_trait]
impl meerkat_core::service::SessionServiceCommsExt for AfterCreateMobSessionService {
    async fn comms_runtime(
        &self,
        id: &meerkat_core::types::SessionId,
    ) -> Option<Arc<dyn meerkat_core::agent::CommsRuntime>> {
        self.inner.comms_runtime(id).await
    }

    async fn event_injector(
        &self,
        id: &meerkat_core::types::SessionId,
    ) -> Option<Arc<dyn meerkat_core::EventInjector>> {
        self.inner.event_injector(id).await
    }

    async fn interaction_event_injector(
        &self,
        id: &meerkat_core::types::SessionId,
    ) -> Option<Arc<dyn meerkat_core::event_injector::SubscribableInjector>> {
        self.inner.interaction_event_injector(id).await
    }
}
1770
// Control extension for the after-create wrapper: both operations are forwarded to the
// inner service with their arguments and results untouched.
#[async_trait]
impl meerkat_core::service::SessionServiceControlExt for AfterCreateMobSessionService {
    async fn append_system_context(
        &self,
        id: &meerkat_core::types::SessionId,
        req: meerkat_core::service::AppendSystemContextRequest,
    ) -> Result<
        meerkat_core::service::AppendSystemContextResult,
        meerkat_core::service::SessionControlError,
    > {
        self.inner.append_system_context(id, req).await
    }
    async fn stage_tool_results(
        &self,
        id: &meerkat_core::types::SessionId,
        req: meerkat_core::service::StageToolResultsRequest,
    ) -> Result<meerkat_core::service::StageToolResultsResult, SessionError> {
        self.inner.stage_tool_results(id, req).await
    }
}
1791
// History extension for the after-create wrapper: history reads go straight to the
// inner service.
#[async_trait]
impl meerkat_core::service::SessionServiceHistoryExt for AfterCreateMobSessionService {
    async fn read_history(
        &self,
        id: &meerkat_core::types::SessionId,
        query: meerkat_core::service::SessionHistoryQuery,
    ) -> Result<meerkat_core::service::SessionHistoryPage, SessionError> {
        self.inner.read_history(id, query).await
    }
}
1802
// `MobSessionService` for the after-create wrapper. Every method forwards to
// `self.inner` and returns its result unchanged; this impl adds no behavior of its own.
#[async_trait]
impl MobSessionService for AfterCreateMobSessionService {
    fn supports_persistent_sessions(&self) -> bool {
        self.inner.supports_persistent_sessions()
    }
    fn runtime_adapter(&self) -> Option<Arc<meerkat_runtime::MeerkatMachine>> {
        // This wrapper holds no adapter override; the inner service's adapter is used.
        self.inner.runtime_adapter()
    }
    async fn interrupt_with_machine_authority(
        &self,
        session_id: &meerkat_core::types::SessionId,
        authority: meerkat_runtime::MachineSessionControlAuthority,
    ) -> Result<(), SessionError> {
        self.inner
            .interrupt_with_machine_authority(session_id, authority)
            .await
    }
    async fn cancel_after_boundary_with_machine_authority(
        &self,
        session_id: &meerkat_core::types::SessionId,
        authority: meerkat_runtime::MachineSessionControlAuthority,
    ) -> Result<(), SessionError> {
        self.inner
            .cancel_after_boundary_with_machine_authority(session_id, authority)
            .await
    }
    async fn session_belongs_to_mob(
        &self,
        session_id: &meerkat_core::types::SessionId,
        mob_id: &meerkat_mob::MobId,
    ) -> bool {
        self.inner.session_belongs_to_mob(session_id, mob_id).await
    }
    async fn load_persisted_session(
        &self,
        session_id: &meerkat_core::types::SessionId,
    ) -> Result<Option<meerkat_core::session::Session>, SessionError> {
        self.inner.load_persisted_session(session_id).await
    }
    async fn subscribe_session_events(
        &self,
        session_id: &meerkat_core::types::SessionId,
    ) -> Result<meerkat_core::comms::EventStream, meerkat_core::comms::StreamError> {
        // Fully qualified because `SessionService` has a method with the same name.
        meerkat_mob::MobSessionService::subscribe_session_events(self.inner.as_ref(), session_id)
            .await
    }
    async fn archive_with_mob_lifecycle_authority(
        &self,
        session_id: &meerkat_core::types::SessionId,
    ) -> Result<(), SessionError> {
        self.inner
            .archive_with_mob_lifecycle_authority(session_id)
            .await
    }
    async fn execution_snapshot(
        &self,
        session_id: &meerkat_core::types::SessionId,
    ) -> Result<Option<meerkat_core::agent::AgentExecutionSnapshot>, SessionError> {
        self.inner.execution_snapshot(session_id).await
    }
    async fn tool_scope_snapshot(
        &self,
        session_id: &meerkat_core::types::SessionId,
    ) -> Result<Option<meerkat_core::ToolScopeSnapshot>, SessionError> {
        self.inner.tool_scope_snapshot(session_id).await
    }
    async fn external_tool_surface_snapshot(
        &self,
        session_id: &meerkat_core::types::SessionId,
    ) -> Result<Option<meerkat_core::ExternalToolSurfaceSnapshot>, SessionError> {
        self.inner.external_tool_surface_snapshot(session_id).await
    }
    async fn peer_ingress_runtime_snapshot(
        &self,
        session_id: &meerkat_core::types::SessionId,
    ) -> Result<Option<meerkat_core::PeerIngressRuntimeSnapshot>, SessionError> {
        self.inner.peer_ingress_runtime_snapshot(session_id).await
    }
    async fn apply_runtime_turn(
        &self,
        session_id: &meerkat_core::types::SessionId,
        run_id: meerkat_core::lifecycle::RunId,
        req: meerkat_core::service::StartTurnRequest,
        boundary: meerkat_core::lifecycle::run_primitive::RunApplyBoundary,
        contributing_input_ids: Vec<meerkat_core::lifecycle::InputId>,
    ) -> Result<meerkat_core::lifecycle::core_executor::CoreApplyOutput, SessionError> {
        // `req` is forwarded as received: unlike the `delegate_mob_session_service!`
        // wrappers, this impl does not call `normalize_runtime_turn_request`.
        // NOTE(review): presumably relies on the wrapped service normalizing — confirm.
        self.inner
            .apply_runtime_turn(session_id, run_id, req, boundary, contributing_input_ids)
            .await
    }
    async fn apply_runtime_context_appends(
        &self,
        session_id: &meerkat_core::types::SessionId,
        run_id: meerkat_core::lifecycle::RunId,
        appends: Vec<meerkat_core::session::PendingSystemContextAppend>,
        contributing_input_ids: Vec<meerkat_core::lifecycle::InputId>,
    ) -> Result<meerkat_core::lifecycle::core_executor::CoreApplyOutput, SessionError> {
        self.inner
            .apply_runtime_context_appends(session_id, run_id, appends, contributing_input_ids)
            .await
    }
    async fn apply_runtime_context_appends_with_boundary(
        &self,
        session_id: &meerkat_core::types::SessionId,
        run_id: meerkat_core::lifecycle::RunId,
        appends: Vec<meerkat_core::session::PendingSystemContextAppend>,
        boundary: meerkat_core::lifecycle::run_primitive::RunApplyBoundary,
        contributing_input_ids: Vec<meerkat_core::lifecycle::InputId>,
    ) -> Result<meerkat_core::lifecycle::core_executor::CoreApplyOutput, SessionError> {
        self.inner
            .apply_runtime_context_appends_with_boundary(
                session_id,
                run_id,
                appends,
                boundary,
                contributing_input_ids,
            )
            .await
    }
    async fn apply_runtime_system_context_for_turn(
        &self,
        session_id: &meerkat_core::types::SessionId,
        appends: Vec<meerkat_core::session::PendingSystemContextAppend>,
    ) -> Result<(), SessionError> {
        self.inner
            .apply_runtime_system_context_for_turn(session_id, appends)
            .await
    }
    async fn stage_runtime_system_context_for_active_turn(
        &self,
        session_id: &meerkat_core::types::SessionId,
        expected_run_id: &meerkat_core::lifecycle::RunId,
        appends: Vec<meerkat_core::session::PendingSystemContextAppend>,
    ) -> Result<Option<Vec<u8>>, SessionError> {
        self.inner
            .stage_runtime_system_context_for_active_turn(session_id, expected_run_id, appends)
            .await
    }
    async fn discard_runtime_system_context_for_active_turn(
        &self,
        session_id: &meerkat_core::types::SessionId,
        expected_run_id: &meerkat_core::lifecycle::RunId,
        idempotency_keys: Vec<String>,
    ) -> Result<(), SessionError> {
        self.inner
            .discard_runtime_system_context_for_active_turn(
                session_id,
                expected_run_id,
                idempotency_keys,
            )
            .await
    }
    async fn active_turn_system_context_boundary_available(
        &self,
        session_id: &meerkat_core::types::SessionId,
    ) -> Result<Option<bool>, SessionError> {
        self.inner
            .active_turn_system_context_boundary_available(session_id)
            .await
    }
    async fn discard_live_session(
        &self,
        session_id: &meerkat_core::types::SessionId,
    ) -> Result<(), SessionError> {
        self.inner.discard_live_session(session_id).await
    }
    async fn checkpoint_committed_runtime_session_snapshot(
        &self,
        session_id: &meerkat_core::types::SessionId,
        session_snapshot: &[u8],
    ) -> Result<(), SessionError> {
        self.inner
            .checkpoint_committed_runtime_session_snapshot(session_id, session_snapshot)
            .await
    }
    async fn cancel_all_checkpointers(&self) {
        self.inner.cancel_all_checkpointers().await;
    }
    async fn rearm_all_checkpointers(&self) {
        self.inner.rearm_all_checkpointers().await;
    }
}
1985
/// Bundle of everything carried by a mob bootstrap request: the definition, its storage,
/// the session service to use, and optional extras.
pub struct MobBootstrapSpec {
    /// The mob definition.
    pub definition: MobDefinition,
    /// Storage handed over together with the definition.
    pub storage: MobStorage,
    /// Session service; the constructors and `with_*` builders in this file always store
    /// a wrapper around the service supplied by the caller.
    pub session_service: Arc<dyn MobSessionService>,
    /// Optional binary blob store.
    pub binary_blob_store: Option<Arc<dyn BinaryBlobStore>>,
    /// Optional MCP state (crate-internal).
    pub(crate) agent_mob_mcp_state: Option<Arc<meerkat_mob_mcp::MobMcpState>>,
    /// Optional overrides for implicit delegate retirement (crate-internal).
    pub(crate) implicit_delegate_retirement_overrides: Option<ImplicitDelegateRetirementOverrides>,
    /// Optional shared slot for a default LLM client (crate-internal).
    pub(crate) agent_mob_default_llm_client_slot: Option<SharedDefaultLlmClientSlot>,
    /// Bootstrap options.
    pub options: MobBootstrapOptions,
    /// Optional runtime machine adapter.
    pub runtime_adapter: Option<Arc<meerkat_runtime::MeerkatMachine>>,
    /// NOTE(review): presumably held only to keep a temporary directory alive for as long
    /// as the spec exists — confirm against the ephemeral constructors.
    pub(crate) _ephemeral_dir: Option<Arc<tempfile::TempDir>>,
}
2006
2007impl MobBootstrapSpec {
2008 pub fn new(
2009 definition: MobDefinition,
2010 storage: MobStorage,
2011 session_service: Arc<dyn MobSessionService>,
2012 ) -> Self {
2013 let session_service = Arc::new(PreBuildMobSessionService {
2014 inner: session_service,
2015 hook: no_op_pre_build_hook(),
2016 after_create_hook: None,
2017 runtime_adapter_override: None,
2018 }) as Arc<dyn MobSessionService>;
2019 Self {
2020 definition,
2021 storage,
2022 session_service,
2023 binary_blob_store: None,
2024 agent_mob_mcp_state: None,
2025 implicit_delegate_retirement_overrides: None,
2026 agent_mob_default_llm_client_slot: None,
2027 options: MobBootstrapOptions {
2028 allow_ephemeral_sessions: true,
2029 notify_orchestrator_on_resume: true,
2030 default_llm_client: None,
2031 },
2032 runtime_adapter: None,
2033 _ephemeral_dir: None,
2034 }
2035 }
2036
2037 pub fn with_options(mut self, options: MobBootstrapOptions) -> Self {
2038 self.options = options;
2039 self
2040 }
2041
2042 pub fn with_session_runtime_adapter(
2050 mut self,
2051 adapter: Arc<meerkat_runtime::MeerkatMachine>,
2052 ) -> Self {
2053 self.session_service = Arc::new(PreBuildMobSessionService {
2054 inner: self.session_service,
2055 hook: no_op_pre_build_hook(),
2056 after_create_hook: None,
2057 runtime_adapter_override: Some(adapter),
2058 });
2059 self
2060 }
2061
2062 pub fn with_after_create_hook(mut self, hook: AfterCreateHook) -> Self {
2068 self.session_service = Arc::new(AfterCreateMobSessionService {
2069 inner: self.session_service,
2070 after_hook: hook,
2071 });
2072 self
2073 }
2074
2075 pub fn ephemeral(
2080 definition: MobDefinition,
2081 storage: MobStorage,
2082 store_path: PathBuf,
2083 max_sessions: usize,
2084 session_store: Option<Arc<dyn AgentSessionStore>>,
2085 ) -> Self {
2086 Self::ephemeral_inner(
2087 definition,
2088 storage,
2089 store_path,
2090 max_sessions,
2091 session_store,
2092 None,
2093 CapabilityFlags::default(),
2094 None,
2095 None,
2096 )
2097 }
2098
2099 pub fn ephemeral_with_hook(
2103 definition: MobDefinition,
2104 storage: MobStorage,
2105 store_path: PathBuf,
2106 max_sessions: usize,
2107 session_store: Option<Arc<dyn AgentSessionStore>>,
2108 hook: impl Fn(
2109 &mut CreateSessionRequest,
2110 ) -> std::pin::Pin<
2111 Box<dyn std::future::Future<Output = Result<(), SessionError>> + Send + '_>,
2112 > + Send
2113 + Sync
2114 + 'static,
2115 ) -> Self {
2116 Self::ephemeral_inner(
2117 definition,
2118 storage,
2119 store_path,
2120 max_sessions,
2121 session_store,
2122 Some(Arc::new(hook)),
2123 CapabilityFlags::default(),
2124 None,
2125 None,
2126 )
2127 }
2128
2129 #[allow(clippy::too_many_arguments)]
2130 pub(crate) fn ephemeral_inner(
2131 definition: MobDefinition,
2132 storage: MobStorage,
2133 store_path: PathBuf,
2134 max_sessions: usize,
2135 session_store: Option<Arc<dyn AgentSessionStore>>,
2136 hook: Option<PreBuildHook>,
2137 mut caps: CapabilityFlags,
2138 after_create_hook: Option<AfterCreateHook>,
2139 agent_config: Option<Config>,
2140 ) -> Self {
2141 caps.image_generation |= mob_definition_may_use_image_generation(&definition);
2142 let binary_blob_store: Arc<dyn BinaryBlobStore> = Arc::new(ObjectStoreBlobStore::memory());
2143 let blob_store: Arc<dyn meerkat_core::BlobStore> =
2144 Arc::new(Base64BlobStoreAdapter::new(binary_blob_store.clone()));
2145 let runtime_adapter = if caps.image_generation {
2146 let runtime_store: Arc<dyn meerkat_runtime::RuntimeStore> =
2147 Arc::new(meerkat_runtime::InMemoryRuntimeStore::new());
2148 Some(Arc::new(meerkat_runtime::MeerkatMachine::persistent(
2149 runtime_store,
2150 Arc::clone(&blob_store),
2151 )))
2152 } else {
2153 None
2154 };
2155 let mut factory = AgentFactory::new(&store_path)
2156 .builtins(caps.builtins)
2157 .shell(caps.shell)
2158 .mob(caps.mob)
2159 .comms(caps.comms)
2160 .memory(caps.memory);
2161 if let Some(machine) = runtime_adapter.clone() {
2162 factory = factory.with_image_generation_machine(machine);
2163 }
2164 let config = agent_config.unwrap_or_default();
2165 let mut builder = FactoryAgentBuilder::new(factory, config);
2166 builder.default_blob_store = Some(blob_store);
2167 if let Some(store) = session_store {
2168 builder.default_session_store = Some(store);
2169 }
2170 let mob_tools_slot = Arc::clone(&builder.default_mob_tools);
2171 let session_service: Arc<dyn MobSessionService> = Arc::new(
2172 meerkat_session::EphemeralSessionService::new(builder, max_sessions),
2173 );
2174 let hook = hook.unwrap_or_else(no_op_pre_build_hook);
2175 let after_create_hook = if let Some(runtime_adapter) = runtime_adapter.clone() {
2176 let user_after_create_hook = after_create_hook.clone();
2177 Some(Arc::new(
2178 move |session_id: meerkat_core::types::SessionId, ctx: SessionCreatedContext| {
2179 let runtime_adapter = runtime_adapter.clone();
2180 let user_after_create_hook = user_after_create_hook.clone();
2181 Box::pin(async move {
2182 runtime_adapter.register_session(session_id.clone()).await;
2183 if let Some(user_after_create_hook) = user_after_create_hook {
2184 user_after_create_hook(session_id, ctx).await;
2185 }
2186 })
2187 as std::pin::Pin<Box<dyn std::future::Future<Output = ()> + Send>>
2188 },
2189 ) as AfterCreateHook)
2190 } else {
2191 after_create_hook
2192 };
2193 let session_service = Arc::new(PreBuildMobSessionService {
2194 inner: session_service,
2195 hook,
2196 after_create_hook,
2197 runtime_adapter_override: runtime_adapter.clone(),
2198 }) as Arc<dyn MobSessionService>;
2199 let (
2200 agent_mob_mcp_state,
2201 implicit_delegate_retirement_overrides,
2202 agent_mob_default_llm_client_slot,
2203 ) = install_agent_mob_tools(&definition, mob_tools_slot, Arc::clone(&session_service));
2204 let mut spec = Self::new(definition, storage, session_service);
2205 spec.agent_mob_mcp_state = Some(agent_mob_mcp_state);
2206 spec.implicit_delegate_retirement_overrides = Some(implicit_delegate_retirement_overrides);
2207 spec.agent_mob_default_llm_client_slot = Some(agent_mob_default_llm_client_slot);
2208 spec.runtime_adapter = runtime_adapter;
2209 spec.binary_blob_store = Some(binary_blob_store);
2210 spec
2211 }
2212
2213 pub fn persistent(
2220 definition: MobDefinition,
2221 storage: MobStorage,
2222 store_path: PathBuf,
2223 max_sessions: usize,
2224 session_store: Arc<dyn SessionStore>,
2225 ) -> Self {
2226 Self::persistent_inner(
2227 definition,
2228 storage,
2229 store_path,
2230 max_sessions,
2231 session_store,
2232 None,
2233 None,
2234 CapabilityFlags::default(),
2235 None,
2236 None,
2237 )
2238 }
2239
2240 pub fn persistent_with_hook(
2244 definition: MobDefinition,
2245 storage: MobStorage,
2246 store_path: PathBuf,
2247 max_sessions: usize,
2248 session_store: Arc<dyn SessionStore>,
2249 hook: impl Fn(
2250 &mut CreateSessionRequest,
2251 ) -> std::pin::Pin<
2252 Box<dyn std::future::Future<Output = Result<(), SessionError>> + Send + '_>,
2253 > + Send
2254 + Sync
2255 + 'static,
2256 ) -> Self {
2257 Self::persistent_inner(
2258 definition,
2259 storage,
2260 store_path,
2261 max_sessions,
2262 session_store,
2263 None,
2264 Some(Arc::new(hook)),
2265 CapabilityFlags::default(),
2266 None,
2267 None,
2268 )
2269 }
2270
2271 #[allow(clippy::too_many_arguments)]
2272 pub(crate) fn persistent_inner(
2273 definition: MobDefinition,
2274 storage: MobStorage,
2275 store_path: PathBuf,
2276 max_sessions: usize,
2277 session_store: Arc<dyn SessionStore>,
2278 custom_blob_store: Option<Arc<dyn meerkat_core::BlobStore>>,
2279 hook: Option<PreBuildHook>,
2280 mut caps: CapabilityFlags,
2281 after_create_hook: Option<AfterCreateHook>,
2282 agent_config: Option<Config>,
2283 ) -> Self {
2284 caps.image_generation |= mob_definition_may_use_image_generation(&definition);
2285 let (binary_blob_store, blob_store): (
2286 Arc<dyn BinaryBlobStore>,
2287 Arc<dyn meerkat_core::BlobStore>,
2288 ) = if let Some(blob_store) = custom_blob_store {
2289 (
2290 Arc::new(BinaryBlobStoreAdapter::new(blob_store.clone())),
2291 blob_store,
2292 )
2293 } else {
2294 let binary_blob_store: Arc<dyn BinaryBlobStore> = match ObjectStoreBlobStore::local(
2295 store_path.join("blobs"),
2296 ) {
2297 Ok(store) => Arc::new(store),
2298 Err(err) => {
2299 tracing::warn!(
2300 error = %err,
2301 "failed to initialize persistent binary blob store; falling back to in-memory blobs"
2302 );
2303 Arc::new(ObjectStoreBlobStore::memory())
2304 }
2305 };
2306 let blob_store: Arc<dyn meerkat_core::BlobStore> =
2307 Arc::new(Base64BlobStoreAdapter::new(binary_blob_store.clone()));
2308 (binary_blob_store, blob_store)
2309 };
2310 let runtime_store: Arc<dyn meerkat_runtime::RuntimeStore> =
2320 build_persistent_runtime_store(&store_path);
2321 let runtime_adapter = Arc::new(meerkat_runtime::MeerkatMachine::persistent(
2322 Arc::clone(&runtime_store),
2323 Arc::clone(&blob_store),
2324 ));
2325 let mut factory = AgentFactory::new(&store_path)
2326 .builtins(caps.builtins)
2327 .shell(caps.shell)
2328 .mob(caps.mob)
2329 .comms(caps.comms)
2330 .memory(caps.memory);
2331 if caps.image_generation {
2332 factory = factory.with_image_generation_machine(runtime_adapter.clone());
2333 }
2334 let config = agent_config.unwrap_or_default();
2335 let mut builder = FactoryAgentBuilder::new(factory, config);
2336 builder.default_session_store = Some(Arc::new(StoreAdapter::new(session_store.clone())));
2337 builder.default_blob_store = Some(blob_store.clone());
2338 let mob_tools_slot = Arc::clone(&builder.default_mob_tools);
2339 let session_service: Arc<dyn MobSessionService> =
2340 Arc::new(meerkat_session::PersistentSessionService::new(
2341 builder,
2342 max_sessions,
2343 session_store,
2344 Some(runtime_store),
2345 blob_store,
2346 ));
2347 let hook = hook.unwrap_or_else(no_op_pre_build_hook);
2348 let session_service = Arc::new(PreBuildMobSessionService {
2349 inner: session_service,
2350 hook,
2351 after_create_hook,
2352 runtime_adapter_override: None,
2353 }) as Arc<dyn MobSessionService>;
2354 let (
2355 agent_mob_mcp_state,
2356 implicit_delegate_retirement_overrides,
2357 agent_mob_default_llm_client_slot,
2358 ) = install_agent_mob_tools(&definition, mob_tools_slot, Arc::clone(&session_service));
2359 let mut spec = Self::new(definition, storage, session_service);
2360 spec.agent_mob_mcp_state = Some(agent_mob_mcp_state);
2361 spec.implicit_delegate_retirement_overrides = Some(implicit_delegate_retirement_overrides);
2362 spec.agent_mob_default_llm_client_slot = Some(agent_mob_default_llm_client_slot);
2363 spec.runtime_adapter = Some(runtime_adapter);
2364 spec.binary_blob_store = Some(binary_blob_store);
2365 spec
2366 }
2367
2368 #[allow(clippy::too_many_arguments)]
2369 pub(crate) fn ephemeral_runtime_backed_inner(
2370 definition: MobDefinition,
2371 storage: MobStorage,
2372 store_path: PathBuf,
2373 max_sessions: usize,
2374 custom_session_store: Option<Arc<dyn SessionStore>>,
2375 custom_blob_store: Option<Arc<dyn meerkat_core::BlobStore>>,
2376 hook: Option<PreBuildHook>,
2377 mut caps: CapabilityFlags,
2378 after_create_hook: Option<AfterCreateHook>,
2379 agent_config: Option<Config>,
2380 ) -> Self {
2381 caps.image_generation |= mob_definition_may_use_image_generation(&definition);
2382 let config = agent_config.unwrap_or_default();
2383 let session_store: Arc<dyn SessionStore> = custom_session_store
2384 .clone()
2385 .unwrap_or_else(|| Arc::new(meerkat_store::MemoryStore::new()));
2386 let (binary_blob_store, blob_store): (
2387 Arc<dyn BinaryBlobStore>,
2388 Arc<dyn meerkat_core::BlobStore>,
2389 ) = if let Some(blob_store) = custom_blob_store {
2390 (
2391 Arc::new(BinaryBlobStoreAdapter::new(blob_store.clone())),
2392 blob_store,
2393 )
2394 } else {
2395 let binary_blob_store: Arc<dyn BinaryBlobStore> =
2396 Arc::new(ObjectStoreBlobStore::memory());
2397 let blob_store: Arc<dyn meerkat_core::BlobStore> =
2398 Arc::new(Base64BlobStoreAdapter::new(binary_blob_store.clone()));
2399 (binary_blob_store, blob_store)
2400 };
2401 let base_runtime_store: Arc<dyn meerkat_runtime::RuntimeStore> =
2409 Arc::new(meerkat_runtime::InMemoryRuntimeStore::new());
2410 let runtime_store: Arc<dyn meerkat_runtime::RuntimeStore> =
2411 if let Some(custom_session_store) = custom_session_store.clone() {
2412 Arc::new(SessionStoreBackedRuntimeStore::new(
2413 Arc::clone(&base_runtime_store),
2414 custom_session_store,
2415 ))
2416 } else {
2417 Arc::clone(&base_runtime_store)
2418 };
2419 let runtime_adapter = Arc::new(meerkat_runtime::MeerkatMachine::persistent(
2420 Arc::clone(&runtime_store),
2421 Arc::clone(&blob_store),
2422 ));
2423 let mut factory = AgentFactory::new(&store_path)
2424 .builtins(caps.builtins)
2425 .shell(caps.shell)
2426 .mob(caps.mob)
2427 .comms(caps.comms)
2428 .memory(caps.memory);
2429 if caps.image_generation {
2430 factory = factory.with_image_generation_machine(runtime_adapter.clone());
2431 }
2432 let mut builder = FactoryAgentBuilder::new(factory, config);
2433 builder.default_session_store = Some(Arc::new(StoreAdapter::new(session_store.clone())));
2434 builder.default_blob_store = Some(blob_store.clone());
2435 let mob_tools_slot = Arc::clone(&builder.default_mob_tools);
2436 let session_service: Arc<dyn MobSessionService> =
2437 if let Some(custom_session_store) = custom_session_store {
2438 Arc::new(meerkat_session::PersistentSessionService::new(
2439 builder,
2440 max_sessions,
2441 custom_session_store,
2442 Some(runtime_store.clone()),
2443 blob_store,
2444 ))
2445 } else {
2446 Arc::new(meerkat_session::EphemeralSessionService::new(
2447 builder,
2448 max_sessions,
2449 ))
2450 };
2451 let hook = hook.unwrap_or_else(no_op_pre_build_hook);
2452 let runtime_adapter_for_after_create = runtime_adapter.clone();
2453 let combined_after_create_hook: AfterCreateHook = Arc::new(move |session_id, ctx| {
2454 let runtime_adapter = runtime_adapter_for_after_create.clone();
2455 let after_create_hook = after_create_hook.clone();
2456 Box::pin(async move {
2457 runtime_adapter.register_session(session_id.clone()).await;
2458 if let Some(after_create_hook) = after_create_hook {
2459 after_create_hook(session_id, ctx).await;
2460 }
2461 })
2462 });
2463 let session_service = Arc::new(PreBuildMobSessionService {
2464 inner: session_service,
2465 hook,
2466 after_create_hook: Some(combined_after_create_hook),
2467 runtime_adapter_override: Some(runtime_adapter.clone()),
2468 }) as Arc<dyn MobSessionService>;
2469 let (
2470 agent_mob_mcp_state,
2471 implicit_delegate_retirement_overrides,
2472 agent_mob_default_llm_client_slot,
2473 ) = install_agent_mob_tools(&definition, mob_tools_slot, Arc::clone(&session_service));
2474 let mut spec = Self::new(definition, storage, session_service);
2475 spec.agent_mob_mcp_state = Some(agent_mob_mcp_state);
2476 spec.implicit_delegate_retirement_overrides = Some(implicit_delegate_retirement_overrides);
2477 spec.agent_mob_default_llm_client_slot = Some(agent_mob_default_llm_client_slot);
2478 spec.runtime_adapter = Some(runtime_adapter);
2479 spec.binary_blob_store = Some(binary_blob_store);
2480 spec
2481 }
2482}
2483
2484#[derive(Debug)]
2486pub enum MobRuntimeError {
2487 Mob(MobError),
2488 InvalidInput(&'static str),
2489 InvalidConfig(String),
2490}
2491
2492impl std::fmt::Display for MobRuntimeError {
2493 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
2494 match self {
2495 Self::Mob(err) => write!(f, "{err}"),
2496 Self::InvalidInput(message) => write!(f, "{message}"),
2497 Self::InvalidConfig(message) => write!(f, "{message}"),
2498 }
2499 }
2500}
2501
2502impl std::error::Error for MobRuntimeError {}
2503
2504impl From<MobError> for MobRuntimeError {
2505 fn from(value: MobError) -> Self {
2506 Self::Mob(value)
2507 }
2508}
2509
/// Data handed to after-create hooks alongside the new session's id.
#[derive(Clone, Debug)]
pub struct SessionCreatedContext {
    /// Model name associated with the created session.
    pub model: String,
    /// String labels associated with the created session, ordered by key.
    pub labels: BTreeMap<String, String>,
    /// System prompt for the session, when one is present.
    pub system_prompt: Option<String>,
}
2524
2525#[async_trait]
2532pub trait SessionHook: Send + Sync {
2533 async fn before_create(&self, _req: &mut CreateSessionRequest) -> Result<(), SessionError> {
2537 Ok(())
2538 }
2539
2540 async fn after_create(
2543 &self,
2544 _session_id: &meerkat_core::types::SessionId,
2545 _ctx: &SessionCreatedContext,
2546 ) {
2547 }
2548}
2549
/// Switches selecting which agent capabilities the factory enables.
///
/// `builtins`, `shell`, `mob`, `comms` and `memory` are forwarded to the
/// identically named `AgentFactory` builder methods by the bootstrap
/// constructors; `image_generation` controls whether an image-generation
/// machine is attached.
///
/// Derives `PartialEq`/`Eq` so flag sets can be compared directly.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub struct CapabilityFlags {
    pub builtins: bool,
    pub shell: bool,
    pub mob: bool,
    pub comms: bool,
    pub memory: bool,
    pub image_generation: bool,
}

/// Everything is enabled by default except image generation.
impl Default for CapabilityFlags {
    fn default() -> Self {
        Self {
            builtins: true,
            shell: true,
            mob: true,
            comms: true,
            memory: true,
            image_generation: false,
        }
    }
}
2573
2574pub type RealMobRuntime = MobRuntime;
2576
2577#[derive(Clone)]
2579pub struct MobRuntime {
2580 handle: MobHandle,
2581 session_service: Option<Arc<dyn MobSessionService>>,
2582 agent_mob_mcp_state: Option<Arc<meerkat_mob_mcp::MobMcpState>>,
2583 implicit_delegate_retirement_overrides: Option<ImplicitDelegateRetirementOverrides>,
2584 binary_blob_store: Option<Arc<dyn BinaryBlobStore>>,
2585 baseline_member_specs: Arc<tokio::sync::RwLock<Vec<SpawnMemberSpec>>>,
2586 _ephemeral_dir: Option<Arc<tempfile::TempDir>>,
2589}
2590
2591impl MobRuntime {
2592 pub async fn bootstrap(spec: MobBootstrapSpec) -> Result<Self, MobRuntimeError> {
2593 let ephemeral_dir = spec._ephemeral_dir.clone();
2594 let session_service = spec.session_service.clone();
2595 let binary_blob_store = spec.binary_blob_store.clone();
2596 let mob_id = spec.definition.id.clone();
2597 let agent_mob_mcp_state = spec.agent_mob_mcp_state.clone();
2598 let implicit_delegate_retirement_overrides =
2599 spec.implicit_delegate_retirement_overrides.clone();
2600 let default_llm_client = spec
2601 .options
2602 .default_llm_client
2603 .clone()
2604 .map(ReplaySanitizingLlmClient::wrap);
2605 if let Some(slot) = spec.agent_mob_default_llm_client_slot.as_ref() {
2606 *slot
2607 .write()
2608 .unwrap_or_else(std::sync::PoisonError::into_inner) = default_llm_client.clone();
2609 }
2610 let effective_runtime_adapter = spec
2611 .runtime_adapter
2612 .clone()
2613 .or_else(|| session_service.runtime_adapter());
2614
2615 let mut builder = MobBuilder::new(spec.definition, spec.storage);
2616
2617 if let Some(adapter) = effective_runtime_adapter {
2624 builder = builder.with_runtime_adapter(adapter);
2625 }
2626
2627 builder = builder
2628 .with_session_service(session_service.clone())
2629 .allow_ephemeral_sessions(spec.options.allow_ephemeral_sessions)
2630 .notify_orchestrator_on_resume(spec.options.notify_orchestrator_on_resume);
2631
2632 if let Some(client) = default_llm_client {
2633 builder = builder.with_default_llm_client(client);
2634 }
2635
2636 let handle = builder.create().await?;
2637 if let Some(state) = agent_mob_mcp_state.as_ref() {
2638 state.mob_insert_handle(mob_id, handle.clone()).await;
2639 }
2640 Ok(Self {
2641 handle,
2642 session_service: Some(session_service),
2643 agent_mob_mcp_state,
2644 implicit_delegate_retirement_overrides,
2645 binary_blob_store,
2646 baseline_member_specs: Arc::new(tokio::sync::RwLock::new(Vec::new())),
2647 _ephemeral_dir: ephemeral_dir,
2648 })
2649 }
2650
2651 pub fn from_handle(handle: MobHandle) -> Self {
2652 Self {
2653 handle,
2654 session_service: None,
2655 agent_mob_mcp_state: None,
2656 implicit_delegate_retirement_overrides: None,
2657 binary_blob_store: None,
2658 baseline_member_specs: Arc::new(tokio::sync::RwLock::new(Vec::new())),
2659 _ephemeral_dir: None,
2660 }
2661 }
2662
2663 pub fn handle(&self) -> MobHandle {
2664 self.handle.clone()
2665 }
2666
2667 pub fn agent_mob_mcp_state(&self) -> Option<Arc<meerkat_mob_mcp::MobMcpState>> {
2668 self.agent_mob_mcp_state.clone()
2669 }
2670
2671 pub(crate) fn implicit_delegate_retirement_overrides(
2672 &self,
2673 ) -> Option<ImplicitDelegateRetirementOverrides> {
2674 self.implicit_delegate_retirement_overrides.clone()
2675 }
2676
2677 pub async fn set_baseline_member_specs(&self, specs: Vec<SpawnMemberSpec>) {
2678 *self.baseline_member_specs.write().await = specs;
2679 }
2680
2681 pub async fn baseline_member_specs(&self) -> Vec<SpawnMemberSpec> {
2682 self.baseline_member_specs.read().await.clone()
2683 }
2684
2685 pub async fn read_session_history(
2686 &self,
2687 session_id_str: &str,
2688 offset: usize,
2689 limit: Option<usize>,
2690 ) -> Result<SessionHistoryPage, MobRuntimeError> {
2691 if session_id_str.trim().is_empty() {
2692 return Err(MobRuntimeError::InvalidInput(
2693 "session_id must not be empty",
2694 ));
2695 }
2696 let Some(session_service) = self.session_service.as_ref() else {
2697 return Err(MobRuntimeError::InvalidInput(
2698 "session history unavailable for this runtime",
2699 ));
2700 };
2701 let session_id = meerkat_core::types::SessionId::parse(session_id_str)
2702 .map_err(|_| MobRuntimeError::InvalidInput("invalid session_id format"))?;
2703 SessionServiceHistoryExt::read_history(
2704 session_service.as_ref(),
2705 &session_id,
2706 SessionHistoryQuery { offset, limit },
2707 )
2708 .await
2709 .map_err(|err| MobRuntimeError::Mob(MobError::Internal(err.to_string())))
2710 }
2711
2712 #[allow(dead_code)]
2713 pub(crate) async fn runtime_state_for_session(
2714 &self,
2715 session_id_str: &str,
2716 ) -> Result<Option<meerkat_runtime::RuntimeState>, MobRuntimeError> {
2717 if session_id_str.trim().is_empty() {
2718 return Err(MobRuntimeError::InvalidInput(
2719 "session_id must not be empty",
2720 ));
2721 }
2722 let Some(session_service) = self.session_service.as_ref() else {
2723 return Ok(None);
2724 };
2725 let Some(runtime_adapter) = session_service.runtime_adapter() else {
2726 return Ok(None);
2727 };
2728 let session_id = meerkat_core::types::SessionId::parse(session_id_str)
2729 .map_err(|_| MobRuntimeError::InvalidInput("invalid session_id format"))?;
2730 let state = meerkat_runtime::service_ext::SessionServiceRuntimeExt::runtime_state(
2731 runtime_adapter.as_ref(),
2732 &session_id,
2733 )
2734 .await
2735 .map_err(|err| MobRuntimeError::Mob(MobError::Internal(err.to_string())))?;
2736 Ok(Some(state))
2737 }
2738
2739 #[allow(dead_code)]
2740 pub(crate) async fn comms_runtime_for_session(
2741 &self,
2742 session_id_str: &str,
2743 ) -> Result<Option<Arc<dyn CommsRuntime>>, MobRuntimeError> {
2744 if session_id_str.trim().is_empty() {
2745 return Err(MobRuntimeError::InvalidInput(
2746 "session_id must not be empty",
2747 ));
2748 }
2749 let Some(session_service) = self.session_service.as_ref() else {
2750 return Ok(None);
2751 };
2752 let session_id = meerkat_core::types::SessionId::parse(session_id_str)
2753 .map_err(|_| MobRuntimeError::InvalidInput("invalid session_id format"))?;
2754 Ok(
2755 meerkat_core::service::SessionServiceCommsExt::comms_runtime(
2756 session_service.as_ref(),
2757 &session_id,
2758 )
2759 .await,
2760 )
2761 }
2762
2763 #[allow(dead_code)]
2764 pub(crate) async fn active_input_ids_for_session(
2765 &self,
2766 session_id_str: &str,
2767 ) -> Result<Option<Vec<String>>, MobRuntimeError> {
2768 if session_id_str.trim().is_empty() {
2769 return Err(MobRuntimeError::InvalidInput(
2770 "session_id must not be empty",
2771 ));
2772 }
2773 let Some(session_service) = self.session_service.as_ref() else {
2774 return Ok(None);
2775 };
2776 let Some(runtime_adapter) = session_service.runtime_adapter() else {
2777 return Ok(None);
2778 };
2779 let session_id = meerkat_core::types::SessionId::parse(session_id_str)
2780 .map_err(|_| MobRuntimeError::InvalidInput("invalid session_id format"))?;
2781 let input_ids = meerkat_runtime::service_ext::SessionServiceRuntimeExt::list_active_inputs(
2782 runtime_adapter.as_ref(),
2783 &session_id,
2784 )
2785 .await
2786 .map_err(|err| MobRuntimeError::Mob(MobError::Internal(err.to_string())))?;
2787 Ok(Some(
2788 input_ids.into_iter().map(|id| id.to_string()).collect(),
2789 ))
2790 }
2791
2792 #[allow(dead_code)]
2793 pub(crate) async fn ensure_comms_drain_for_session(
2794 &self,
2795 session_id_str: &str,
2796 ) -> Result<Option<bool>, MobRuntimeError> {
2797 if session_id_str.trim().is_empty() {
2798 return Err(MobRuntimeError::InvalidInput(
2799 "session_id must not be empty",
2800 ));
2801 }
2802 let Some(session_service) = self.session_service.as_ref() else {
2803 return Ok(None);
2804 };
2805 let Some(runtime_adapter) = session_service.runtime_adapter() else {
2806 return Ok(None);
2807 };
2808 let session_id = meerkat_core::types::SessionId::parse(session_id_str)
2809 .map_err(|_| MobRuntimeError::InvalidInput("invalid session_id format"))?;
2810 let comms_runtime = meerkat_core::service::SessionServiceCommsExt::comms_runtime(
2811 session_service.as_ref(),
2812 &session_id,
2813 )
2814 .await;
2815 if let Some(comms) = comms_runtime {
2816 let _handle = meerkat_runtime::comms_drain::spawn_comms_drain(
2817 runtime_adapter.clone(),
2818 session_id,
2819 comms,
2820 None,
2821 );
2822 Ok(Some(true))
2823 } else {
2824 Ok(Some(false))
2825 }
2826 }
2827
2828 pub fn session_service(&self) -> Option<&Arc<dyn MobSessionService>> {
2834 self.session_service.as_ref()
2835 }
2836
2837 pub fn binary_blob_store(&self) -> Option<Arc<dyn BinaryBlobStore>> {
2838 self.binary_blob_store.clone()
2839 }
2840}
2841
2842pub fn member_entry_to_json(entry: &meerkat_mob::runtime::MobMemberListEntry) -> serde_json::Value {
2849 serde_json::to_value(entry).unwrap_or(serde_json::Value::Null)
2850}
2851
2852pub fn content_input_has_images(content: &meerkat_core::ContentInput) -> bool {
2853 match content {
2854 meerkat_core::ContentInput::Text(_) => false,
2855 meerkat_core::ContentInput::Blocks(blocks) => blocks
2856 .iter()
2857 .any(|block| matches!(block, meerkat_core::ContentBlock::Image { .. })),
2858 }
2859}
2860
2861pub fn model_capabilities_for_model(
2862 provider: Provider,
2863 model: &str,
2864) -> crate::runtime::ConsoleModelCapabilities {
2865 let image_input = meerkat_core::model_profile::profile_for(provider, model)
2866 .map(|profile| profile.vision)
2867 .unwrap_or(false);
2868 crate::runtime::ConsoleModelCapabilities { image_input }
2869}
2870
2871pub fn model_capabilities_for_profile(
2872 profile: &Profile,
2873) -> crate::runtime::ConsoleModelCapabilities {
2874 let image_input = Provider::infer_from_model(&profile.model)
2875 .and_then(|provider| meerkat_core::model_profile::profile_for(provider, &profile.model))
2876 .map(|profile| profile.vision)
2877 .unwrap_or(false);
2878 crate::runtime::ConsoleModelCapabilities { image_input }
2879}
2880
2881pub fn model_capabilities_for_role(
2882 definition: &MobDefinition,
2883 role: &str,
2884) -> crate::runtime::ConsoleModelCapabilities {
2885 let profile_name = ProfileName::from(role);
2886 definition
2887 .resolve_inline_profile(&profile_name)
2888 .map(model_capabilities_for_profile)
2889 .unwrap_or(crate::runtime::ConsoleModelCapabilities { image_input: false })
2890}
2891
2892pub fn model_capabilities_for_member_entry(
2893 definition: &MobDefinition,
2894 entry: &meerkat_mob::runtime::MobMemberListEntry,
2895) -> crate::runtime::ConsoleModelCapabilities {
2896 model_capabilities_for_role(definition, entry.role.as_str())
2897}
2898
2899pub async fn model_capabilities_for_member(
2900 handle: &MobHandle,
2901 session_service: Option<&Arc<dyn MobSessionService>>,
2902 member_id: &meerkat_mob::ids::MeerkatId,
2903) -> crate::runtime::ConsoleModelCapabilities {
2904 if let Some(service) = session_service
2905 && let Some(session_id) = handle.resolve_bridge_session_id(member_id).await
2906 && let Ok(view) = service.read(&session_id).await
2907 {
2908 return model_capabilities_for_model(view.state.provider, &view.state.model);
2909 }
2910
2911 handle
2912 .get_member(member_id)
2913 .await
2914 .map(|member| model_capabilities_for_role(handle.definition(), member.role.as_str()))
2915 .unwrap_or(crate::runtime::ConsoleModelCapabilities { image_input: false })
2916}
2917
2918pub async fn assert_member_accepts_images(
2919 handle: &MobHandle,
2920 session_service: Option<&Arc<dyn MobSessionService>>,
2921 member_id: &str,
2922 content: &meerkat_core::ContentInput,
2923) -> Result<(), MobRuntimeError> {
2924 if !content_input_has_images(content) {
2925 return Ok(());
2926 }
2927 let mid = meerkat_mob::ids::MeerkatId::from(member_id);
2928 let Some(member) = handle.get_member(&mid).await else {
2929 return Err(MobRuntimeError::InvalidInput("member not found"));
2930 };
2931 let caps = model_capabilities_for_member(handle, session_service, &member.agent_identity).await;
2932 if caps.image_input {
2933 Ok(())
2934 } else {
2935 Err(MobRuntimeError::InvalidInput(
2936 "target member model cannot accept image input",
2937 ))
2938 }
2939}
2940
2941pub async fn send_message_on_mob(
2951 handle: &MobHandle,
2952 member_id: &str,
2953 content: impl Into<meerkat_core::ContentInput>,
2954) -> Result<String, MobRuntimeError> {
2955 send_message_on_mob_with_mode(
2956 handle,
2957 member_id,
2958 content,
2959 meerkat_core::types::HandlingMode::Queue,
2960 )
2961 .await
2962}
2963
2964pub async fn send_message_on_mob_with_mode(
2967 handle: &MobHandle,
2968 member_id: &str,
2969 content: impl Into<meerkat_core::ContentInput>,
2970 handling_mode: meerkat_core::types::HandlingMode,
2971) -> Result<String, MobRuntimeError> {
2972 if member_id.trim().is_empty() {
2973 return Err(MobRuntimeError::InvalidInput("member_id must not be empty"));
2974 }
2975 let content = content.into();
2976 let is_empty = match &content {
2977 meerkat_core::ContentInput::Text(s) => s.trim().is_empty(),
2978 meerkat_core::ContentInput::Blocks(blocks) => blocks.is_empty(),
2979 };
2980 if is_empty {
2981 return Err(MobRuntimeError::InvalidInput("content must not be empty"));
2982 }
2983 let mid = meerkat_mob::ids::MeerkatId::from(member_id);
2984 let _receipt = handle
2985 .member(&mid)
2986 .await?
2987 .send(content, handling_mode)
2988 .await?;
2989 let session_id = handle
2990 .resolve_bridge_session_id(&mid)
2991 .await
2992 .ok_or_else(|| {
2993 MobRuntimeError::Mob(MobError::Internal(
2994 "member has no bridge session after send".to_string(),
2995 ))
2996 })?;
2997 Ok(session_id.to_string())
2998}
2999
3000#[cfg(test)]
3001#[allow(clippy::unwrap_used, clippy::expect_used, clippy::panic)]
3002mod tests {
3003 use super::*;
3004
3005 struct EmptyDispatcher;
3006
3007 #[async_trait::async_trait]
3008 impl meerkat_core::AgentToolDispatcher for EmptyDispatcher {
3009 fn tools(&self) -> Arc<[Arc<meerkat_core::types::ToolDef>]> {
3010 Vec::<Arc<meerkat_core::types::ToolDef>>::new().into()
3011 }
3012
3013 async fn dispatch(
3014 &self,
3015 call: meerkat_core::types::ToolCallView<'_>,
3016 ) -> Result<meerkat_core::ToolDispatchOutcome, meerkat_core::ToolError> {
3017 Err(meerkat_core::ToolError::not_found(call.name))
3018 }
3019
3020 fn capabilities(&self) -> meerkat_core::agent::DispatcherCapabilities {
3021 meerkat_core::agent::DispatcherCapabilities::default()
3022 }
3023 }
3024
3025 fn wrapper_with_overrides(
3026 overrides: ImplicitDelegateRetirementOverrides,
3027 ) -> AutoWireParentMobToolDispatcher {
3028 AutoWireParentMobToolDispatcher {
3029 inner: Arc::new(EmptyDispatcher),
3030 implicit_delegate_retirement_overrides: overrides,
3031 }
3032 }
3033
/// The patched `delegate` tool advertises `idle_retire_secs` as
/// "non-negative integer or null" and documents idle retirement.
#[test]
fn delegate_tool_schema_exposes_idle_retire_secs() {
    let base_schema = serde_json::json!({
        "type": "object",
        "properties": {
            "task": {"type": "string"}
        },
        "required": ["task"]
    });
    let original = meerkat_core::types::ToolDef::new("delegate", "Delegate work", base_schema);

    let patched = delegate_tool_def_with_idle_retire_secs(&original);

    assert!(patched.description.contains("IDLE RETIREMENT:"));
    let alternatives = &patched.input_schema["properties"]["idle_retire_secs"]["anyOf"];
    assert_eq!(alternatives[0]["type"], "integer");
    assert_eq!(alternatives[0]["minimum"], 0);
    assert_eq!(alternatives[1]["type"], "null");
}
3056
/// The patched `mob_spawn_member` tool advertises `idle_retire_secs` as
/// "non-negative integer or null" and documents that omitting it opts out.
#[test]
fn mob_spawn_tool_schema_exposes_opt_in_idle_retire_secs() {
    let base_schema = serde_json::json!({
        "type": "object",
        "properties": {
            "profile": {"type": "string"},
            "member_id": {"type": "string"}
        },
        "required": ["profile", "member_id"]
    });
    let original =
        meerkat_core::types::ToolDef::new("mob_spawn_member", "Spawn member", base_schema);

    let patched = mob_spawn_tool_def_with_idle_retire_secs(&original);

    let documents_opt_out = patched
        .description
        .contains("Omit idle_retire_secs to leave this spawned member out");
    assert!(documents_opt_out);
    let alternatives = &patched.input_schema["properties"]["idle_retire_secs"]["anyOf"];
    assert_eq!(alternatives[0]["type"], "integer");
    assert_eq!(alternatives[0]["minimum"], 0);
    assert_eq!(alternatives[1]["type"], "null");
}
3084
3085 #[tokio::test]
3086 async fn auto_wire_wrapper_preserves_ops_lifecycle_binding() {
3087 use meerkat_core::AgentToolDispatcher;
3088 use std::sync::atomic::{AtomicBool, Ordering};
3089
3090 struct BindAwareDispatcher {
3091 bound: Arc<AtomicBool>,
3092 }
3093
3094 #[async_trait::async_trait]
3095 impl meerkat_core::AgentToolDispatcher for BindAwareDispatcher {
3096 fn tools(&self) -> Arc<[Arc<meerkat_core::types::ToolDef>]> {
3097 Vec::<Arc<meerkat_core::types::ToolDef>>::new().into()
3098 }
3099
3100 async fn dispatch(
3101 &self,
3102 call: meerkat_core::types::ToolCallView<'_>,
3103 ) -> Result<meerkat_core::ToolDispatchOutcome, meerkat_core::ToolError> {
3104 Err(meerkat_core::ToolError::not_found(call.name))
3105 }
3106
3107 fn capabilities(&self) -> meerkat_core::agent::DispatcherCapabilities {
3108 meerkat_core::agent::DispatcherCapabilities {
3109 ops_lifecycle: true,
3110 }
3111 }
3112
3113 fn bind_ops_lifecycle(
3114 self: Arc<Self>,
3115 _registry: Arc<dyn meerkat_core::ops_lifecycle::OpsLifecycleRegistry>,
3116 _owner_bridge_session_id: meerkat_core::types::SessionId,
3117 ) -> Result<meerkat_core::agent::BindOutcome, meerkat_core::agent::OpsLifecycleBindError>
3118 {
3119 self.bound.store(true, Ordering::SeqCst);
3120 Ok(meerkat_core::agent::BindOutcome::Bound(self))
3121 }
3122 }
3123
3124 let bound = Arc::new(AtomicBool::new(false));
3125 let dispatcher = Arc::new(AutoWireParentMobToolDispatcher {
3126 inner: Arc::new(BindAwareDispatcher {
3127 bound: Arc::clone(&bound),
3128 }),
3129 implicit_delegate_retirement_overrides: ImplicitDelegateRetirementOverrides::default(),
3130 });
3131
3132 assert!(dispatcher.capabilities().ops_lifecycle);
3133 let outcome = dispatcher
3134 .bind_ops_lifecycle(
3135 Arc::new(meerkat_runtime::ops_lifecycle::RuntimeOpsLifecycleRegistry::new()),
3136 meerkat_core::types::SessionId::new(),
3137 )
3138 .expect("wrapper should delegate ops lifecycle binding");
3139
3140 assert!(outcome.was_bound());
3141 assert!(bound.load(Ordering::SeqCst));
3142 assert!(outcome.into_dispatcher().capabilities().ops_lifecycle);
3143 }
3144
/// An integer `idle_retire_secs` is parsed into a seconds override and removed
/// from the argument object.
#[test]
fn delegate_idle_retire_secs_arg_is_stripped_and_parsed() {
    let mut call_args = serde_json::json!({
        "task": "inspect",
        "idle_retire_secs": 42
    });

    let override_value = delegate_idle_retire_override_from_args("delegate", &mut call_args)
        .expect("valid idle retire arg");

    assert_eq!(
        override_value,
        Some(DelegateIdleRetireOverride::Seconds(42))
    );
    // The key must no longer be present after parsing.
    assert!(call_args.get("idle_retire_secs").is_none());
}
3158
/// An explicit `null` for `idle_retire_secs` yields the `Disabled` override and
/// the key is removed from the argument object.
#[test]
fn delegate_idle_retire_secs_null_disables_member_retirement() {
    let mut call_args = serde_json::json!({
        "task": "inspect",
        "idle_retire_secs": null
    });

    let override_value = delegate_idle_retire_override_from_args("delegate", &mut call_args)
        .expect("valid idle retire arg");

    assert_eq!(override_value, Some(DelegateIdleRetireOverride::Disabled));
    assert!(call_args.get("idle_retire_secs").is_none());
}
3172
/// When `idle_retire_secs` is absent, no override is produced and the
/// arguments are left exactly as supplied.
#[test]
fn delegate_idle_retire_secs_omitted_inherits_runtime_default() {
    let mut call_args = serde_json::json!({"task": "inspect"});

    let override_value = delegate_idle_retire_override_from_args("delegate", &mut call_args)
        .expect("omitted idle retire arg");

    assert_eq!(override_value, None);
    assert_eq!(call_args, serde_json::json!({"task": "inspect"}));
}
3183
/// Negative and fractional `idle_retire_secs` values are both rejected.
#[test]
fn delegate_idle_retire_secs_rejects_negative_or_fractional_values() {
    // Negative number of seconds.
    let mut negative = serde_json::json!({"task": "inspect", "idle_retire_secs": -1});
    assert!(delegate_idle_retire_override_from_args("delegate", &mut negative).is_err());

    // Non-integer number of seconds.
    let mut fractional = serde_json::json!({"task": "inspect", "idle_retire_secs": 1.5});
    assert!(delegate_idle_retire_override_from_args("delegate", &mut fractional).is_err());
}
3192
/// Targets derived from the spawn arguments are used when the tool result text
/// carries no `mob_id` of its own.
#[test]
fn mob_spawn_idle_retire_targets_use_args_when_result_omits_mob_id() {
    let spawn_args = serde_json::json!({
        "mob_id": "ob3",
        "profile": "review-worker",
        "member_id": "review-worker-vibe-forward",
    });
    let fallback_targets = idle_retire_targets_from_spawn_args(&spawn_args);

    assert_eq!(
        fallback_targets,
        vec![IdleRetireTarget {
            mob_id: "ob3".to_string(),
            member_id: "review-worker-vibe-forward".to_string(),
        }]
    );

    // The outcome text names the member but omits the mob id.
    let outcome_text = r#"{"agent_identity":"review-worker-vibe-forward","member_ref":"opaque"}"#;
    assert_eq!(
        idle_retire_targets_from_outcome_text(outcome_text, &fallback_targets),
        fallback_targets
    );
}
3217
/// The `specs` array form is understood: each entry yields one target, and a
/// per-spec `mob_id` takes precedence over the top-level one.
#[test]
fn mob_spawn_idle_retire_targets_support_canonical_specs_shape() {
    let spawn_args = serde_json::json!({
        "mob_id": "ob3",
        "specs": [
            {"profile": "person-worker", "agent_identity": "person-worker-a"},
            {"profile": "person-worker", "member_id": "person-worker-b", "mob_id": "other"}
        ]
    });
    let fallback_targets = idle_retire_targets_from_spawn_args(&spawn_args);

    assert_eq!(
        fallback_targets,
        vec![
            IdleRetireTarget {
                mob_id: "ob3".to_string(),
                member_id: "person-worker-a".to_string(),
            },
            IdleRetireTarget {
                mob_id: "other".to_string(),
                member_id: "person-worker-b".to_string(),
            },
        ]
    );

    // Multi-member outcome text where only the second member carries a mob id.
    let outcome_text = r#"{"members":[{"agent_identity":"person-worker-a"},{"agent_identity":"person-worker-b","mob_id":"other"}]}"#;
    assert_eq!(
        idle_retire_targets_from_outcome_text(outcome_text, &fallback_targets),
        fallback_targets
    );
}
3250
3251 #[tokio::test]
3252 async fn implicit_delegate_retirement_overrides_round_trip_per_member() {
3253 let overrides = ImplicitDelegateRetirementOverrides::default();
3254
3255 overrides
3256 .set("mob-a", "worker-1", DelegateIdleRetireOverride::Seconds(12))
3257 .await;
3258 overrides
3259 .set("mob-a", "worker-2", DelegateIdleRetireOverride::Disabled)
3260 .await;
3261
3262 assert_eq!(
3263 overrides.get("mob-a", "worker-1").await,
3264 Some(DelegateIdleRetireOverride::Seconds(12))
3265 );
3266 assert_eq!(
3267 overrides.get("mob-a", "worker-2").await,
3268 Some(DelegateIdleRetireOverride::Disabled)
3269 );
3270 assert_eq!(overrides.get("mob-a", "worker-3").await, None);
3271 }
3272
3273 #[tokio::test]
3274 async fn mob_spawn_idle_retire_registration_uses_spawn_args_when_result_omits_mob_id() {
3275 let overrides = ImplicitDelegateRetirementOverrides::default();
3276 let dispatcher = wrapper_with_overrides(overrides.clone());
3277 let fallback_targets = idle_retire_targets_from_spawn_args(&serde_json::json!({
3278 "mob_id": "ob3",
3279 "member_id": "review-worker-vibe-forward",
3280 }));
3281 let outcome =
3282 meerkat_core::ToolDispatchOutcome::sync_result(meerkat_core::types::ToolResult::new(
3283 "spawn-1".to_string(),
3284 r#"{"agent_identity":"review-worker-vibe-forward","member_ref":"opaque"}"#
3285 .to_string(),
3286 false,
3287 ));
3288
3289 dispatcher
3290 .register_idle_retire_override_from_outcome(
3291 &outcome,
3292 Some(DelegateIdleRetireOverride::Seconds(900)),
3293 &fallback_targets,
3294 )
3295 .await;
3296
3297 assert_eq!(
3298 overrides.get("ob3", "review-worker-vibe-forward").await,
3299 Some(DelegateIdleRetireOverride::Seconds(900))
3300 );
3301 }
3302
/// An inline profile that does not mention `image_generation` must not cause
/// the image substrate to be wired.
#[test]
fn image_generation_substrate_defaults_off_for_inline_profiles() {
    let toml = r#"
[mob]
id = "test"

[profiles.worker]
model = "gpt-5.5"

[profiles.worker.tools]
builtins = true
"#;
    let definition =
        meerkat_mob::MobDefinition::from_toml(toml).unwrap_or_else(|err| panic!("{err}"));

    let wired = mob_definition_may_use_image_generation(&definition);
    assert!(
        !wired,
        "inline profiles should not wire the image substrate unless a profile opts in"
    );
}
3324
/// Per-profile `image_generation` flags are parsed as written, and a single
/// opted-in profile is enough for the substrate check to return true.
#[test]
fn image_generation_substrate_follows_profile_tool_config() {
    let toml = r#"
[mob]
id = "test"

[profiles.commander]
model = "gpt-5.5"

[profiles.commander.tools]
builtins = true
image_generation = true

[profiles.investigator]
model = "gpt-5.5"

[profiles.investigator.tools]
builtins = true
image_generation = false
"#;
    let definition =
        meerkat_mob::MobDefinition::from_toml(toml).unwrap_or_else(|err| panic!("{err}"));

    let commander = definition.profiles["commander"].as_inline().unwrap();
    let investigator = definition.profiles["investigator"].as_inline().unwrap();
    assert!(commander.tools.image_generation);
    assert!(!investigator.tools.image_generation);

    let wired = mob_definition_may_use_image_generation(&definition);
    assert!(
        wired,
        "one opt-in profile is enough to wire substrate; Meerkat gates visibility per profile"
    );
}
3358
/// A profile may turn builtins off while still opting in to image generation;
/// the substrate check follows the `image_generation` flag alone.
#[test]
fn image_generation_profiles_can_disable_builtins_with_meerkat_062() {
    let toml = r#"
[mob]
id = "test"

[profiles.commander]
model = "gpt-5.5"

[profiles.commander.tools]
builtins = false
image_generation = true
"#;
    let definition =
        meerkat_mob::MobDefinition::from_toml(toml).unwrap_or_else(|err| panic!("{err}"));

    let commander = definition.profiles["commander"].as_inline().unwrap();
    assert!(!commander.tools.builtins);
    assert!(commander.tools.image_generation);

    let wired = mob_definition_may_use_image_generation(&definition);
    assert!(wired, "image generation now has its own Meerkat tool gate");
}
3384
/// A profile that only references a realm profile makes the substrate check
/// return true, since its tool config is not known from the definition.
#[test]
fn image_generation_substrate_is_conservative_for_realm_profile_refs() {
    let toml = r#"
[mob]
id = "test"

[profiles.worker]
realm_profile = "worker-v2"
"#;
    let definition =
        meerkat_mob::MobDefinition::from_toml(toml).unwrap_or_else(|err| panic!("{err}"));

    let wired = mob_definition_may_use_image_generation(&definition);
    assert!(
        wired,
        "realm profiles resolve at spawn time, so MobKit wires substrate and lets Meerkat enforce profile policy"
    );
}
3403
/// Request sanitization keeps the text block and the completed
/// `web_search_call` item, and drops the streaming-progress and annotation
/// server-tool blocks.
#[test]
fn sanitize_llm_request_drops_replay_unsafe_server_tool_blocks() {
    use meerkat_core::{AssistantBlock, BlockAssistantMessage, Message, StopReason};

    let blocks = vec![
        AssistantBlock::Text {
            text: "done".to_string(),
            meta: None,
        },
        // Streaming progress event: expected to be dropped.
        AssistantBlock::ServerToolContent {
            id: Some("ws-stream".to_string()),
            name: "web_search".to_string(),
            content: serde_json::json!({
                "type": "response.web_search_call.searching",
                "item_id": "ws_123"
            }),
            meta: None,
        },
        // Completed call item: expected to survive.
        AssistantBlock::ServerToolContent {
            id: Some("ws_123".to_string()),
            name: "web_search_call".to_string(),
            content: serde_json::json!({
                "type": "web_search_call",
                "id": "ws_123",
                "status": "completed"
            }),
            meta: None,
        },
        // Annotation payload without an id: expected to be dropped.
        AssistantBlock::ServerToolContent {
            id: None,
            name: "web_search_annotations".to_string(),
            content: serde_json::json!({
                "type": "message_annotations",
                "annotations": []
            }),
            meta: None,
        },
    ];
    let history = vec![Message::BlockAssistant(BlockAssistantMessage::new(
        blocks,
        StopReason::EndTurn,
    ))];
    let request = meerkat_client::LlmRequest::new("gpt-5.5", history);

    let sanitized = sanitize_llm_request_for_stateless_replay(&request);
    let assistant = match &sanitized.messages[0] {
        Message::BlockAssistant(inner) => inner,
        _ => panic!("expected block assistant"),
    };

    assert_eq!(assistant.blocks.len(), 2);
    assert!(matches!(
        assistant.blocks[0],
        meerkat_core::AssistantBlock::Text { .. }
    ));
    assert!(matches!(
        assistant.blocks[1],
        meerkat_core::AssistantBlock::ServerToolContent { ref name, .. }
            if name == "web_search_call"
    ));
}
3465
/// Request sanitization leaves generated image blocks in place, both in the
/// caller's original request and in the sanitized copy.
#[test]
fn sanitize_llm_request_preserves_generated_images_for_meerkat_062() {
    use meerkat_core::{AssistantBlock, BlockAssistantMessage, Message, StopReason};

    let blocks = vec![
        AssistantBlock::Text {
            text: "visible".to_string(),
            meta: None,
        },
        generated_image_block_for_test(),
    ];
    let history = vec![Message::BlockAssistant(BlockAssistantMessage::new(
        blocks,
        StopReason::EndTurn,
    ))];
    let request = meerkat_client::LlmRequest::new("gpt-5.5", history);

    let sanitized = sanitize_llm_request_for_stateless_replay(&request);

    // The input request is borrowed, so its image block must still be there.
    let original_assistant = match &request.messages[0] {
        Message::BlockAssistant(inner) => inner,
        _ => panic!("expected original block assistant"),
    };
    let original_has_image = original_assistant
        .blocks
        .iter()
        .any(|block| matches!(block, AssistantBlock::Image { .. }));
    assert!(
        original_has_image,
        "request-view sanitization must not rewrite canonical caller-owned messages"
    );

    // The sanitized copy keeps the image block as well.
    let sanitized_assistant = match &sanitized.messages[0] {
        Message::BlockAssistant(inner) => inner,
        _ => panic!("expected sanitized block assistant"),
    };
    let sanitized_has_image = sanitized_assistant
        .blocks
        .iter()
        .any(|block| matches!(block, AssistantBlock::Image { .. }));
    assert!(
        sanitized_has_image,
        "Meerkat 0.6.2 owns provider replay projection for generated images"
    );
}
3509
3510 #[derive(Default)]
3511 struct CapturingLlmClient {
3512 projected_messages: std::sync::Mutex<Vec<meerkat_core::Message>>,
3513 }
3514
3515 #[async_trait]
3516 impl LlmClient for CapturingLlmClient {
3517 fn project_replay_messages(
3518 &self,
3519 messages: &[meerkat_core::Message],
3520 ) -> Result<Vec<meerkat_core::Message>, meerkat_client::LlmError> {
3521 *self
3522 .projected_messages
3523 .lock()
3524 .unwrap_or_else(std::sync::PoisonError::into_inner) = messages.to_vec();
3525 Ok(messages.to_vec())
3526 }
3527
3528 fn stream<'a>(&'a self, _request: &'a LlmRequest) -> LlmStream<'a> {
3529 Box::pin(futures::stream::iter([Ok(
3530 meerkat_client::LlmEvent::Done {
3531 outcome: meerkat_client::LlmDoneOutcome::Success {
3532 stop_reason: meerkat_core::StopReason::EndTurn,
3533 },
3534 },
3535 )]))
3536 }
3537
3538 fn provider(&self) -> &'static str {
3539 "openai"
3540 }
3541
3542 async fn health_check(&self) -> Result<(), meerkat_client::LlmError> {
3543 Ok(())
3544 }
3545 }
3546
/// The sanitizing wrapper strips replay-unsafe blocks before handing the
/// messages to the inner client's projection, and returns what the inner
/// client returned.
#[test]
fn replay_sanitizing_llm_client_delegates_provider_projection() {
    use meerkat_core::{AssistantBlock, BlockAssistantMessage, Message, StopReason};

    let capture = Arc::new(CapturingLlmClient::default());
    let inner: Arc<dyn LlmClient> = capture.clone();
    let wrapped = ReplaySanitizingLlmClient::new(inner);

    let blocks = vec![
        AssistantBlock::Text {
            text: "visible".to_string(),
            meta: None,
        },
        // Streaming progress event that the wrapper is expected to remove.
        AssistantBlock::ServerToolContent {
            id: Some("ws-stream".to_string()),
            name: "web_search".to_string(),
            content: serde_json::json!({
                "type": "response.web_search_call.searching",
                "item_id": "ws_123"
            }),
            meta: None,
        },
    ];
    let messages = vec![Message::BlockAssistant(BlockAssistantMessage::new(
        blocks,
        StopReason::EndTurn,
    ))];

    let projected = wrapped
        .project_replay_messages(&messages)
        .expect("wrapped client should delegate provider projection");

    // What the inner client actually received.
    let seen = capture
        .projected_messages
        .lock()
        .unwrap_or_else(std::sync::PoisonError::into_inner)
        .clone();
    let assistant = match &seen[0] {
        Message::BlockAssistant(inner) => inner,
        _ => panic!("expected block assistant"),
    };
    assert_eq!(
        assistant.blocks.len(),
        1,
        "MobKit sanitization must happen before Meerkat provider projection"
    );
    assert!(matches!(
        assistant.blocks[0],
        meerkat_core::AssistantBlock::Text { .. }
    ));

    // The wrapper's return value equals the inner client's output (compared as JSON).
    let projected_json = serde_json::to_value(&projected).expect("projected messages serialize");
    let seen_json = serde_json::to_value(&seen).expect("seen messages serialize");
    assert_eq!(projected_json, seen_json);
}
3599
3600 #[derive(Default)]
3601 struct CapturingAgentLlmClient {
3602 seen_messages: std::sync::Mutex<Vec<meerkat_core::Message>>,
3603 }
3604
3605 #[async_trait]
3606 impl meerkat_core::AgentLlmClient for CapturingAgentLlmClient {
3607 async fn stream_response(
3608 &self,
3609 messages: &[meerkat_core::Message],
3610 _tools: &[Arc<meerkat_core::ToolDef>],
3611 _max_tokens: u32,
3612 _temperature: Option<f32>,
3613 _provider_params: Option<
3614 &meerkat_core::lifecycle::run_primitive::ProviderParamsOverride,
3615 >,
3616 ) -> Result<meerkat_core::agent::LlmStreamResult, meerkat_core::AgentError> {
3617 *self
3618 .seen_messages
3619 .lock()
3620 .unwrap_or_else(std::sync::PoisonError::into_inner) = messages.to_vec();
3621 Ok(meerkat_core::agent::LlmStreamResult::new(
3622 Vec::new(),
3623 meerkat_core::StopReason::EndTurn,
3624 meerkat_core::Usage::default(),
3625 ))
3626 }
3627
3628 fn provider(&self) -> &'static str {
3629 "openai"
3630 }
3631
3632 fn model(&self) -> &'static str {
3633 "gpt-5.5"
3634 }
3635 }
3636
3637 #[tokio::test]
3638 async fn sanitize_agent_llm_client_drops_replay_unsafe_server_tool_blocks() {
3639 let capture = Arc::new(CapturingAgentLlmClient::default());
3640 let inner: Arc<dyn meerkat_core::AgentLlmClient> = capture.clone();
3641 let wrapped = ReplaySanitizingAgentLlmClient::wrap(inner);
3642 let messages = vec![meerkat_core::Message::BlockAssistant(
3643 meerkat_core::BlockAssistantMessage::new(
3644 vec![
3645 meerkat_core::AssistantBlock::Text {
3646 text: "visible".to_string(),
3647 meta: None,
3648 },
3649 meerkat_core::AssistantBlock::ServerToolContent {
3650 id: Some("ws-stream".to_string()),
3651 name: "web_search".to_string(),
3652 content: serde_json::json!({
3653 "type": "response.web_search_call.searching",
3654 "item_id": "ws_123"
3655 }),
3656 meta: None,
3657 },
3658 meerkat_core::AssistantBlock::ServerToolContent {
3659 id: Some("ok".to_string()),
3660 name: "web_search_call".to_string(),
3661 content: serde_json::json!({
3662 "type": "web_search_call",
3663 "id": "ws_123",
3664 "status": "completed"
3665 }),
3666 meta: None,
3667 },
3668 ],
3669 meerkat_core::StopReason::EndTurn,
3670 ),
3671 )];
3672 let tools: Vec<Arc<meerkat_core::ToolDef>> = Vec::new();
3673
3674 wrapped
3675 .stream_response(&messages, &tools, 512, None, None)
3676 .await
3677 .expect("wrapped client should delegate");
3678
3679 let seen = capture
3680 .seen_messages
3681 .lock()
3682 .unwrap_or_else(std::sync::PoisonError::into_inner)
3683 .clone();
3684 let meerkat_core::Message::BlockAssistant(assistant) = &seen[0] else {
3685 panic!("expected block assistant");
3686 };
3687 assert_eq!(assistant.blocks.len(), 2);
3688 assert!(matches!(
3689 assistant.blocks[0],
3690 meerkat_core::AssistantBlock::Text { .. }
3691 ));
3692 assert!(matches!(
3693 assistant.blocks[1],
3694 meerkat_core::AssistantBlock::ServerToolContent { ref name, .. }
3695 if name == "web_search_call"
3696 ));
3697 }
3698
3699 fn generated_image_block_for_test() -> meerkat_core::AssistantBlock {
3700 serde_json::from_value(serde_json::json!({
3701 "block_type": "image",
3702 "data": {
3703 "image_id": "00000000-0000-0000-0000-000000000051",
3704 "blob_ref": {
3705 "blob_id": "sha256:test-generated-image",
3706 "media_type": "image/png"
3707 },
3708 "media_type": "image/png",
3709 "width": 1024,
3710 "height": 1024,
3711 "revised_prompt": { "disposition": "not_requested" },
3712 "meta": { "provider": "not_emitted" }
3713 }
3714 }))
3715 .expect("test image block should deserialize")
3716 }
3717
/// Message-level sanitization keeps both the text block and the generated
/// image block, in their original order.
#[test]
fn sanitize_message_preserves_assistant_image_blocks() {
    use meerkat_core::{AssistantBlock, BlockAssistantMessage, Message, StopReason};

    let blocks = vec![
        AssistantBlock::Text {
            text: "Here is the image.".to_string(),
            meta: None,
        },
        generated_image_block_for_test(),
    ];
    let message = Message::BlockAssistant(BlockAssistantMessage::new(
        blocks,
        StopReason::EndTurn,
    ));

    let assistant = match sanitize_message_for_stateless_replay(message) {
        Message::BlockAssistant(inner) => inner,
        _ => panic!("expected block assistant"),
    };

    assert_eq!(assistant.blocks.len(), 2);
    assert!(matches!(
        assistant.blocks[0],
        meerkat_core::AssistantBlock::Text { .. }
    ));
    assert!(
        matches!(
            assistant.blocks[1],
            meerkat_core::AssistantBlock::Image { .. }
        ),
        "generated image blocks should reach Meerkat's provider projection"
    );
}
3750
/// `persistent_with_hook` yields a spec with a runtime adapter (on the spec and
/// on its session service), creates `runtime.sqlite` under the store path, and
/// does not invoke the hook during construction.
#[test]
fn persistent_with_hook_wraps_session_service() {
    use std::sync::atomic::{AtomicBool, Ordering};

    let temp_dir = tempfile::tempdir().unwrap_or_else(|err| panic!("{err}"));
    let store_path = temp_dir.path().to_path_buf();
    let sqlite = meerkat_store::SqliteSessionStore::open(store_path.join("sessions.db"))
        .unwrap_or_else(|_| panic!("failed to open sqlite session store"));
    let session_store: Arc<dyn SessionStore> = Arc::new(sqlite);
    let definition = meerkat_mob::MobDefinition::from_toml("[mob]\nid = \"test\"\n")
        .unwrap_or_else(|_| panic!("failed to parse minimal mob definition"));

    // Flag flipped by the hook so the test can tell whether it ever ran.
    let hook_called = Arc::new(AtomicBool::new(false));
    let hook_flag = Arc::clone(&hook_called);

    let spec = MobBootstrapSpec::persistent_with_hook(
        definition,
        meerkat_mob::MobStorage::in_memory(),
        store_path.clone(),
        4,
        session_store,
        move |_req: &mut CreateSessionRequest| {
            hook_flag.store(true, Ordering::Relaxed);
            Box::pin(async { Ok(()) })
        },
    );

    assert!(
        spec.runtime_adapter.is_some(),
        "persistent_with_hook must provide a runtime adapter via spec.runtime_adapter"
    );
    assert!(
        spec.session_service.runtime_adapter().is_some(),
        "session service must own a runtime_store so archive/retire don't \
         hit the store-only-projection rejection in meerkat-session"
    );
    let runtime_db = store_path.join("runtime.sqlite");
    assert!(
        runtime_db.exists(),
        "persistent_inner must open a SqliteRuntimeStore at <store_path>/runtime.sqlite"
    );

    // Building the spec alone must not trigger the hook.
    assert!(
        !hook_called.load(Ordering::Relaxed),
        "hook must not be called before create_session"
    );
}
3811
/// `ephemeral_with_hook` yields a spec without a runtime adapter and does not
/// invoke the hook during construction.
#[test]
fn ephemeral_with_hook_creates_spec() {
    use std::sync::atomic::{AtomicBool, Ordering};

    let temp_dir = tempfile::tempdir().unwrap_or_else(|err| panic!("{err}"));
    let store_path = temp_dir.path().to_path_buf();
    let definition = meerkat_mob::MobDefinition::from_toml("[mob]\nid = \"test\"\n")
        .unwrap_or_else(|_| panic!("failed to parse minimal mob definition"));

    // Flag flipped by the hook so the test can tell whether it ever ran.
    let hook_called = Arc::new(AtomicBool::new(false));
    let hook_flag = Arc::clone(&hook_called);

    let spec = MobBootstrapSpec::ephemeral_with_hook(
        definition,
        meerkat_mob::MobStorage::in_memory(),
        store_path,
        4,
        None,
        move |_req: &mut CreateSessionRequest| {
            hook_flag.store(true, Ordering::Relaxed);
            Box::pin(async { Ok(()) })
        },
    );

    assert!(spec.runtime_adapter.is_none());

    // Building the spec alone must not trigger the hook.
    assert!(
        !hook_called.load(Ordering::Relaxed),
        "hook must not be called before create_session"
    );
}
3845
3846 #[tokio::test]
3850 async fn pre_build_hook_mutates_create_session_request() {
3851 use std::sync::Mutex;
3852
3853 let captured = Arc::new(Mutex::new(None::<(String, Option<String>)>));
3854 let captured_clone = captured.clone();
3855
3856 let dir = tempfile::tempdir().unwrap_or_else(|e| panic!("{e}"));
3858 let factory = AgentFactory::new(dir.path()).builtins(true);
3859 let config = Config::default();
3860 let builder = FactoryAgentBuilder::new(factory, config);
3861 let inner: Arc<dyn MobSessionService> =
3862 Arc::new(meerkat_session::EphemeralSessionService::new(builder, 4));
3863
3864 let hook: PreBuildHook = Arc::new(move |req: &mut CreateSessionRequest| {
3866 req.model = "hooked-model".to_string();
3867 req.system_prompt = Some("injected-prompt".to_string());
3868 let labels = req.labels.get_or_insert_with(Default::default);
3869 labels.insert("hook_label".to_string(), "hook_value".to_string());
3870 let mut lock = captured_clone
3872 .lock()
3873 .unwrap_or_else(std::sync::PoisonError::into_inner);
3874 *lock = Some((req.model.clone(), req.system_prompt.clone()));
3875 Box::pin(async { Ok(()) })
3876 });
3877 let wrapped = PreBuildMobSessionService {
3878 inner,
3879 hook,
3880 after_create_hook: None,
3881 runtime_adapter_override: None,
3882 };
3883
3884 let req = CreateSessionRequest {
3885 model: "original-model".to_string(),
3886 prompt: meerkat_core::ContentInput::Text("test".to_string()),
3887 render_metadata: None,
3888 system_prompt: None,
3889 max_tokens: None,
3890 event_tx: None,
3891 skill_references: None,
3892 initial_turn: meerkat_core::service::InitialTurnPolicy::Defer,
3893 build: None,
3894 labels: None,
3895 deferred_prompt_policy: meerkat_core::service::DeferredPromptPolicy::default(),
3896 };
3897
3898 let _ = meerkat_core::service::SessionService::create_session(&wrapped, req).await;
3900
3901 let (model, prompt) = captured
3902 .lock()
3903 .unwrap_or_else(std::sync::PoisonError::into_inner)
3904 .clone()
3905 .expect("hook must have been called");
3906 assert_eq!(model, "hooked-model", "hook must mutate the model");
3907 assert_eq!(
3908 prompt.as_deref(),
3909 Some("injected-prompt"),
3910 "hook must set the system prompt"
3911 );
3912 }
3913
/// Test double that logs the name of each method invoked on it.
#[derive(Default)]
struct ForwardingProbe {
    // Method names in invocation order.
    calls: Mutex<Vec<&'static str>>,
}

impl ForwardingProbe {
    /// Appends `call` to the invocation log (a poisoned lock is tolerated).
    fn record(&self, call: &'static str) {
        let mut log = self
            .calls
            .lock()
            .unwrap_or_else(std::sync::PoisonError::into_inner);
        log.push(call);
    }

    /// Returns a copy of the invocation log.
    fn calls(&self) -> Vec<&'static str> {
        let log = self
            .calls
            .lock()
            .unwrap_or_else(std::sync::PoisonError::into_inner);
        log.clone()
    }
}
3934
3935 #[async_trait]
3936 impl meerkat_core::service::SessionService for ForwardingProbe {
3937 async fn create_session(
3938 &self,
3939 _req: CreateSessionRequest,
3940 ) -> Result<meerkat_core::types::RunResult, SessionError> {
3941 Err(SessionError::Unsupported("create_session".to_string()))
3942 }
3943
3944 async fn start_turn(
3945 &self,
3946 _id: &meerkat_core::types::SessionId,
3947 _req: meerkat_core::service::StartTurnRequest,
3948 ) -> Result<meerkat_core::types::RunResult, SessionError> {
3949 Err(SessionError::Unsupported("start_turn".to_string()))
3950 }
3951
3952 async fn interrupt(
3953 &self,
3954 _id: &meerkat_core::types::SessionId,
3955 ) -> Result<(), SessionError> {
3956 self.record("interrupt");
3957 Ok(())
3958 }
3959
3960 async fn read(
3961 &self,
3962 id: &meerkat_core::types::SessionId,
3963 ) -> Result<meerkat_core::service::SessionView, SessionError> {
3964 Err(SessionError::NotFound { id: id.clone() })
3965 }
3966
3967 async fn list(
3968 &self,
3969 _query: meerkat_core::service::SessionQuery,
3970 ) -> Result<Vec<meerkat_core::service::SessionSummary>, SessionError> {
3971 Ok(Vec::new())
3972 }
3973
3974 async fn archive(&self, _id: &meerkat_core::types::SessionId) -> Result<(), SessionError> {
3975 self.record("archive");
3976 Ok(())
3977 }
3978 }
3979
3980 #[async_trait]
3981 impl meerkat_core::service::SessionServiceCommsExt for ForwardingProbe {}
3982
3983 #[async_trait]
3984 impl meerkat_core::service::SessionServiceControlExt for ForwardingProbe {
3985 async fn append_system_context(
3986 &self,
3987 _id: &meerkat_core::types::SessionId,
3988 _req: meerkat_core::service::AppendSystemContextRequest,
3989 ) -> Result<
3990 meerkat_core::service::AppendSystemContextResult,
3991 meerkat_core::service::SessionControlError,
3992 > {
3993 self.record("append_system_context");
3994 Ok(meerkat_core::service::AppendSystemContextResult {
3995 status: meerkat_core::service::AppendSystemContextStatus::Applied,
3996 })
3997 }
3998
3999 async fn stage_tool_results(
4000 &self,
4001 _id: &meerkat_core::types::SessionId,
4002 _req: meerkat_core::service::StageToolResultsRequest,
4003 ) -> Result<meerkat_core::service::StageToolResultsResult, SessionError> {
4004 self.record("stage_tool_results");
4005 Ok(meerkat_core::service::StageToolResultsResult {
4006 accepted_result_count: 7,
4007 })
4008 }
4009 }
4010
4011 #[async_trait]
4012 impl meerkat_core::service::SessionServiceHistoryExt for ForwardingProbe {
4013 async fn read_history(
4014 &self,
4015 id: &meerkat_core::types::SessionId,
4016 _query: meerkat_core::service::SessionHistoryQuery,
4017 ) -> Result<meerkat_core::service::SessionHistoryPage, SessionError> {
4018 Err(SessionError::NotFound { id: id.clone() })
4019 }
4020 }
4021
4022 #[async_trait]
4023 impl MobSessionService for ForwardingProbe {
4024 fn supports_persistent_sessions(&self) -> bool {
4025 true
4026 }
4027
4028 fn runtime_adapter(&self) -> Option<Arc<meerkat_runtime::MeerkatMachine>> {
4029 Some(Arc::new(meerkat_runtime::MeerkatMachine::ephemeral()))
4030 }
4031
4032 async fn archive_with_mob_lifecycle_authority(
4033 &self,
4034 _session_id: &meerkat_core::types::SessionId,
4035 ) -> Result<(), SessionError> {
4036 self.record("archive_with_mob_lifecycle_authority");
4037 Ok(())
4038 }
4039
4040 async fn stage_runtime_system_context_for_active_turn(
4041 &self,
4042 _session_id: &meerkat_core::types::SessionId,
4043 _expected_run_id: &meerkat_core::lifecycle::RunId,
4044 _appends: Vec<meerkat_core::session::PendingSystemContextAppend>,
4045 ) -> Result<Option<Vec<u8>>, SessionError> {
4046 self.record("stage_runtime_system_context_for_active_turn");
4047 Ok(Some(b"snapshot".to_vec()))
4048 }
4049
4050 async fn discard_runtime_system_context_for_active_turn(
4051 &self,
4052 _session_id: &meerkat_core::types::SessionId,
4053 _expected_run_id: &meerkat_core::lifecycle::RunId,
4054 _idempotency_keys: Vec<String>,
4055 ) -> Result<(), SessionError> {
4056 self.record("discard_runtime_system_context_for_active_turn");
4057 Ok(())
4058 }
4059
4060 async fn active_turn_system_context_boundary_available(
4061 &self,
4062 _session_id: &meerkat_core::types::SessionId,
4063 ) -> Result<Option<bool>, SessionError> {
4064 self.record("active_turn_system_context_boundary_available");
4065 Ok(Some(true))
4066 }
4067 }
4068
4069 #[tokio::test]
4070 async fn pre_build_wrapper_forwards_mob_authority_and_control_extensions() {
4071 let probe = Arc::new(ForwardingProbe::default());
4072 let inner: Arc<dyn MobSessionService> = probe.clone();
4073 let wrapped = PreBuildMobSessionService {
4074 inner,
4075 hook: no_op_pre_build_hook(),
4076 after_create_hook: None,
4077 runtime_adapter_override: Some(Arc::new(meerkat_runtime::MeerkatMachine::ephemeral())),
4078 };
4079 let session_id = meerkat_core::types::SessionId::new();
4080
4081 MobSessionService::archive_with_mob_lifecycle_authority(&wrapped, &session_id)
4082 .await
4083 .expect("archive_with_mob_lifecycle_authority should forward to inner service");
4084 let staged = meerkat_core::service::SessionServiceControlExt::stage_tool_results(
4085 &wrapped,
4086 &session_id,
4087 meerkat_core::service::StageToolResultsRequest {
4088 results: Vec::new(),
4089 },
4090 )
4091 .await
4092 .expect("stage_tool_results should forward to inner service");
4093
4094 assert_eq!(staged.accepted_result_count, 7);
4095 let boundary_available = wrapped
4096 .active_turn_system_context_boundary_available(&session_id)
4097 .await
4098 .expect("active-turn boundary probe should forward");
4099 assert_eq!(boundary_available, Some(true));
4100 let snapshot = wrapped
4101 .stage_runtime_system_context_for_active_turn(
4102 &session_id,
4103 &meerkat_core::lifecycle::RunId::new(),
4104 vec![meerkat_core::session::PendingSystemContextAppend {
4105 text: "steer".to_string(),
4106 source: Some("test".to_string()),
4107 idempotency_key: Some("test".to_string()),
4108 accepted_at: meerkat_core::time_compat::SystemTime::now(),
4109 }],
4110 )
4111 .await
4112 .expect("active-turn staging should forward");
4113 assert_eq!(snapshot.as_deref(), Some(&b"snapshot"[..]));
4114 wrapped
4115 .discard_runtime_system_context_for_active_turn(
4116 &session_id,
4117 &meerkat_core::lifecycle::RunId::new(),
4118 vec!["test".to_string()],
4119 )
4120 .await
4121 .expect("active-turn rollback should forward");
4122 assert_eq!(
4123 probe.calls(),
4124 vec![
4125 "archive_with_mob_lifecycle_authority",
4126 "stage_tool_results",
4127 "active_turn_system_context_boundary_available",
4128 "stage_runtime_system_context_for_active_turn",
4129 "discard_runtime_system_context_for_active_turn",
4130 ]
4131 );
4132 }
4133
/// `MobBootstrapSpec::persistent` yields a spec with a runtime adapter (on the
/// spec and on its session service) and creates `runtime.sqlite` under the
/// store path.
#[test]
fn persistent_bootstrap_uses_sqlite_runtime_store() {
    let temp_dir = tempfile::tempdir().unwrap_or_else(|err| panic!("{err}"));
    let store_path = temp_dir.path().to_path_buf();
    let sqlite = meerkat_store::SqliteSessionStore::open(store_path.join("sessions.db"))
        .unwrap_or_else(|_| panic!("failed to open sqlite session store"));
    let session_store: Arc<dyn SessionStore> = Arc::new(sqlite);
    let definition = meerkat_mob::MobDefinition::from_toml("[mob]\nid = \"test\"\n")
        .unwrap_or_else(|_| panic!("failed to parse minimal mob definition"));

    let spec = MobBootstrapSpec::persistent(
        definition,
        meerkat_mob::MobStorage::in_memory(),
        store_path.clone(),
        4,
        session_store,
    );

    assert!(
        spec.runtime_adapter.is_some(),
        "persistent bootstrap must provide its own runtime adapter via spec.runtime_adapter"
    );
    assert!(
        spec.session_service.runtime_adapter().is_some(),
        "session service must own a runtime_store so archive/retire don't \
         hit the store-only-projection rejection"
    );
    let runtime_db = store_path.join("runtime.sqlite");
    assert!(
        runtime_db.exists(),
        "persistent_inner must open a SqliteRuntimeStore at <store_path>/runtime.sqlite"
    );
}
4186
/// `ephemeral_runtime_backed_inner` yields a spec whose runtime adapter is set
/// both on the spec itself and on its session service.
#[test]
fn ephemeral_runtime_backed_uses_session_service_runtime_adapter() {
    let temp_dir = tempfile::tempdir().unwrap_or_else(|err| panic!("{err}"));
    let store_path = temp_dir.path().to_path_buf();
    let definition = meerkat_mob::MobDefinition::from_toml("[mob]\nid = \"test\"\n")
        .unwrap_or_else(|_| panic!("failed to parse minimal mob definition"));

    // Every optional argument is left at `None`; capability flags use defaults.
    let spec = MobBootstrapSpec::ephemeral_runtime_backed_inner(
        definition,
        meerkat_mob::MobStorage::in_memory(),
        store_path,
        4,
        None,
        None,
        None,
        CapabilityFlags::default(),
        None,
        None,
    );

    assert!(
        spec.runtime_adapter.is_some(),
        "ephemeral_runtime_backed_inner must expose the shared runtime authority"
    );
    assert!(
        spec.session_service.runtime_adapter().is_some(),
        "session service must still expose a runtime adapter so autonomous-host comms can wire"
    );
}
4218
4219 #[tokio::test]
4220 async fn agent_mob_tools_expose_definition_profiles_as_realm_profiles() {
4221 let dir = tempfile::tempdir().unwrap_or_else(|e| panic!("{e}"));
4222 let store_path = dir.path().to_path_buf();
4223 let Ok(definition) = meerkat_mob::MobDefinition::from_toml(
4224 "[mob]\nid = \"test\"\n\n[profiles.investigation-worker]\nmodel = \"gpt-5.5\"\n[profiles.investigation-worker.tools]\ncomms = true\nmob = true\n\n[profiles.person-worker]\nmodel = \"gpt-5.5\"\n[profiles.person-worker.tools]\ncomms = true\n",
4225 ) else {
4226 panic!("failed to parse mob definition with worker profiles");
4227 };
4228
4229 let spec = MobBootstrapSpec::ephemeral_runtime_backed_inner(
4230 definition,
4231 meerkat_mob::MobStorage::in_memory(),
4232 store_path,
4233 4,
4234 None,
4235 None,
4236 None,
4237 CapabilityFlags::default(),
4238 None,
4239 None,
4240 );
4241 let state = spec
4242 .agent_mob_mcp_state
4243 .expect("agent mob MCP state should be installed");
4244
4245 let profiles = state
4246 .realm_profile_list()
4247 .await
4248 .expect("definition profiles should list through agent mob tools");
4249 let names = profiles
4250 .iter()
4251 .map(|profile| profile.name.as_str())
4252 .collect::<Vec<_>>();
4253 assert!(
4254 names.contains(&"investigation-worker"),
4255 "definition profiles must be visible to mob_profile_list so agents can create mobs that reference them"
4256 );
4257 assert!(names.contains(&"person-worker"));
4258
4259 let worker = state
4260 .realm_profile_get("investigation-worker")
4261 .await
4262 .expect("definition profile lookup should succeed")
4263 .expect("definition profile should exist");
4264 assert_eq!(worker.profile.model, "gpt-5.5");
4265 assert_eq!(
4266 worker.revision, 0,
4267 "definition-backed profiles are immutable runtime seeds, not persisted realm revisions"
4268 );
4269 }
4270
4271 #[tokio::test]
4272 async fn agent_created_mobs_can_spawn_definition_seeded_realm_profiles() {
4273 let dir = tempfile::tempdir().unwrap_or_else(|e| panic!("{e}"));
4274 let store_path = dir.path().to_path_buf();
4275 let Ok(parent_definition) = meerkat_mob::MobDefinition::from_toml(
4276 "[mob]\nid = \"parent\"\n\n[profiles.investigation-worker]\nmodel = \"gpt-5.5\"\n[profiles.investigation-worker.tools]\ncomms = true\nmob = true\n",
4277 ) else {
4278 panic!("failed to parse parent mob definition");
4279 };
4280
4281 let mut spec = MobBootstrapSpec::ephemeral_runtime_backed_inner(
4282 parent_definition,
4283 meerkat_mob::MobStorage::in_memory(),
4284 store_path,
4285 4,
4286 None,
4287 None,
4288 None,
4289 CapabilityFlags::default(),
4290 None,
4291 None,
4292 );
4293 spec.options.default_llm_client = Some(Arc::new(meerkat_client::TestClient::default()));
4294 let runtime = MobRuntime::bootstrap(spec)
4295 .await
4296 .unwrap_or_else(|e| panic!("{e}"));
4297 let state = runtime
4298 .agent_mob_mcp_state
4299 .clone()
4300 .expect("agent mob MCP state should be installed");
4301 let Ok(child_definition) = meerkat_mob::MobDefinition::from_toml(
4302 "[mob]\nid = \"child\"\n\n[profiles.investigation-worker]\nrealm_profile = \"investigation-worker\"\n",
4303 ) else {
4304 panic!("failed to parse child mob definition");
4305 };
4306
4307 let mob_id = state
4308 .mob_create_definition(child_definition)
4309 .await
4310 .expect("child mob should be created");
4311 state
4312 .mob_spawn_spec(
4313 &mob_id,
4314 SpawnMemberSpec::new(
4315 ProfileName::from("investigation-worker"),
4316 meerkat_mob::AgentIdentity::from("investigation-worker:one"),
4317 ),
4318 )
4319 .await
4320 .expect("created mob should resolve definition-seeded realm profile at spawn time");
4321 }
4322
4323 #[tokio::test]
4324 async fn ephemeral_runtime_backed_custom_session_store_persists_created_session() {
4325 let dir = tempfile::tempdir().unwrap_or_else(|e| panic!("{e}"));
4326 let store_path = dir.path().to_path_buf();
4327 let Ok(definition) = meerkat_mob::MobDefinition::from_toml(
4328 "[mob]\nid = \"test\"\n\n[profiles.worker]\nmodel = \"gpt-5.5\"\n[profiles.worker.tools]\ncomms = true\n",
4329 ) else {
4330 panic!("failed to parse minimal mob definition");
4331 };
4332 let custom_store: Arc<dyn SessionStore> = Arc::new(meerkat_store::MemoryStore::new());
4333 let mut spec = MobBootstrapSpec::ephemeral_runtime_backed_inner(
4334 definition,
4335 meerkat_mob::MobStorage::in_memory(),
4336 store_path,
4337 4,
4338 Some(custom_store.clone()),
4339 None,
4340 None,
4341 CapabilityFlags::default(),
4342 None,
4343 None,
4344 );
4345 spec.options.default_llm_client = Some(Arc::new(meerkat_client::TestClient::default()));
4346
4347 let runtime = MobRuntime::bootstrap(spec)
4348 .await
4349 .unwrap_or_else(|e| panic!("{e}"));
4350 let mid = meerkat_mob::ids::MeerkatId::from("worker:one");
4351 runtime
4352 .handle
4353 .spawn_spec(SpawnMemberSpec::new(
4354 ProfileName::from("worker"),
4355 mid.clone(),
4356 ))
4357 .await
4358 .unwrap_or_else(|e| panic!("{e}"));
4359 let session_id = runtime
4360 .handle
4361 .resolve_bridge_session_id(&mid)
4362 .await
4363 .unwrap_or_else(|| panic!("spawned worker has no bridge session id"));
4364
4365 let stored = custom_store
4366 .load(&session_id)
4367 .await
4368 .unwrap_or_else(|e| panic!("{e}"));
4369 assert!(
4370 stored.is_some(),
4371 "ephemeral runtime-backed builds with a custom store must persist through that store"
4372 );
4373 }
4374
4375 #[tokio::test]
4376 async fn ephemeral_runtime_backed_custom_session_store_resumes_after_runtime_restart() {
4377 let dir = tempfile::tempdir().unwrap_or_else(|e| panic!("{e}"));
4378 let store_path = dir.path().to_path_buf();
4379 let definition_toml = "[mob]\nid = \"test\"\n\n[profiles.worker]\nmodel = \"gpt-5.5\"\n[profiles.worker.tools]\ncomms = true\n";
4380 let Ok(definition) = meerkat_mob::MobDefinition::from_toml(definition_toml) else {
4381 panic!("failed to parse minimal mob definition");
4382 };
4383 let custom_store: Arc<dyn SessionStore> = Arc::new(meerkat_store::MemoryStore::new());
4384 let mut spec = MobBootstrapSpec::ephemeral_runtime_backed_inner(
4385 definition,
4386 meerkat_mob::MobStorage::in_memory(),
4387 store_path.clone(),
4388 4,
4389 Some(custom_store.clone()),
4390 None,
4391 None,
4392 CapabilityFlags::default(),
4393 None,
4394 None,
4395 );
4396 spec.options.default_llm_client = Some(Arc::new(meerkat_client::TestClient::default()));
4397
4398 let runtime = MobRuntime::bootstrap(spec)
4399 .await
4400 .unwrap_or_else(|e| panic!("{e}"));
4401 let mid = meerkat_mob::ids::MeerkatId::from("worker:one");
4402 runtime
4403 .handle
4404 .spawn_spec(SpawnMemberSpec::new(
4405 ProfileName::from("worker"),
4406 mid.clone(),
4407 ))
4408 .await
4409 .unwrap_or_else(|e| panic!("{e}"));
4410 let session_id = runtime
4411 .handle
4412 .resolve_bridge_session_id(&mid)
4413 .await
4414 .unwrap_or_else(|| panic!("spawned worker has no bridge session id"));
4415 drop(runtime);
4416
4417 let Ok(definition) = meerkat_mob::MobDefinition::from_toml(definition_toml) else {
4418 panic!("failed to parse minimal mob definition");
4419 };
4420 let mut restarted_spec = MobBootstrapSpec::ephemeral_runtime_backed_inner(
4421 definition,
4422 meerkat_mob::MobStorage::in_memory(),
4423 store_path,
4424 4,
4425 Some(custom_store),
4426 None,
4427 None,
4428 CapabilityFlags::default(),
4429 None,
4430 None,
4431 );
4432 restarted_spec.options.default_llm_client =
4433 Some(Arc::new(meerkat_client::TestClient::default()));
4434
4435 let restarted = MobRuntime::bootstrap(restarted_spec)
4436 .await
4437 .unwrap_or_else(|e| panic!("{e}"));
4438 let mut resume_spec = SpawnMemberSpec::new(ProfileName::from("worker"), mid.clone());
4439 resume_spec.launch_mode = meerkat_mob::MemberLaunchMode::Resume {
4440 bridge_session_id: session_id.clone(),
4441 };
4442 restarted
4443 .handle
4444 .spawn_spec(resume_spec)
4445 .await
4446 .unwrap_or_else(|e| panic!("resume should load the external session snapshot: {e}"));
4447
4448 let resumed_session_id = restarted
4449 .handle
4450 .resolve_bridge_session_id(&mid)
4451 .await
4452 .unwrap_or_else(|| panic!("resumed worker has no bridge session id"));
4453 assert_eq!(resumed_session_id, session_id);
4454 }
4455
#[test]
/// Asserts that `ephemeral_inner`, given a definition whose only profile
/// enables just the `comms` tool, produces a spec without a runtime adapter.
fn ephemeral_bootstrap_without_image_generation_stays_direct() {
    let temp_dir = match tempfile::tempdir() {
        Ok(created) => created,
        Err(e) => panic!("{e}"),
    };
    let root = temp_dir.path().to_path_buf();
    let definition = meerkat_mob::MobDefinition::from_toml(
        "[mob]\nid = \"test\"\n\n[profiles.worker]\nmodel = \"gpt-5.5\"\nruntime_mode = \"autonomous_host\"\n[profiles.worker.tools]\ncomms = true\n",
    )
    .unwrap_or_else(|_| panic!("failed to parse definition"));
    let storage = meerkat_mob::MobStorage::in_memory();

    let spec = MobBootstrapSpec::ephemeral_inner(
        definition,
        storage,
        root,
        4,
        None,
        None,
        CapabilityFlags::default(),
        None,
        None,
    );

    let adapter_absent = spec.runtime_adapter.is_none();
    assert!(
        adapter_absent,
        "public ephemeral builds only need a runtime adapter when the definition may use image generation"
    );
}
4483
#[test]
/// Asserts that `MobBootstrapSpec::ephemeral`, given a profile with
/// `image_generation = true`, exposes a runtime adapter on both the spec and
/// the session service, and that the two report shared runtime persistence.
fn ephemeral_bootstrap_with_image_generation_shares_runtime_adapter() {
    let temp_dir = match tempfile::tempdir() {
        Ok(created) => created,
        Err(e) => panic!("{e}"),
    };
    let root = temp_dir.path().to_path_buf();
    let definition_toml = r#"
[mob]
id = "test"

[profiles.commander]
model = "gpt-5.5"

[profiles.commander.tools]
builtins = true
image_generation = true
"#;
    let definition = meerkat_mob::MobDefinition::from_toml(definition_toml)
        .unwrap_or_else(|_| panic!("failed to parse image-generation definition"));

    let spec = MobBootstrapSpec::ephemeral(
        definition,
        meerkat_mob::MobStorage::in_memory(),
        root,
        4,
        None,
    );

    let from_spec = spec
        .runtime_adapter
        .as_ref()
        .expect("image-generation ephemeral builds must expose a runtime adapter");
    let from_service = spec
        .session_service
        .runtime_adapter()
        .expect("session service must expose the same runtime adapter");

    let shared = from_spec.shares_runtime_persistence_with(&from_service);
    assert!(
        shared,
        "image-generation tool state and session state must share one runtime authority"
    );
}
4527
#[test]
/// Feeds `normalize_runtime_turn_request` a request with `Steer` handling and
/// render metadata, then asserts the result uses `Queue` handling, carries no
/// render metadata, and keeps the prompt and system prompt unchanged.
fn normalize_runtime_turn_request_strips_runtime_owned_semantics() {
    let prompt = meerkat_core::ContentInput::Text("checkpoint".to_string());
    let system_prompt = Some("system".to_string());

    // Runtime semantics that normalization is expected to reset.
    let runtime = meerkat_core::service::StartTurnRuntimeSemantics {
        render_metadata: Some(meerkat_core::types::RenderMetadata {
            class: meerkat_core::types::RenderClass::OpsProgress,
            salience: meerkat_core::types::RenderSalience::Urgent,
        }),
        handling_mode: meerkat_core::types::HandlingMode::Steer,
        skill_references: None,
        flow_tool_overlay: None,
        pre_turn_context_appends: Vec::new(),
        typed_turn_appends: Vec::new(),
        turn_metadata: None,
    };
    let request = meerkat_core::service::StartTurnRequest {
        prompt: prompt.clone(),
        system_prompt: system_prompt.clone(),
        event_tx: None,
        runtime,
    };

    let normalized = normalize_runtime_turn_request(request);

    assert_eq!(
        normalized.runtime.handling_mode,
        meerkat_core::types::HandlingMode::Queue,
        "runtime-applied turns must downgrade Steer before reaching direct session services"
    );
    assert!(
        normalized.runtime.render_metadata.is_none(),
        "runtime-owned render metadata must not be forwarded through the direct agent path"
    );
    assert_eq!(normalized.prompt, prompt);
    assert_eq!(normalized.system_prompt, system_prompt);
}
4567
#[test]
/// Builds a `SessionCreatedContext` and reads each of its three fields back.
fn session_created_context_fields() {
    let mut labels = std::collections::BTreeMap::new();
    labels.insert(String::from("agent_type"), String::from("lead"));

    let ctx = SessionCreatedContext {
        model: String::from("claude-sonnet-4-5"),
        labels,
        system_prompt: Some(String::from("You are a lead agent.")),
    };

    assert_eq!(ctx.model, "claude-sonnet-4-5");
    assert_eq!(ctx.labels["agent_type"], "lead");
    assert_eq!(ctx.system_prompt.as_deref(), Some("You are a lead agent."));
}
4583
4584 #[tokio::test]
4586 async fn session_hook_default_impls_are_noop() {
4587 struct EmptyHook;
4588 #[async_trait]
4589 impl SessionHook for EmptyHook {}
4590
4591 let hook = EmptyHook;
4592 let mut req = CreateSessionRequest {
4593 model: "test".to_string(),
4594 prompt: meerkat_core::ContentInput::Text("test".to_string()),
4595 render_metadata: None,
4596 system_prompt: None,
4597 max_tokens: None,
4598 event_tx: None,
4599 skill_references: None,
4600 initial_turn: meerkat_core::service::InitialTurnPolicy::Defer,
4601 build: None,
4602 labels: None,
4603 deferred_prompt_policy: meerkat_core::service::DeferredPromptPolicy::default(),
4604 };
4605 hook.before_create(&mut req).await.unwrap();
4607 let ctx = SessionCreatedContext {
4609 model: "test".to_string(),
4610 labels: Default::default(),
4611 system_prompt: None,
4612 };
4613 hook.after_create(&meerkat_core::types::SessionId::new(), &ctx)
4614 .await;
4615 }
4616
4617 #[tokio::test]
4619 async fn session_hook_before_create_can_abort() {
4620 struct AbortHook;
4621 #[async_trait]
4622 impl SessionHook for AbortHook {
4623 async fn before_create(
4624 &self,
4625 _req: &mut CreateSessionRequest,
4626 ) -> Result<(), SessionError> {
4627 Err(SessionError::Unsupported("hook abort".into()))
4628 }
4629 }
4630
4631 let hook = AbortHook;
4632 let mut req = CreateSessionRequest {
4633 model: "test".to_string(),
4634 prompt: meerkat_core::ContentInput::Text("test".to_string()),
4635 render_metadata: None,
4636 system_prompt: None,
4637 max_tokens: None,
4638 event_tx: None,
4639 skill_references: None,
4640 initial_turn: meerkat_core::service::InitialTurnPolicy::Defer,
4641 build: None,
4642 labels: None,
4643 deferred_prompt_policy: meerkat_core::service::DeferredPromptPolicy::default(),
4644 };
4645 let result = hook.before_create(&mut req).await;
4646 assert!(result.is_err());
4647 }
4648
4649 #[tokio::test]
4651 async fn session_hook_before_create_mutates_request() {
4652 struct MutatingHook;
4653 #[async_trait]
4654 impl SessionHook for MutatingHook {
4655 async fn before_create(
4656 &self,
4657 req: &mut CreateSessionRequest,
4658 ) -> Result<(), SessionError> {
4659 req.model = "hook-overridden".to_string();
4660 req.system_prompt = Some("injected by hook".to_string());
4661 Ok(())
4662 }
4663 }
4664
4665 let hook = MutatingHook;
4666 let mut req = CreateSessionRequest {
4667 model: "original".to_string(),
4668 prompt: meerkat_core::ContentInput::Text("test".to_string()),
4669 render_metadata: None,
4670 system_prompt: None,
4671 max_tokens: None,
4672 event_tx: None,
4673 skill_references: None,
4674 initial_turn: meerkat_core::service::InitialTurnPolicy::Defer,
4675 build: None,
4676 labels: None,
4677 deferred_prompt_policy: meerkat_core::service::DeferredPromptPolicy::default(),
4678 };
4679 hook.before_create(&mut req).await.unwrap();
4680 assert_eq!(req.model, "hook-overridden");
4681 assert_eq!(req.system_prompt.as_deref(), Some("injected by hook"));
4682 }
4683
#[test]
/// A message containing the ambiguous previous-member-cleanup marker must be
/// accepted by both the specific predicate and the general recoverable one.
fn recoverable_lifecycle_cleanup_accepts_ambiguous_member_cleanup() {
    let message = "previous member cleanup ambiguous for member rt:deep-investigator:singleton:0";

    let predicates: [fn(&str) -> bool; 2] = [
        is_previous_member_cleanup_ambiguous_error,
        is_recoverable_lifecycle_cleanup_error,
    ];
    for accepts in predicates {
        assert!(accepts(message));
    }
}
4691
#[test]
/// A message carrying all three archive/cancel race fragments (failed
/// `ArchiveSession` after disposal, failed cancel-before-retire, and a runtime
/// reported as still running) must be classified as recoverable.
fn recoverable_lifecycle_cleanup_preserves_archive_cancel_race() {
    let message = concat!(
        "internal error: disposal completed but ArchiveSession failed: ",
        "session error: agent error: Internal error: runtime cancel-before-retire failed ",
        "for 019e3c52-0f1b-73d3-a5c7-4b21c2bbf131: Runtime not ready: running",
    );

    let recoverable = is_recoverable_lifecycle_cleanup_error(message);
    assert!(recoverable);
}
4700
#[test]
/// Messages that match none of the recoverable patterns, or only part of one,
/// must not be classified as recoverable lifecycle cleanup errors.
fn recoverable_lifecycle_cleanup_rejects_unrelated_errors() {
    let rejected = [
        // Nothing to match at all.
        "",
        // Entirely unrelated failures.
        "actor task dropped",
        "model provider returned rate limit",
        // The archive/cancel race is only recoverable when all three of its
        // fragments are present; any strict subset must be rejected.
        "disposal completed but ArchiveSession failed: store unavailable",
        "disposal completed but ArchiveSession failed: cancel-before-retire failed: Runtime not ready: stopped",
        "cancel-before-retire failed: Runtime not ready: running",
        // The ambiguous-cleanup marker is only matched with its
        // " for member " continuation.
        "previous member cleanup ambiguous",
    ];

    for error in rejected {
        assert!(
            !is_recoverable_lifecycle_cleanup_error(error),
            "must not be treated as recoverable: {error:?}"
        );
    }
}
4710}