Skip to main content

meerkat_mobkit/
mob_handle_runtime.rs

1//! Mob member lifecycle management — bootstrap, spawn, reconcile, and roster queries.
2
3use std::collections::{BTreeMap, BTreeSet};
4use std::path::{Path, PathBuf};
5use std::sync::Arc;
6
7use async_trait::async_trait;
8use futures::StreamExt;
9use meerkat::{AgentFactory, Config, FactoryAgentBuilder, SessionStore};
10use meerkat_client::types::LlmStream;
11use meerkat_client::{LlmClient, LlmRequest};
12use meerkat_core::agent::CommsRuntime;
13use meerkat_core::service::{
14    CreateSessionRequest, SessionError, SessionHistoryPage, SessionHistoryQuery,
15    SessionServiceHistoryExt,
16};
17use meerkat_core::{AgentSessionStore, AssistantBlock, Message, Provider};
18use meerkat_mob::{
19    MobBuilder, MobDefinition, MobError, MobHandle, MobSessionService, MobStorage, Profile,
20    ProfileName, SpawnMemberSpec,
21};
22use meerkat_runtime::input_state::StoredInputState;
23use meerkat_runtime::store::MachineLifecycleCommit;
24use meerkat_store::StoreAdapter;
25use serde_json::Value;
26
27use crate::blob_store::{
28    Base64BlobStoreAdapter, BinaryBlobStore, BinaryBlobStoreAdapter, ObjectStoreBlobStore,
29};
30
31pub(crate) const DELEGATE_IDLE_RETIRE_SECS_LABEL: &str = "implicit_delegate_idle_retire_secs";
32pub(crate) const DELEGATE_IDLE_RETIRE_DISABLED_LABEL: &str = "disabled";
33
34#[cfg(test)]
35use std::sync::{Mutex, OnceLock};
36
37/// Member state constant for active members.
38pub const MEMBER_STATE_ACTIVE: &str = "active";
39/// Member state constant for members transitioning to retired.
40pub const MEMBER_STATE_RETIRING: &str = "retiring";
41
42/// Options for bootstrapping a mob runtime.
43#[derive(Clone, Default)]
44pub struct MobBootstrapOptions {
45    pub allow_ephemeral_sessions: bool,
46    pub notify_orchestrator_on_resume: bool,
47    pub default_llm_client: Option<Arc<dyn LlmClient>>,
48}
49
50/// Wraps an LLM client and strips provider-emitted evidence blocks that are
51/// useful for UI/citation projection but unsafe to replay into the next
52/// stateless provider request.
53pub struct ReplaySanitizingLlmClient {
54    inner: Arc<dyn LlmClient>,
55}
56
57type SharedDefaultLlmClientSlot = Arc<std::sync::RwLock<Option<Arc<dyn LlmClient>>>>;
58
59impl ReplaySanitizingLlmClient {
60    pub fn new(inner: Arc<dyn LlmClient>) -> Self {
61        Self { inner }
62    }
63
64    pub fn wrap(inner: Arc<dyn LlmClient>) -> Arc<dyn LlmClient> {
65        Arc::new(Self::new(inner))
66    }
67}
68
69/// Agent-layer companion to [`ReplaySanitizingLlmClient`].
70///
71/// Meerkat session services can also receive already-adapted
72/// `AgentLlmClient`s through live replacement and hot-swap APIs. Sanitize at
73/// that boundary too so provider-emitted server tool telemetry is never
74/// replayed into the next stateless model request just because the client
75/// entered below the raw `LlmClient` adapter seam.
76pub struct ReplaySanitizingAgentLlmClient {
77    inner: Arc<dyn meerkat_core::AgentLlmClient>,
78}
79
80impl ReplaySanitizingAgentLlmClient {
81    pub fn new(inner: Arc<dyn meerkat_core::AgentLlmClient>) -> Self {
82        Self { inner }
83    }
84
85    pub fn wrap(
86        inner: Arc<dyn meerkat_core::AgentLlmClient>,
87    ) -> Arc<dyn meerkat_core::AgentLlmClient> {
88        Arc::new(Self::new(inner))
89    }
90}
91
92#[async_trait]
93impl meerkat_core::AgentLlmClient for ReplaySanitizingAgentLlmClient {
94    async fn stream_response(
95        &self,
96        messages: &[Message],
97        tools: &[Arc<meerkat_core::ToolDef>],
98        max_tokens: u32,
99        temperature: Option<f32>,
100        provider_params: Option<&meerkat_core::lifecycle::run_primitive::ProviderParamsOverride>,
101    ) -> Result<meerkat_core::agent::LlmStreamResult, meerkat_core::AgentError> {
102        let sanitized: Vec<Message> = messages
103            .iter()
104            .cloned()
105            .map(sanitize_message_for_stateless_replay)
106            .collect();
107        self.inner
108            .stream_response(&sanitized, tools, max_tokens, temperature, provider_params)
109            .await
110    }
111
112    fn provider(&self) -> &'static str {
113        self.inner.provider()
114    }
115
116    fn model(&self) -> &str {
117        self.inner.model()
118    }
119
120    fn compile_schema(
121        &self,
122        output_schema: &meerkat_core::OutputSchema,
123    ) -> Result<meerkat_core::schema::CompiledSchema, meerkat_core::schema::SchemaError> {
124        self.inner.compile_schema(output_schema)
125    }
126}
127
128#[async_trait]
129impl LlmClient for ReplaySanitizingLlmClient {
130    fn project_replay_messages(
131        &self,
132        messages: &[Message],
133    ) -> Result<Vec<Message>, meerkat_client::LlmError> {
134        let sanitized: Vec<Message> = messages
135            .iter()
136            .cloned()
137            .map(sanitize_message_for_stateless_replay)
138            .collect();
139        self.inner.project_replay_messages(&sanitized)
140    }
141
142    fn stream<'a>(&'a self, request: &'a LlmRequest) -> LlmStream<'a> {
143        let inner = Arc::clone(&self.inner);
144        let sanitized = sanitize_llm_request_for_stateless_replay(request);
145        Box::pin(async_stream::stream! {
146            let mut stream = inner.stream(&sanitized);
147            while let Some(event) = stream.next().await {
148                if runtime_turn_diagnostics_enabled()
149                    && let Err(error) = &event
150                {
151                    tracing::error!(
152                        error = %error,
153                        error_debug = ?error,
154                        "mobkit llm client stream error"
155                    );
156                }
157                yield event;
158            }
159        })
160    }
161
162    fn provider(&self) -> &'static str {
163        self.inner.provider()
164    }
165
166    async fn health_check(&self) -> Result<(), meerkat_client::LlmError> {
167        self.inner.health_check().await
168    }
169
170    fn compile_schema(
171        &self,
172        output_schema: &meerkat_core::OutputSchema,
173    ) -> Result<meerkat_core::schema::CompiledSchema, meerkat_core::schema::SchemaError> {
174        self.inner.compile_schema(output_schema)
175    }
176}
177
178/// Async hook called before each session is created. Receives the mutable
179/// `CreateSessionRequest` so the app can inject external tools, augment the
180/// system prompt, set labels, override the model, load session resume data
181/// from external stores, etc.
182///
183/// The hook runs **before** `create_session` captures labels and LLM identity,
184/// so all mutations are reflected in session metadata, not just the agent build.
185///
186/// ```rust,ignore
187/// let spec = MobBootstrapSpec::persistent_with_hook(
188///     definition, storage, store_path, 64, session_store,
189///     |req: &mut CreateSessionRequest| {
190///         Box::pin(async move {
191///             // Async: load session from external store
192///             let session = my_store.load_by_owner(&owner_id).await;
193///             if let Some(s) = session {
194///                 let build = req.build.get_or_insert_with(SessionBuildOptions::default);
195///                 build.resume_session = Some(s);
196///             }
197///             // Sync: inject tools, augment prompt
198///             let build = req.build.get_or_insert_with(SessionBuildOptions::default);
199///             build.external_tools = Some(my_tools());
200///             Ok(())
201///         })
202///     },
203/// );
204/// ```
205pub(crate) type PreBuildHook = Arc<
206    dyn Fn(
207            &mut CreateSessionRequest,
208        ) -> std::pin::Pin<
209            Box<dyn std::future::Future<Output = Result<(), SessionError>> + Send + '_>,
210        > + Send
211        + Sync,
212>;
213
214/// Optional post-creation hook invoked after `create_session` succeeds.
215pub type AfterCreateHook = Arc<
216    dyn Fn(
217            meerkat_core::types::SessionId,
218            SessionCreatedContext,
219        ) -> std::pin::Pin<Box<dyn std::future::Future<Output = ()> + Send>>
220        + Send
221        + Sync,
222>;
223
224/// Wraps a `MobSessionService`, applying a `PreBuildHook` to the
225/// `CreateSessionRequest` in `create_session()` before delegating.
226///
227/// The hook runs before labels and LLM identity are captured by the inner
228/// session service, so mutations to `req.labels`, `req.model`, `req.build`,
229/// and `req.system_prompt` are fully reflected in session metadata.
230struct PreBuildMobSessionService {
231    inner: Arc<dyn MobSessionService>,
232    hook: PreBuildHook,
233    after_create_hook: Option<AfterCreateHook>,
234    runtime_adapter_override: Option<Arc<meerkat_runtime::MeerkatMachine>>,
235}
236
237fn no_op_pre_build_hook() -> PreBuildHook {
238    Arc::new(|_req: &mut CreateSessionRequest| Box::pin(async { Ok(()) }))
239}
240
241#[derive(Debug, Clone, Copy, PartialEq, Eq)]
242pub(crate) enum DelegateIdleRetireOverride {
243    Disabled,
244    Seconds(u64),
245}
246
247#[derive(Clone, Default)]
248pub(crate) struct ImplicitDelegateRetirementOverrides {
249    inner: Arc<tokio::sync::RwLock<BTreeMap<(String, String), DelegateIdleRetireOverride>>>,
250}
251
252impl ImplicitDelegateRetirementOverrides {
253    pub(crate) async fn set(
254        &self,
255        mob_id: impl Into<String>,
256        member_id: impl Into<String>,
257        override_policy: DelegateIdleRetireOverride,
258    ) {
259        self.inner
260            .write()
261            .await
262            .insert((mob_id.into(), member_id.into()), override_policy);
263    }
264
265    pub(crate) async fn get(
266        &self,
267        mob_id: &str,
268        member_id: &str,
269    ) -> Option<DelegateIdleRetireOverride> {
270        self.inner
271            .read()
272            .await
273            .get(&(mob_id.to_string(), member_id.to_string()))
274            .copied()
275    }
276}
277
278struct AutoWireParentMobToolsFactory {
279    inner: Arc<dyn meerkat_core::service::MobToolsFactory>,
280    implicit_delegate_retirement_overrides: ImplicitDelegateRetirementOverrides,
281}
282
283#[cfg_attr(target_arch = "wasm32", async_trait(?Send))]
284#[cfg_attr(not(target_arch = "wasm32"), async_trait)]
285impl meerkat_core::service::MobToolsFactory for AutoWireParentMobToolsFactory {
286    async fn build_mob_tools(
287        &self,
288        args: meerkat_core::service::MobToolsBuildArgs,
289    ) -> Result<Arc<dyn meerkat_core::AgentToolDispatcher>, Box<dyn std::error::Error + Send + Sync>>
290    {
291        let inner = self.inner.build_mob_tools(args).await?;
292        Ok(Arc::new(AutoWireParentMobToolDispatcher {
293            inner,
294            implicit_delegate_retirement_overrides: self
295                .implicit_delegate_retirement_overrides
296                .clone(),
297        }))
298    }
299}
300
301struct AutoWireParentMobToolDispatcher {
302    inner: Arc<dyn meerkat_core::AgentToolDispatcher>,
303    implicit_delegate_retirement_overrides: ImplicitDelegateRetirementOverrides,
304}
305
306#[cfg_attr(target_arch = "wasm32", async_trait(?Send))]
307#[cfg_attr(not(target_arch = "wasm32"), async_trait)]
308impl meerkat_core::AgentToolDispatcher for AutoWireParentMobToolDispatcher {
309    fn tools(&self) -> Arc<[Arc<meerkat_core::types::ToolDef>]> {
310        self.inner
311            .tools()
312            .iter()
313            .map(|tool| {
314                if tool.name == "delegate" {
315                    Arc::new(delegate_tool_def_with_idle_retire_secs(tool))
316                } else if tool.name == "mob_spawn_member" {
317                    Arc::new(mob_spawn_tool_def_with_idle_retire_secs(tool))
318                } else {
319                    Arc::clone(tool)
320                }
321            })
322            .collect::<Vec<_>>()
323            .into()
324    }
325
326    async fn dispatch(
327        &self,
328        call: meerkat_core::types::ToolCallView<'_>,
329    ) -> Result<meerkat_core::ToolDispatchOutcome, meerkat_core::ToolError> {
330        if call.name == "delegate" {
331            return self.dispatch_delegate(call).await;
332        }
333        if call.name != "mob_spawn_member" {
334            return self.inner.dispatch(call).await;
335        }
336        self.dispatch_mob_spawn_member(call).await
337    }
338
339    fn capabilities(&self) -> meerkat_core::agent::DispatcherCapabilities {
340        self.inner.capabilities()
341    }
342
343    fn bind_ops_lifecycle(
344        self: Arc<Self>,
345        registry: Arc<dyn meerkat_core::ops_lifecycle::OpsLifecycleRegistry>,
346        owner_bridge_session_id: meerkat_core::types::SessionId,
347    ) -> Result<meerkat_core::agent::BindOutcome, meerkat_core::agent::OpsLifecycleBindError> {
348        let owned = Arc::try_unwrap(self)
349            .map_err(|_| meerkat_core::agent::OpsLifecycleBindError::SharedOwnership)?;
350        let outcome = owned
351            .inner
352            .bind_ops_lifecycle(registry, owner_bridge_session_id)?;
353        let was_bound = outcome.was_bound();
354        let dispatcher = Arc::new(Self {
355            inner: outcome.into_dispatcher(),
356            implicit_delegate_retirement_overrides: owned.implicit_delegate_retirement_overrides,
357        });
358        Ok(if was_bound {
359            meerkat_core::agent::BindOutcome::Bound(dispatcher)
360        } else {
361            meerkat_core::agent::BindOutcome::Skipped(dispatcher)
362        })
363    }
364}
365
366impl AutoWireParentMobToolDispatcher {
367    async fn dispatch_mob_spawn_member(
368        &self,
369        call: meerkat_core::types::ToolCallView<'_>,
370    ) -> Result<meerkat_core::ToolDispatchOutcome, meerkat_core::ToolError> {
371        let mut args = serde_json::from_str::<Value>(call.args.get()).map_err(|error| {
372            meerkat_core::ToolError::invalid_arguments(call.name, error.to_string())
373        })?;
374        let idle_retire_override = delegate_idle_retire_override_from_args(call.name, &mut args)?;
375        let idle_retire_targets = idle_retire_targets_from_spawn_args(&args);
376        if let Some(object) = args.as_object_mut() {
377            object
378                .entry("auto_wire_parent".to_string())
379                .or_insert(Value::Bool(true));
380        }
381        let args = serde_json::value::RawValue::from_string(args.to_string()).map_err(|error| {
382            meerkat_core::ToolError::invalid_arguments(call.name, error.to_string())
383        })?;
384        let call = meerkat_core::types::ToolCallView {
385            id: call.id,
386            name: call.name,
387            args: &args,
388        };
389        let outcome = self.inner.dispatch(call).await?;
390        self.register_idle_retire_override_from_outcome(
391            &outcome,
392            idle_retire_override,
393            &idle_retire_targets,
394        )
395        .await;
396
397        Ok(outcome)
398    }
399
400    async fn dispatch_delegate(
401        &self,
402        call: meerkat_core::types::ToolCallView<'_>,
403    ) -> Result<meerkat_core::ToolDispatchOutcome, meerkat_core::ToolError> {
404        let mut args = serde_json::from_str::<Value>(call.args.get()).map_err(|error| {
405            meerkat_core::ToolError::invalid_arguments(call.name, error.to_string())
406        })?;
407        let idle_retire_override = delegate_idle_retire_override_from_args(call.name, &mut args)?;
408        let args = serde_json::value::RawValue::from_string(args.to_string()).map_err(|error| {
409            meerkat_core::ToolError::invalid_arguments(call.name, error.to_string())
410        })?;
411        let call = meerkat_core::types::ToolCallView {
412            id: call.id,
413            name: call.name,
414            args: &args,
415        };
416        let outcome = self.inner.dispatch(call).await?;
417
418        self.register_idle_retire_override_from_outcome(&outcome, idle_retire_override, &[])
419            .await;
420
421        Ok(outcome)
422    }
423
424    async fn register_idle_retire_override_from_outcome(
425        &self,
426        outcome: &meerkat_core::ToolDispatchOutcome,
427        idle_retire_override: Option<DelegateIdleRetireOverride>,
428        fallback_targets: &[IdleRetireTarget],
429    ) {
430        if outcome.result.is_error {
431            return;
432        }
433        let Some(override_policy) = idle_retire_override else {
434            return;
435        };
436        for target in
437            idle_retire_targets_from_outcome_text(&outcome.result.text_content(), fallback_targets)
438        {
439            self.implicit_delegate_retirement_overrides
440                .set(&target.mob_id, &target.member_id, override_policy)
441                .await;
442        }
443    }
444}
445
446#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord)]
447struct IdleRetireTarget {
448    mob_id: String,
449    member_id: String,
450}
451
452fn text_field<'a>(value: &'a Value, key: &str) -> Option<&'a str> {
453    value
454        .get(key)
455        .and_then(Value::as_str)
456        .filter(|text| !text.is_empty())
457}
458
459fn member_identity_field(value: &Value) -> Option<&str> {
460    text_field(value, "agent_identity")
461        .or_else(|| text_field(value, "member_id"))
462        .or_else(|| text_field(value, "identity"))
463}
464
465fn target_from_value(value: &Value, default_mob_id: Option<&str>) -> Option<IdleRetireTarget> {
466    let mob_id = text_field(value, "mob_id").or(default_mob_id)?;
467    let member_id = member_identity_field(value)?;
468    Some(IdleRetireTarget {
469        mob_id: mob_id.to_string(),
470        member_id: member_id.to_string(),
471    })
472}
473
474fn idle_retire_targets_from_spawn_args(args: &Value) -> Vec<IdleRetireTarget> {
475    let default_mob_id = text_field(args, "mob_id");
476    let mut targets = BTreeSet::new();
477    if let Some(target) = target_from_value(args, default_mob_id) {
478        targets.insert(target);
479    }
480    for key in ["specs", "members"] {
481        let Some(values) = args.get(key).and_then(Value::as_array) else {
482            continue;
483        };
484        for value in values {
485            if let Some(target) = target_from_value(value, default_mob_id) {
486                targets.insert(target);
487            }
488        }
489    }
490    targets.into_iter().collect()
491}
492
493fn target_from_result_value(
494    value: &Value,
495    fallback_targets: &[IdleRetireTarget],
496) -> Option<IdleRetireTarget> {
497    if let Some(target) = target_from_value(value, None) {
498        return Some(target);
499    }
500    let member_id = member_identity_field(value)?;
501    let mut matches = fallback_targets
502        .iter()
503        .filter(|target| target.member_id == member_id);
504    let target = matches.next()?;
505    if matches.next().is_some() {
506        return None;
507    }
508    Some(target.clone())
509}
510
511fn collect_idle_retire_result_targets(
512    value: &Value,
513    fallback_targets: &[IdleRetireTarget],
514    targets: &mut BTreeSet<IdleRetireTarget>,
515) {
516    if let Some(target) = target_from_result_value(value, fallback_targets) {
517        targets.insert(target);
518    }
519    for key in ["members", "specs", "spawned", "results"] {
520        let Some(values) = value.get(key).and_then(Value::as_array) else {
521            continue;
522        };
523        for value in values {
524            collect_idle_retire_result_targets(value, fallback_targets, targets);
525        }
526    }
527}
528
529fn idle_retire_targets_from_outcome_text(
530    text: &str,
531    fallback_targets: &[IdleRetireTarget],
532) -> Vec<IdleRetireTarget> {
533    let Ok(payload) = serde_json::from_str::<Value>(text) else {
534        return fallback_targets.to_vec();
535    };
536    let mut targets = BTreeSet::new();
537    collect_idle_retire_result_targets(&payload, fallback_targets, &mut targets);
538    if targets.is_empty() {
539        targets.extend(fallback_targets.iter().cloned());
540    }
541    targets.into_iter().collect()
542}
543
544struct DefinitionSeededRealmProfileStore {
545    inner: Arc<dyn meerkat_mob::RealmProfileStore>,
546    profiles: BTreeMap<String, Profile>,
547}
548
549impl DefinitionSeededRealmProfileStore {
550    fn new(
551        definition: &MobDefinition,
552        inner: Arc<dyn meerkat_mob::RealmProfileStore>,
553    ) -> Option<Self> {
554        let profiles = definition
555            .profiles
556            .iter()
557            .filter_map(|(name, binding)| {
558                binding
559                    .as_inline()
560                    .cloned()
561                    .map(|profile| (name.to_string(), profile))
562            })
563            .collect::<BTreeMap<_, _>>();
564
565        (!profiles.is_empty()).then_some(Self { inner, profiles })
566    }
567
568    fn stored(&self, name: &str, profile: &Profile) -> meerkat_mob::StoredRealmProfile {
569        let now = chrono::Utc::now();
570        meerkat_mob::StoredRealmProfile {
571            name: name.to_string(),
572            profile: profile.clone(),
573            revision: 0,
574            created_at: now,
575            updated_at: now,
576        }
577    }
578}
579
580#[cfg_attr(target_arch = "wasm32", async_trait(?Send))]
581#[cfg_attr(not(target_arch = "wasm32"), async_trait)]
582impl meerkat_mob::RealmProfileStore for DefinitionSeededRealmProfileStore {
583    async fn create(
584        &self,
585        name: &str,
586        profile: &Profile,
587    ) -> Result<meerkat_mob::StoredRealmProfile, meerkat_mob::MobStoreError> {
588        if self.profiles.contains_key(name) {
589            return Err(meerkat_mob::MobStoreError::CasConflict(format!(
590                "realm profile already exists: {name}"
591            )));
592        }
593        self.inner.create(name, profile).await
594    }
595
596    async fn get(
597        &self,
598        name: &str,
599    ) -> Result<Option<meerkat_mob::StoredRealmProfile>, meerkat_mob::MobStoreError> {
600        if let Some(profile) = self.profiles.get(name) {
601            return Ok(Some(self.stored(name, profile)));
602        }
603        self.inner.get(name).await
604    }
605
606    async fn list(
607        &self,
608    ) -> Result<Vec<meerkat_mob::StoredRealmProfile>, meerkat_mob::MobStoreError> {
609        let mut merged = self.inner.list().await?;
610        merged.retain(|profile| !self.profiles.contains_key(profile.name.as_str()));
611        merged.extend(
612            self.profiles
613                .iter()
614                .map(|(name, profile)| self.stored(name, profile)),
615        );
616        merged.sort_by(|a, b| a.name.cmp(&b.name));
617        Ok(merged)
618    }
619
620    async fn update(
621        &self,
622        name: &str,
623        profile: &Profile,
624        expected_revision: u64,
625    ) -> Result<meerkat_mob::StoredRealmProfile, meerkat_mob::MobStoreError> {
626        if self.profiles.contains_key(name) {
627            return Err(meerkat_mob::MobStoreError::CasConflict(format!(
628                "realm profile '{name}' is provided by the mob definition"
629            )));
630        }
631        self.inner.update(name, profile, expected_revision).await
632    }
633
634    async fn delete(
635        &self,
636        name: &str,
637        expected_revision: u64,
638    ) -> Result<meerkat_mob::StoredRealmProfile, meerkat_mob::MobStoreError> {
639        if self.profiles.contains_key(name) {
640            return Err(meerkat_mob::MobStoreError::CasConflict(format!(
641                "realm profile '{name}' is provided by the mob definition"
642            )));
643        }
644        self.inner.delete(name, expected_revision).await
645    }
646}
647
648fn delegate_idle_retire_override_from_args(
649    tool_name: &str,
650    args: &mut Value,
651) -> Result<Option<DelegateIdleRetireOverride>, meerkat_core::ToolError> {
652    let Some(object) = args.as_object_mut() else {
653        return Ok(None);
654    };
655    let Some(value) = object.remove("idle_retire_secs") else {
656        return Ok(None);
657    };
658    if value.is_null() {
659        return Ok(Some(DelegateIdleRetireOverride::Disabled));
660    }
661    value
662        .as_u64()
663        .map(DelegateIdleRetireOverride::Seconds)
664        .map(Some)
665        .ok_or_else(|| {
666            meerkat_core::ToolError::invalid_arguments(
667                tool_name,
668                "idle_retire_secs must be a non-negative integer or null",
669            )
670        })
671}
672
673fn delegate_tool_def_with_idle_retire_secs(
674    tool: &meerkat_core::types::ToolDef,
675) -> meerkat_core::types::ToolDef {
676    let mut patched = tool.clone();
677    if !patched.description.contains("IDLE RETIREMENT:") {
678        patched.description.push_str(
679            "\n\nIDLE RETIREMENT:\n\
680             Omit idle_retire_secs to use the runtime default. Pass an integer \
681             number of seconds to override idle auto-retirement for this helper. \
682             Pass null to disable auto-retirement for this helper.",
683        );
684    }
685    if let Some(properties) = patched
686        .input_schema
687        .get_mut("properties")
688        .and_then(Value::as_object_mut)
689    {
690        properties
691            .entry("idle_retire_secs".to_string())
692            .or_insert_with(|| {
693                serde_json::json!({
694                    "description": "Override idle auto-retirement for this helper. Omit to use the runtime default, use an integer number of seconds to override, or null to disable auto-retirement for this helper.",
695                    "anyOf": [
696                        {"type": "integer", "minimum": 0},
697                        {"type": "null"}
698                    ]
699                })
700            });
701    }
702    patched
703}
704
705fn mob_spawn_tool_def_with_idle_retire_secs(
706    tool: &meerkat_core::types::ToolDef,
707) -> meerkat_core::types::ToolDef {
708    let mut patched = tool.clone();
709    if !patched.description.contains("IDLE RETIREMENT:") {
710        patched.description.push_str(
711            "\n\nIDLE RETIREMENT:\n\
712             Omit idle_retire_secs to leave this spawned member out of auto-retirement. \
713             Pass an integer number of seconds to retire the member after it has been \
714             idle for that long. Pass null to explicitly disable auto-retirement.",
715        );
716    }
717    if let Some(properties) = patched
718        .input_schema
719        .get_mut("properties")
720        .and_then(Value::as_object_mut)
721    {
722        properties
723            .entry("idle_retire_secs".to_string())
724            .or_insert_with(|| {
725                serde_json::json!({
726                    "description": "Opt this spawned member into idle auto-retirement. Omit to keep the member indefinitely, use an integer number of seconds to retire after that much idle time, or null to explicitly disable auto-retirement.",
727                    "anyOf": [
728                        {"type": "integer", "minimum": 0},
729                        {"type": "null"}
730                    ]
731                })
732            });
733    }
734    patched
735}
736
737fn install_agent_mob_tools(
738    definition: &MobDefinition,
739    slot: Arc<std::sync::RwLock<Option<Arc<dyn meerkat_core::service::MobToolsFactory>>>>,
740    session_service: Arc<dyn MobSessionService>,
741) -> (
742    Arc<meerkat_mob_mcp::MobMcpState>,
743    ImplicitDelegateRetirementOverrides,
744    SharedDefaultLlmClientSlot,
745) {
746    let default_llm_client_slot = Arc::new(std::sync::RwLock::new(None::<Arc<dyn LlmClient>>));
747    let default_llm_client_provider_slot = Arc::clone(&default_llm_client_slot);
748    let mut state = meerkat_mob_mcp::MobMcpState::new(session_service);
749    if let Some(base_store) = state.realm_profile_store().cloned()
750        && let Some(store) = DefinitionSeededRealmProfileStore::new(definition, base_store)
751    {
752        state = state.with_realm_profile_store(Some(Arc::new(store)));
753    }
754    state = state
755        .with_realm_skill_sources(definition.skills.clone())
756        .with_default_llm_client_provider(Some(Arc::new(move || {
757            default_llm_client_provider_slot
758                .read()
759                .unwrap_or_else(std::sync::PoisonError::into_inner)
760                .clone()
761        })));
762    let state = Arc::new(state);
763    let implicit_delegate_retirement_overrides = ImplicitDelegateRetirementOverrides::default();
764    let inner = Arc::new(meerkat_mob_mcp::AgentMobToolSurfaceFactory::new(
765        Arc::clone(&state),
766    ));
767    let factory = Arc::new(AutoWireParentMobToolsFactory {
768        inner,
769        implicit_delegate_retirement_overrides: implicit_delegate_retirement_overrides.clone(),
770    });
771    *slot
772        .write()
773        .unwrap_or_else(std::sync::PoisonError::into_inner) = Some(factory);
774    (
775        state,
776        implicit_delegate_retirement_overrides,
777        default_llm_client_slot,
778    )
779}
780
781#[cfg(test)]
782#[allow(dead_code)]
783#[derive(Debug, Clone)]
784pub(crate) struct RuntimeTurnTrace {
785    pub(crate) session_id: String,
786    pub(crate) boundary: String,
787    pub(crate) contributing_input_count: usize,
788    pub(crate) outcome: String,
789}
790
791fn is_replay_unsafe_server_tool_content(name: &str, content: &Value) -> bool {
792    name == "web_search_annotations"
793        || content
794            .get("type")
795            .and_then(Value::as_str)
796            .is_some_and(|kind| kind.starts_with("response."))
797}
798
799fn sanitize_llm_request_for_stateless_replay(request: &LlmRequest) -> LlmRequest {
800    let mut sanitized = request.clone();
801    sanitized.messages = request
802        .messages
803        .iter()
804        .cloned()
805        .map(sanitize_message_for_stateless_replay)
806        .collect();
807    sanitized
808}
809
810fn sanitize_create_session_request_llm_override(req: &mut CreateSessionRequest) {
811    let Some(build) = req.build.as_mut() else {
812        return;
813    };
814    let Some(client) = build
815        .llm_client_override
816        .as_ref()
817        .and_then(meerkat::decode_llm_client_override_from_service)
818    else {
819        return;
820    };
821    build.llm_client_override = Some(meerkat::encode_llm_client_override_for_service(
822        ReplaySanitizingLlmClient::wrap(client),
823    ));
824}
825
826fn sanitize_message_for_stateless_replay(message: Message) -> Message {
827    match message {
828        Message::BlockAssistant(mut assistant) => {
829            assistant.blocks = assistant
830                .blocks
831                .into_iter()
832                .filter_map(|block| match block {
833                    AssistantBlock::ServerToolContent { name, content, .. }
834                        if is_replay_unsafe_server_tool_content(&name, &content) =>
835                    {
836                        None
837                    }
838                    other => Some(other),
839                })
840                .collect();
841            Message::BlockAssistant(assistant)
842        }
843        other => other,
844    }
845}
846
847/// Open the persistent runtime store that holds the authoritative
848/// session snapshot used by `load_persisted_session` (resume path) and
849/// `load_persisted_session_for_control` (archive/retire path). Lives at
850/// `<store_path>/runtime.sqlite` — separate file from the session
851/// store so we don't depend on the session_store's concrete type. If
852/// the SQLite open fails (rare: disk full, permissions), fall back to
853/// `InMemoryRuntimeStore` so the runtime can still bootstrap. In that
854/// degraded mode resume across restart and archive operations will
855/// fail; the warning makes the cause visible in operator logs.
856fn build_persistent_runtime_store(store_path: &Path) -> Arc<dyn meerkat_runtime::RuntimeStore> {
857    let runtime_db = store_path.join("runtime.sqlite");
858    match meerkat_runtime::store::SqliteRuntimeStore::new(&runtime_db) {
859        Ok(store) => Arc::new(store),
860        Err(err) => {
861            tracing::warn!(
862                path = %runtime_db.display(),
863                error = %err,
864                "failed to open SqliteRuntimeStore; falling back to InMemoryRuntimeStore. \
865                 Sessions will not survive process restart and archive operations may fail.",
866            );
867            Arc::new(meerkat_runtime::InMemoryRuntimeStore::new())
868        }
869    }
870}
871
872/// RuntimeStore facade for external-authoritative identity-first apps.
873///
874/// `PersistentSessionService` treats `RuntimeStore` as the authoritative
875/// session snapshot source whenever one is installed. External apps such as
876/// OB3 supply a durable `SessionStore` through `ContinuitySessionStoreAdapter`,
877/// so this bridge makes that store visible to the runtime snapshot path while
878/// delegating non-session runtime bookkeeping to the process-local store.
879struct SessionStoreBackedRuntimeStore {
880    inner: Arc<dyn meerkat_runtime::RuntimeStore>,
881}
882
883impl SessionStoreBackedRuntimeStore {
884    fn new(
885        inner: Arc<dyn meerkat_runtime::RuntimeStore>,
886        _session_store: Arc<dyn SessionStore>,
887    ) -> Self {
888        Self { inner }
889    }
890}
891
892#[async_trait]
893impl meerkat_runtime::RuntimeStore for SessionStoreBackedRuntimeStore {
894    fn auth_authority_key(&self) -> Option<String> {
895        self.inner.auth_authority_key()
896    }
897
898    fn persist_auth_oauth_flow_snapshot(
899        &self,
900        snapshot_json: &[u8],
901    ) -> Result<(), meerkat_runtime::store::RuntimeStoreError> {
902        self.inner.persist_auth_oauth_flow_snapshot(snapshot_json)
903    }
904
905    fn load_auth_oauth_flow_snapshot(
906        &self,
907    ) -> Result<Option<Vec<u8>>, meerkat_runtime::store::RuntimeStoreError> {
908        self.inner.load_auth_oauth_flow_snapshot()
909    }
910
911    fn update_auth_oauth_flow_snapshot(
912        &self,
913        update: &mut meerkat_runtime::store::AuthOAuthFlowSnapshotUpdate<'_>,
914    ) -> Result<(), meerkat_runtime::store::RuntimeStoreError> {
915        self.inner.update_auth_oauth_flow_snapshot(update)
916    }
917
918    async fn commit_session_snapshot(
919        &self,
920        runtime_id: &meerkat_runtime::LogicalRuntimeId,
921        session_delta: meerkat_runtime::store::SessionDelta,
922    ) -> Result<(), meerkat_runtime::store::RuntimeStoreError> {
923        self.inner
924            .commit_session_snapshot(runtime_id, session_delta)
925            .await
926    }
927
928    async fn commit_session_transcript_rewrite_snapshot(
929        &self,
930        runtime_id: &meerkat_runtime::LogicalRuntimeId,
931        session_delta: meerkat_runtime::store::SessionDelta,
932        commit: &meerkat_core::TranscriptRewriteCommit,
933    ) -> Result<(), meerkat_runtime::store::RuntimeStoreError> {
934        self.inner
935            .commit_session_transcript_rewrite_snapshot(runtime_id, session_delta, commit)
936            .await
937    }
938
939    async fn atomic_apply(
940        &self,
941        runtime_id: &meerkat_runtime::LogicalRuntimeId,
942        session_delta: Option<meerkat_runtime::store::SessionDelta>,
943        receipt: meerkat_core::lifecycle::RunBoundaryReceipt,
944        input_updates: Vec<StoredInputState>,
945        session_store_key: Option<meerkat_core::types::SessionId>,
946    ) -> Result<(), meerkat_runtime::store::RuntimeStoreError> {
947        self.inner
948            .atomic_apply(
949                runtime_id,
950                session_delta,
951                receipt,
952                input_updates,
953                session_store_key,
954            )
955            .await
956    }
957
958    async fn load_input_states(
959        &self,
960        runtime_id: &meerkat_runtime::LogicalRuntimeId,
961    ) -> Result<Vec<StoredInputState>, meerkat_runtime::store::RuntimeStoreError> {
962        self.inner.load_input_states(runtime_id).await
963    }
964
965    async fn load_boundary_receipt(
966        &self,
967        runtime_id: &meerkat_runtime::LogicalRuntimeId,
968        run_id: &meerkat_core::lifecycle::RunId,
969        sequence: u64,
970    ) -> Result<
971        Option<meerkat_core::lifecycle::RunBoundaryReceipt>,
972        meerkat_runtime::store::RuntimeStoreError,
973    > {
974        self.inner
975            .load_boundary_receipt(runtime_id, run_id, sequence)
976            .await
977    }
978
979    async fn load_session_snapshot(
980        &self,
981        runtime_id: &meerkat_runtime::LogicalRuntimeId,
982    ) -> Result<Option<Vec<u8>>, meerkat_runtime::store::RuntimeStoreError> {
983        self.inner.load_session_snapshot(runtime_id).await
984    }
985
986    async fn clear_session_snapshot(
987        &self,
988        runtime_id: &meerkat_runtime::LogicalRuntimeId,
989    ) -> Result<(), meerkat_runtime::store::RuntimeStoreError> {
990        self.inner.clear_session_snapshot(runtime_id).await
991    }
992
993    async fn replace_session_snapshot_if_current(
994        &self,
995        runtime_id: &meerkat_runtime::LogicalRuntimeId,
996        expected_current: &[u8],
997        replacement: Vec<u8>,
998    ) -> Result<bool, meerkat_runtime::store::RuntimeStoreError> {
999        self.inner
1000            .replace_session_snapshot_if_current(runtime_id, expected_current, replacement)
1001            .await
1002    }
1003
1004    async fn clear_session_snapshot_if_current(
1005        &self,
1006        runtime_id: &meerkat_runtime::LogicalRuntimeId,
1007        expected_current: &[u8],
1008    ) -> Result<bool, meerkat_runtime::store::RuntimeStoreError> {
1009        self.inner
1010            .clear_session_snapshot_if_current(runtime_id, expected_current)
1011            .await
1012    }
1013
1014    async fn persist_input_state(
1015        &self,
1016        runtime_id: &meerkat_runtime::LogicalRuntimeId,
1017        state: &StoredInputState,
1018    ) -> Result<(), meerkat_runtime::store::RuntimeStoreError> {
1019        self.inner.persist_input_state(runtime_id, state).await
1020    }
1021
1022    async fn load_input_state(
1023        &self,
1024        runtime_id: &meerkat_runtime::LogicalRuntimeId,
1025        input_id: &meerkat_core::lifecycle::InputId,
1026    ) -> Result<Option<StoredInputState>, meerkat_runtime::store::RuntimeStoreError> {
1027        self.inner.load_input_state(runtime_id, input_id).await
1028    }
1029
1030    async fn load_runtime_state(
1031        &self,
1032        runtime_id: &meerkat_runtime::LogicalRuntimeId,
1033    ) -> Result<Option<meerkat_runtime::RuntimeState>, meerkat_runtime::store::RuntimeStoreError>
1034    {
1035        self.inner.load_runtime_state(runtime_id).await
1036    }
1037
1038    async fn commit_machine_lifecycle(
1039        &self,
1040        runtime_id: &meerkat_runtime::LogicalRuntimeId,
1041        commit: MachineLifecycleCommit,
1042        input_states: &[StoredInputState],
1043    ) -> Result<(), meerkat_runtime::store::RuntimeStoreError> {
1044        self.inner
1045            .commit_machine_lifecycle(runtime_id, commit, input_states)
1046            .await
1047    }
1048
1049    async fn persist_ops_lifecycle(
1050        &self,
1051        runtime_id: &meerkat_runtime::LogicalRuntimeId,
1052        snapshot: &meerkat_runtime::ops_lifecycle::PersistedOpsSnapshot,
1053    ) -> Result<(), meerkat_runtime::store::RuntimeStoreError> {
1054        self.inner.persist_ops_lifecycle(runtime_id, snapshot).await
1055    }
1056
1057    async fn load_ops_lifecycle(
1058        &self,
1059        runtime_id: &meerkat_runtime::LogicalRuntimeId,
1060    ) -> Result<
1061        Option<meerkat_runtime::ops_lifecycle::PersistedOpsSnapshot>,
1062        meerkat_runtime::store::RuntimeStoreError,
1063    > {
1064        self.inner.load_ops_lifecycle(runtime_id).await
1065    }
1066}
1067
1068#[cfg(test)]
1069static RUNTIME_TURN_TRACES: OnceLock<Mutex<Vec<RuntimeTurnTrace>>> = OnceLock::new();
1070
1071#[cfg(test)]
1072fn runtime_turn_traces() -> &'static Mutex<Vec<RuntimeTurnTrace>> {
1073    RUNTIME_TURN_TRACES.get_or_init(|| Mutex::new(Vec::new()))
1074}
1075
1076#[cfg(test)]
1077#[allow(clippy::expect_used)]
1078fn record_runtime_turn_trace(trace: RuntimeTurnTrace) {
1079    runtime_turn_traces()
1080        .lock()
1081        .expect("runtime turn traces mutex")
1082        .push(trace);
1083}
1084
1085#[cfg(test)]
1086#[allow(dead_code)]
1087#[allow(clippy::expect_used)]
1088pub(crate) fn take_runtime_turn_traces() -> Vec<RuntimeTurnTrace> {
1089    std::mem::take(
1090        &mut *runtime_turn_traces()
1091            .lock()
1092            .expect("runtime turn traces mutex"),
1093    )
1094}
1095
1096#[cfg(not(test))]
1097#[allow(dead_code)]
1098fn record_runtime_turn_trace(_trace: ()) {}
1099
1100fn runtime_turn_diagnostics_enabled() -> bool {
1101    std::env::var_os("MOBKIT_TRACE_RUNTIME_TURNS").is_some()
1102}
1103
1104fn summarize_runtime_prompt(prompt: &meerkat_core::ContentInput) -> String {
1105    match prompt {
1106        meerkat_core::ContentInput::Text(text) => {
1107            text.lines().take(6).collect::<Vec<_>>().join(" ")
1108        }
1109        meerkat_core::ContentInput::Blocks(blocks) => blocks
1110            .iter()
1111            .map(|block| block.text_projection().to_string())
1112            .collect::<Vec<_>>()
1113            .join(" ")
1114            .lines()
1115            .take(6)
1116            .collect::<Vec<_>>()
1117            .join(" "),
1118    }
1119}
1120
1121/// Whether the session factory should wire the image-generation substrate for
1122/// this definition. Meerkat owns the per-profile visibility decision via
1123/// `profile.tools.image_generation`; MobKit only needs to make the runtime
1124/// machine available when a profile opts in, or when a realm profile may resolve
1125/// to an opt-in profile at spawn time.
1126pub fn mob_definition_may_use_image_generation(definition: &MobDefinition) -> bool {
1127    definition.profiles.values().any(|binding| {
1128        binding
1129            .as_inline()
1130            .is_none_or(|profile| profile.tools.image_generation)
1131    })
1132}
1133
1134fn normalize_runtime_turn_request(
1135    mut req: meerkat_core::service::StartTurnRequest,
1136) -> meerkat_core::service::StartTurnRequest {
1137    // Queue/Steer and render metadata are runtime-owned semantics. By the
1138    // time apply_runtime_turn() invokes the session service, the runtime
1139    // has already chosen the boundary and recorded the metadata it needs.
1140    // The direct agent/session path is queue-only, so forward a normalized
1141    // turn request to avoid re-injecting runtime-only semantics.
1142    req.runtime.handling_mode = meerkat_core::types::HandlingMode::Queue;
1143    req.runtime.render_metadata = None;
1144    req
1145}
1146
1147/// Implement all `MobSessionService` super-traits by delegating to `self.inner`,
1148/// overriding only `create_session` to apply the pre-build hook.
1149macro_rules! delegate_mob_session_service {
1150    ($wrapper:ty) => {
1151        #[async_trait]
1152        impl meerkat_core::service::SessionService for $wrapper {
1153            async fn create_session(
1154                &self,
1155                mut req: CreateSessionRequest,
1156            ) -> Result<meerkat_core::types::RunResult, SessionError> {
1157                (self.hook)(&mut req).await?;
1158                sanitize_create_session_request_llm_override(&mut req);
1159
1160                // Capture context before create_session consumes the request.
1161                let ctx = SessionCreatedContext {
1162                    model: req.model.clone(),
1163                    labels: req.labels.clone().unwrap_or_default(),
1164                    system_prompt: req.system_prompt.clone(),
1165                };
1166                let result = self.inner.create_session(req).await?;
1167
1168                // Best-effort after_create — errors logged, not propagated.
1169                if let Some(ref after_hook) = self.after_create_hook {
1170                    after_hook(result.session_id.clone(), ctx).await;
1171                }
1172
1173                Ok(result)
1174            }
1175            async fn start_turn(
1176                &self,
1177                id: &meerkat_core::types::SessionId,
1178                req: meerkat_core::service::StartTurnRequest,
1179            ) -> Result<meerkat_core::types::RunResult, SessionError> {
1180                self.inner.start_turn(id, req).await
1181            }
1182            async fn interrupt(
1183                &self,
1184                id: &meerkat_core::types::SessionId,
1185            ) -> Result<(), SessionError> {
1186                self.inner.interrupt(id).await
1187            }
1188            async fn cancel_after_boundary(
1189                &self,
1190                id: &meerkat_core::types::SessionId,
1191            ) -> Result<(), SessionError> {
1192                self.inner.cancel_after_boundary(id).await
1193            }
1194            async fn set_session_client(
1195                &self,
1196                id: &meerkat_core::types::SessionId,
1197                client: Arc<dyn meerkat_core::AgentLlmClient>,
1198            ) -> Result<(), SessionError> {
1199                self.inner
1200                    .set_session_client(id, ReplaySanitizingAgentLlmClient::wrap(client))
1201                    .await
1202            }
1203            async fn hot_swap_session_llm_identity(
1204                &self,
1205                id: &meerkat_core::types::SessionId,
1206                client: Arc<dyn meerkat_core::AgentLlmClient>,
1207                identity: meerkat_core::session::SessionLlmIdentity,
1208                request_policy: meerkat_core::SessionLlmRequestPolicy,
1209            ) -> Result<(), SessionError> {
1210                self.inner
1211                    .hot_swap_session_llm_identity(
1212                        id,
1213                        ReplaySanitizingAgentLlmClient::wrap(client),
1214                        identity,
1215                        request_policy,
1216                    )
1217                    .await
1218            }
1219            async fn update_session_keep_alive(
1220                &self,
1221                id: &meerkat_core::types::SessionId,
1222                keep_alive: bool,
1223            ) -> Result<(), SessionError> {
1224                self.inner.update_session_keep_alive(id, keep_alive).await
1225            }
1226            async fn update_session_mob_authority_context(
1227                &self,
1228                id: &meerkat_core::types::SessionId,
1229                authority_context: Option<meerkat_core::service::MobToolAuthorityContext>,
1230            ) -> Result<(), SessionError> {
1231                self.inner
1232                    .update_session_mob_authority_context(id, authority_context)
1233                    .await
1234            }
1235            async fn has_live_session(
1236                &self,
1237                id: &meerkat_core::types::SessionId,
1238            ) -> Result<bool, SessionError> {
1239                self.inner.has_live_session(id).await
1240            }
1241            async fn set_session_tool_visibility_state(
1242                &self,
1243                id: &meerkat_core::types::SessionId,
1244                state: Option<meerkat_core::SessionToolVisibilityState>,
1245            ) -> Result<(), SessionError> {
1246                self.inner
1247                    .set_session_tool_visibility_state(id, state)
1248                    .await
1249            }
1250            async fn set_session_tool_filter(
1251                &self,
1252                id: &meerkat_core::types::SessionId,
1253                filter: meerkat_core::ToolFilter,
1254            ) -> Result<(), SessionError> {
1255                self.inner.set_session_tool_filter(id, filter).await
1256            }
1257            async fn read(
1258                &self,
1259                id: &meerkat_core::types::SessionId,
1260            ) -> Result<meerkat_core::service::SessionView, SessionError> {
1261                self.inner.read(id).await
1262            }
1263            async fn list(
1264                &self,
1265                query: meerkat_core::service::SessionQuery,
1266            ) -> Result<Vec<meerkat_core::service::SessionSummary>, SessionError> {
1267                self.inner.list(query).await
1268            }
1269            async fn archive(
1270                &self,
1271                id: &meerkat_core::types::SessionId,
1272            ) -> Result<(), SessionError> {
1273                self.inner.archive(id).await
1274            }
1275            async fn subscribe_session_events(
1276                &self,
1277                id: &meerkat_core::types::SessionId,
1278            ) -> Result<meerkat_core::comms::EventStream, meerkat_core::comms::StreamError> {
1279                meerkat_core::service::SessionService::subscribe_session_events(
1280                    self.inner.as_ref(),
1281                    id,
1282                )
1283                .await
1284            }
1285        }
1286
1287        #[async_trait]
1288        impl meerkat_core::service::SessionServiceCommsExt for $wrapper {
1289            async fn comms_runtime(
1290                &self,
1291                id: &meerkat_core::types::SessionId,
1292            ) -> Option<Arc<dyn meerkat_core::agent::CommsRuntime>> {
1293                self.inner.comms_runtime(id).await
1294            }
1295
1296            async fn event_injector(
1297                &self,
1298                id: &meerkat_core::types::SessionId,
1299            ) -> Option<Arc<dyn meerkat_core::EventInjector>> {
1300                self.inner.event_injector(id).await
1301            }
1302
1303            async fn interaction_event_injector(
1304                &self,
1305                id: &meerkat_core::types::SessionId,
1306            ) -> Option<Arc<dyn meerkat_core::event_injector::SubscribableInjector>> {
1307                self.inner.interaction_event_injector(id).await
1308            }
1309        }
1310
1311        #[async_trait]
1312        impl meerkat_core::service::SessionServiceControlExt for $wrapper {
1313            async fn append_system_context(
1314                &self,
1315                id: &meerkat_core::types::SessionId,
1316                req: meerkat_core::service::AppendSystemContextRequest,
1317            ) -> Result<
1318                meerkat_core::service::AppendSystemContextResult,
1319                meerkat_core::service::SessionControlError,
1320            > {
1321                self.inner.append_system_context(id, req).await
1322            }
1323            async fn stage_tool_results(
1324                &self,
1325                id: &meerkat_core::types::SessionId,
1326                req: meerkat_core::service::StageToolResultsRequest,
1327            ) -> Result<meerkat_core::service::StageToolResultsResult, SessionError> {
1328                self.inner.stage_tool_results(id, req).await
1329            }
1330        }
1331
1332        #[async_trait]
1333        impl meerkat_core::service::SessionServiceHistoryExt for $wrapper {
1334            async fn read_history(
1335                &self,
1336                id: &meerkat_core::types::SessionId,
1337                query: meerkat_core::service::SessionHistoryQuery,
1338            ) -> Result<meerkat_core::service::SessionHistoryPage, SessionError> {
1339                self.inner.read_history(id, query).await
1340            }
1341        }
1342
1343        #[async_trait]
1344        impl MobSessionService for $wrapper {
1345            fn supports_persistent_sessions(&self) -> bool {
1346                self.inner.supports_persistent_sessions()
1347            }
1348            fn runtime_adapter(&self) -> Option<Arc<meerkat_runtime::MeerkatMachine>> {
1349                self.runtime_adapter_override
1350                    .clone()
1351                    .or_else(|| self.inner.runtime_adapter())
1352            }
1353            async fn interrupt_with_machine_authority(
1354                &self,
1355                session_id: &meerkat_core::types::SessionId,
1356                authority: meerkat_runtime::MachineSessionControlAuthority,
1357            ) -> Result<(), SessionError> {
1358                self.inner
1359                    .interrupt_with_machine_authority(session_id, authority)
1360                    .await
1361            }
1362            async fn cancel_after_boundary_with_machine_authority(
1363                &self,
1364                session_id: &meerkat_core::types::SessionId,
1365                authority: meerkat_runtime::MachineSessionControlAuthority,
1366            ) -> Result<(), SessionError> {
1367                self.inner
1368                    .cancel_after_boundary_with_machine_authority(session_id, authority)
1369                    .await
1370            }
1371            async fn session_belongs_to_mob(
1372                &self,
1373                session_id: &meerkat_core::types::SessionId,
1374                mob_id: &meerkat_mob::MobId,
1375            ) -> bool {
1376                self.inner.session_belongs_to_mob(session_id, mob_id).await
1377            }
1378            async fn load_persisted_session(
1379                &self,
1380                session_id: &meerkat_core::types::SessionId,
1381            ) -> Result<Option<meerkat_core::session::Session>, SessionError> {
1382                self.inner.load_persisted_session(session_id).await
1383            }
1384            async fn subscribe_session_events(
1385                &self,
1386                session_id: &meerkat_core::types::SessionId,
1387            ) -> Result<meerkat_core::comms::EventStream, meerkat_core::comms::StreamError> {
1388                meerkat_mob::MobSessionService::subscribe_session_events(
1389                    self.inner.as_ref(),
1390                    session_id,
1391                )
1392                .await
1393            }
1394            async fn archive_with_mob_lifecycle_authority(
1395                &self,
1396                session_id: &meerkat_core::types::SessionId,
1397            ) -> Result<(), SessionError> {
1398                self.inner
1399                    .archive_with_mob_lifecycle_authority(session_id)
1400                    .await
1401            }
1402            async fn execution_snapshot(
1403                &self,
1404                session_id: &meerkat_core::types::SessionId,
1405            ) -> Result<Option<meerkat_core::agent::AgentExecutionSnapshot>, SessionError> {
1406                self.inner.execution_snapshot(session_id).await
1407            }
1408            async fn tool_scope_snapshot(
1409                &self,
1410                session_id: &meerkat_core::types::SessionId,
1411            ) -> Result<Option<meerkat_core::ToolScopeSnapshot>, SessionError> {
1412                self.inner.tool_scope_snapshot(session_id).await
1413            }
1414            async fn external_tool_surface_snapshot(
1415                &self,
1416                session_id: &meerkat_core::types::SessionId,
1417            ) -> Result<Option<meerkat_core::ExternalToolSurfaceSnapshot>, SessionError> {
1418                self.inner.external_tool_surface_snapshot(session_id).await
1419            }
1420            async fn peer_ingress_runtime_snapshot(
1421                &self,
1422                session_id: &meerkat_core::types::SessionId,
1423            ) -> Result<Option<meerkat_core::PeerIngressRuntimeSnapshot>, SessionError> {
1424                self.inner.peer_ingress_runtime_snapshot(session_id).await
1425            }
1426            async fn apply_runtime_turn(
1427                &self,
1428                session_id: &meerkat_core::types::SessionId,
1429                run_id: meerkat_core::lifecycle::RunId,
1430                req: meerkat_core::service::StartTurnRequest,
1431                boundary: meerkat_core::lifecycle::run_primitive::RunApplyBoundary,
1432                contributing_input_ids: Vec<meerkat_core::lifecycle::InputId>,
1433            ) -> Result<meerkat_core::lifecycle::core_executor::CoreApplyOutput, SessionError> {
1434                #[cfg(test)]
1435                let boundary_name = format!("{boundary:?}");
1436                #[cfg(test)]
1437                let contributing_count = contributing_input_ids.len();
1438                let run_id_for_log = run_id.to_string();
1439                let prompt_summary = if runtime_turn_diagnostics_enabled() {
1440                    Some(summarize_runtime_prompt(&req.prompt))
1441                } else {
1442                    None
1443                };
1444                if let Some(summary) = prompt_summary.as_ref() {
1445                    tracing::warn!(
1446                        session_id = %session_id,
1447                        run_id = %run_id_for_log,
1448                        boundary = ?boundary,
1449                        contributing_inputs = contributing_input_ids.len(),
1450                        prompt = %summary,
1451                        runtime = ?req.runtime,
1452                        "mobkit runtime turn start"
1453                    );
1454                }
1455                let result = self
1456                    .inner
1457                    .apply_runtime_turn(
1458                        session_id,
1459                        run_id,
1460                        normalize_runtime_turn_request(req),
1461                        boundary,
1462                        contributing_input_ids,
1463                    )
1464                    .await;
1465                #[cfg(test)]
1466                record_runtime_turn_trace(RuntimeTurnTrace {
1467                    session_id: session_id.to_string(),
1468                    boundary: boundary_name,
1469                    contributing_input_count: contributing_count,
1470                    outcome: match &result {
1471                        Ok(_) => "ok".to_string(),
1472                        Err(error) => format!("err:{error}"),
1473                    },
1474                });
1475                if runtime_turn_diagnostics_enabled() {
1476                    match &result {
1477                        Ok(_) => tracing::warn!(
1478                            session_id = %session_id,
1479                            run_id = %run_id_for_log,
1480                            "mobkit runtime turn ok"
1481                        ),
1482                        Err(error) => tracing::error!(
1483                            session_id = %session_id,
1484                            run_id = %run_id_for_log,
1485                            error = %error,
1486                            error_debug = ?error,
1487                            "mobkit runtime turn error"
1488                        ),
1489                    }
1490                }
1491                result
1492            }
1493            async fn apply_runtime_context_appends(
1494                &self,
1495                session_id: &meerkat_core::types::SessionId,
1496                run_id: meerkat_core::lifecycle::RunId,
1497                appends: Vec<meerkat_core::session::PendingSystemContextAppend>,
1498                contributing_input_ids: Vec<meerkat_core::lifecycle::InputId>,
1499            ) -> Result<meerkat_core::lifecycle::core_executor::CoreApplyOutput, SessionError> {
1500                self.inner
1501                    .apply_runtime_context_appends(
1502                        session_id,
1503                        run_id,
1504                        appends,
1505                        contributing_input_ids,
1506                    )
1507                    .await
1508            }
1509            async fn apply_runtime_context_appends_with_boundary(
1510                &self,
1511                session_id: &meerkat_core::types::SessionId,
1512                run_id: meerkat_core::lifecycle::RunId,
1513                appends: Vec<meerkat_core::session::PendingSystemContextAppend>,
1514                boundary: meerkat_core::lifecycle::run_primitive::RunApplyBoundary,
1515                contributing_input_ids: Vec<meerkat_core::lifecycle::InputId>,
1516            ) -> Result<meerkat_core::lifecycle::core_executor::CoreApplyOutput, SessionError> {
1517                self.inner
1518                    .apply_runtime_context_appends_with_boundary(
1519                        session_id,
1520                        run_id,
1521                        appends,
1522                        boundary,
1523                        contributing_input_ids,
1524                    )
1525                    .await
1526            }
1527            async fn apply_runtime_system_context_for_turn(
1528                &self,
1529                session_id: &meerkat_core::types::SessionId,
1530                appends: Vec<meerkat_core::session::PendingSystemContextAppend>,
1531            ) -> Result<(), SessionError> {
1532                self.inner
1533                    .apply_runtime_system_context_for_turn(session_id, appends)
1534                    .await
1535            }
1536            async fn stage_runtime_system_context_for_active_turn(
1537                &self,
1538                session_id: &meerkat_core::types::SessionId,
1539                expected_run_id: &meerkat_core::lifecycle::RunId,
1540                appends: Vec<meerkat_core::session::PendingSystemContextAppend>,
1541            ) -> Result<Option<Vec<u8>>, SessionError> {
1542                self.inner
1543                    .stage_runtime_system_context_for_active_turn(
1544                        session_id,
1545                        expected_run_id,
1546                        appends,
1547                    )
1548                    .await
1549            }
1550            async fn discard_runtime_system_context_for_active_turn(
1551                &self,
1552                session_id: &meerkat_core::types::SessionId,
1553                expected_run_id: &meerkat_core::lifecycle::RunId,
1554                idempotency_keys: Vec<String>,
1555            ) -> Result<(), SessionError> {
1556                self.inner
1557                    .discard_runtime_system_context_for_active_turn(
1558                        session_id,
1559                        expected_run_id,
1560                        idempotency_keys,
1561                    )
1562                    .await
1563            }
1564            async fn active_turn_system_context_boundary_available(
1565                &self,
1566                session_id: &meerkat_core::types::SessionId,
1567            ) -> Result<Option<bool>, SessionError> {
1568                self.inner
1569                    .active_turn_system_context_boundary_available(session_id)
1570                    .await
1571            }
1572            async fn discard_live_session(
1573                &self,
1574                session_id: &meerkat_core::types::SessionId,
1575            ) -> Result<(), SessionError> {
1576                self.inner.discard_live_session(session_id).await
1577            }
1578            async fn checkpoint_committed_runtime_session_snapshot(
1579                &self,
1580                session_id: &meerkat_core::types::SessionId,
1581                session_snapshot: &[u8],
1582            ) -> Result<(), SessionError> {
1583                self.inner
1584                    .checkpoint_committed_runtime_session_snapshot(session_id, session_snapshot)
1585                    .await
1586            }
1587            async fn cancel_all_checkpointers(&self) {
1588                self.inner.cancel_all_checkpointers().await;
1589            }
1590            async fn rearm_all_checkpointers(&self) {
1591                self.inner.rearm_all_checkpointers().await;
1592            }
1593        }
1594    };
1595}
1596
1597delegate_mob_session_service!(PreBuildMobSessionService);
1598
1599/// Wraps a `MobSessionService` to fire an `AfterCreateHook` after each
1600/// successful `create_session`. Unlike `PreBuildMobSessionService`, this
1601/// captures context **after** the inner service (including any pre-build hooks)
1602/// has finished, so the context reflects all mutations.
1603struct AfterCreateMobSessionService {
1604    inner: Arc<dyn MobSessionService>,
1605    after_hook: AfterCreateHook,
1606}
1607
1608#[async_trait]
1609impl meerkat_core::service::SessionService for AfterCreateMobSessionService {
1610    async fn create_session(
1611        &self,
1612        mut req: CreateSessionRequest,
1613    ) -> Result<meerkat_core::types::RunResult, SessionError> {
1614        sanitize_create_session_request_llm_override(&mut req);
1615        // Capture pre-create context from the request (before inner consumes it).
1616        // The inner service's pre-build hooks may mutate the request further,
1617        // but we capture here because we can't read the request after inner
1618        // consumes it. The pre-build hook runs inside inner.create_session.
1619        //
1620        // For accurate post-mutation context, we re-read from the request
1621        // that was already mutated by any outer hooks, and accept that inner
1622        // hooks are not visible here. This is the correct trade-off: the
1623        // after_create context matches the request as seen by this layer.
1624        let ctx = SessionCreatedContext {
1625            model: req.model.clone(),
1626            labels: req.labels.clone().unwrap_or_default(),
1627            system_prompt: req.system_prompt.clone(),
1628        };
1629        let result = self.inner.create_session(req).await?;
1630        (self.after_hook)(result.session_id.clone(), ctx).await;
1631        Ok(result)
1632    }
1633    async fn start_turn(
1634        &self,
1635        id: &meerkat_core::types::SessionId,
1636        req: meerkat_core::service::StartTurnRequest,
1637    ) -> Result<meerkat_core::types::RunResult, SessionError> {
1638        self.inner.start_turn(id, req).await
1639    }
1640    async fn interrupt(&self, id: &meerkat_core::types::SessionId) -> Result<(), SessionError> {
1641        self.inner.interrupt(id).await
1642    }
1643    async fn cancel_after_boundary(
1644        &self,
1645        id: &meerkat_core::types::SessionId,
1646    ) -> Result<(), SessionError> {
1647        self.inner.cancel_after_boundary(id).await
1648    }
1649    async fn set_session_client(
1650        &self,
1651        id: &meerkat_core::types::SessionId,
1652        client: Arc<dyn meerkat_core::AgentLlmClient>,
1653    ) -> Result<(), SessionError> {
1654        self.inner
1655            .set_session_client(id, ReplaySanitizingAgentLlmClient::wrap(client))
1656            .await
1657    }
1658    async fn hot_swap_session_llm_identity(
1659        &self,
1660        id: &meerkat_core::types::SessionId,
1661        client: Arc<dyn meerkat_core::AgentLlmClient>,
1662        identity: meerkat_core::session::SessionLlmIdentity,
1663        request_policy: meerkat_core::SessionLlmRequestPolicy,
1664    ) -> Result<(), SessionError> {
1665        self.inner
1666            .hot_swap_session_llm_identity(
1667                id,
1668                ReplaySanitizingAgentLlmClient::wrap(client),
1669                identity,
1670                request_policy,
1671            )
1672            .await
1673    }
1674    async fn update_session_keep_alive(
1675        &self,
1676        id: &meerkat_core::types::SessionId,
1677        keep_alive: bool,
1678    ) -> Result<(), SessionError> {
1679        self.inner.update_session_keep_alive(id, keep_alive).await
1680    }
1681    async fn update_session_mob_authority_context(
1682        &self,
1683        id: &meerkat_core::types::SessionId,
1684        authority_context: Option<meerkat_core::service::MobToolAuthorityContext>,
1685    ) -> Result<(), SessionError> {
1686        self.inner
1687            .update_session_mob_authority_context(id, authority_context)
1688            .await
1689    }
1690    async fn has_live_session(
1691        &self,
1692        id: &meerkat_core::types::SessionId,
1693    ) -> Result<bool, SessionError> {
1694        self.inner.has_live_session(id).await
1695    }
1696    async fn set_session_tool_visibility_state(
1697        &self,
1698        id: &meerkat_core::types::SessionId,
1699        state: Option<meerkat_core::SessionToolVisibilityState>,
1700    ) -> Result<(), SessionError> {
1701        self.inner
1702            .set_session_tool_visibility_state(id, state)
1703            .await
1704    }
1705    async fn set_session_tool_filter(
1706        &self,
1707        id: &meerkat_core::types::SessionId,
1708        filter: meerkat_core::ToolFilter,
1709    ) -> Result<(), SessionError> {
1710        self.inner.set_session_tool_filter(id, filter).await
1711    }
1712    async fn read(
1713        &self,
1714        id: &meerkat_core::types::SessionId,
1715    ) -> Result<meerkat_core::service::SessionView, SessionError> {
1716        self.inner.read(id).await
1717    }
1718    async fn list(
1719        &self,
1720        query: meerkat_core::service::SessionQuery,
1721    ) -> Result<Vec<meerkat_core::service::SessionSummary>, SessionError> {
1722        self.inner.list(query).await
1723    }
1724    async fn archive(&self, id: &meerkat_core::types::SessionId) -> Result<(), SessionError> {
1725        self.inner.archive(id).await
1726    }
1727    async fn subscribe_session_events(
1728        &self,
1729        id: &meerkat_core::types::SessionId,
1730    ) -> Result<meerkat_core::comms::EventStream, meerkat_core::comms::StreamError> {
1731        meerkat_core::service::SessionService::subscribe_session_events(self.inner.as_ref(), id)
1732            .await
1733    }
1734}
1735
1736#[async_trait]
1737impl meerkat_core::service::SessionServiceCommsExt for AfterCreateMobSessionService {
1738    async fn comms_runtime(
1739        &self,
1740        id: &meerkat_core::types::SessionId,
1741    ) -> Option<Arc<dyn meerkat_core::agent::CommsRuntime>> {
1742        self.inner.comms_runtime(id).await
1743    }
1744
1745    async fn event_injector(
1746        &self,
1747        id: &meerkat_core::types::SessionId,
1748    ) -> Option<Arc<dyn meerkat_core::EventInjector>> {
1749        self.inner.event_injector(id).await
1750    }
1751
1752    async fn interaction_event_injector(
1753        &self,
1754        id: &meerkat_core::types::SessionId,
1755    ) -> Option<Arc<dyn meerkat_core::event_injector::SubscribableInjector>> {
1756        self.inner.interaction_event_injector(id).await
1757    }
1758}
1759
1760#[async_trait]
1761impl meerkat_core::service::SessionServiceControlExt for AfterCreateMobSessionService {
1762    async fn append_system_context(
1763        &self,
1764        id: &meerkat_core::types::SessionId,
1765        req: meerkat_core::service::AppendSystemContextRequest,
1766    ) -> Result<
1767        meerkat_core::service::AppendSystemContextResult,
1768        meerkat_core::service::SessionControlError,
1769    > {
1770        self.inner.append_system_context(id, req).await
1771    }
1772    async fn stage_tool_results(
1773        &self,
1774        id: &meerkat_core::types::SessionId,
1775        req: meerkat_core::service::StageToolResultsRequest,
1776    ) -> Result<meerkat_core::service::StageToolResultsResult, SessionError> {
1777        self.inner.stage_tool_results(id, req).await
1778    }
1779}
1780
1781#[async_trait]
1782impl meerkat_core::service::SessionServiceHistoryExt for AfterCreateMobSessionService {
1783    async fn read_history(
1784        &self,
1785        id: &meerkat_core::types::SessionId,
1786        query: meerkat_core::service::SessionHistoryQuery,
1787    ) -> Result<meerkat_core::service::SessionHistoryPage, SessionError> {
1788        self.inner.read_history(id, query).await
1789    }
1790}
1791
1792#[async_trait]
1793impl MobSessionService for AfterCreateMobSessionService {
1794    fn supports_persistent_sessions(&self) -> bool {
1795        self.inner.supports_persistent_sessions()
1796    }
1797    fn runtime_adapter(&self) -> Option<Arc<meerkat_runtime::MeerkatMachine>> {
1798        self.inner.runtime_adapter()
1799    }
1800    async fn interrupt_with_machine_authority(
1801        &self,
1802        session_id: &meerkat_core::types::SessionId,
1803        authority: meerkat_runtime::MachineSessionControlAuthority,
1804    ) -> Result<(), SessionError> {
1805        self.inner
1806            .interrupt_with_machine_authority(session_id, authority)
1807            .await
1808    }
1809    async fn cancel_after_boundary_with_machine_authority(
1810        &self,
1811        session_id: &meerkat_core::types::SessionId,
1812        authority: meerkat_runtime::MachineSessionControlAuthority,
1813    ) -> Result<(), SessionError> {
1814        self.inner
1815            .cancel_after_boundary_with_machine_authority(session_id, authority)
1816            .await
1817    }
1818    async fn session_belongs_to_mob(
1819        &self,
1820        session_id: &meerkat_core::types::SessionId,
1821        mob_id: &meerkat_mob::MobId,
1822    ) -> bool {
1823        self.inner.session_belongs_to_mob(session_id, mob_id).await
1824    }
1825    async fn load_persisted_session(
1826        &self,
1827        session_id: &meerkat_core::types::SessionId,
1828    ) -> Result<Option<meerkat_core::session::Session>, SessionError> {
1829        self.inner.load_persisted_session(session_id).await
1830    }
1831    async fn subscribe_session_events(
1832        &self,
1833        session_id: &meerkat_core::types::SessionId,
1834    ) -> Result<meerkat_core::comms::EventStream, meerkat_core::comms::StreamError> {
1835        meerkat_mob::MobSessionService::subscribe_session_events(self.inner.as_ref(), session_id)
1836            .await
1837    }
1838    async fn archive_with_mob_lifecycle_authority(
1839        &self,
1840        session_id: &meerkat_core::types::SessionId,
1841    ) -> Result<(), SessionError> {
1842        self.inner
1843            .archive_with_mob_lifecycle_authority(session_id)
1844            .await
1845    }
1846    async fn execution_snapshot(
1847        &self,
1848        session_id: &meerkat_core::types::SessionId,
1849    ) -> Result<Option<meerkat_core::agent::AgentExecutionSnapshot>, SessionError> {
1850        self.inner.execution_snapshot(session_id).await
1851    }
1852    async fn tool_scope_snapshot(
1853        &self,
1854        session_id: &meerkat_core::types::SessionId,
1855    ) -> Result<Option<meerkat_core::ToolScopeSnapshot>, SessionError> {
1856        self.inner.tool_scope_snapshot(session_id).await
1857    }
1858    async fn external_tool_surface_snapshot(
1859        &self,
1860        session_id: &meerkat_core::types::SessionId,
1861    ) -> Result<Option<meerkat_core::ExternalToolSurfaceSnapshot>, SessionError> {
1862        self.inner.external_tool_surface_snapshot(session_id).await
1863    }
1864    async fn peer_ingress_runtime_snapshot(
1865        &self,
1866        session_id: &meerkat_core::types::SessionId,
1867    ) -> Result<Option<meerkat_core::PeerIngressRuntimeSnapshot>, SessionError> {
1868        self.inner.peer_ingress_runtime_snapshot(session_id).await
1869    }
1870    async fn apply_runtime_turn(
1871        &self,
1872        session_id: &meerkat_core::types::SessionId,
1873        run_id: meerkat_core::lifecycle::RunId,
1874        req: meerkat_core::service::StartTurnRequest,
1875        boundary: meerkat_core::lifecycle::run_primitive::RunApplyBoundary,
1876        contributing_input_ids: Vec<meerkat_core::lifecycle::InputId>,
1877    ) -> Result<meerkat_core::lifecycle::core_executor::CoreApplyOutput, SessionError> {
1878        self.inner
1879            .apply_runtime_turn(session_id, run_id, req, boundary, contributing_input_ids)
1880            .await
1881    }
1882    async fn apply_runtime_context_appends(
1883        &self,
1884        session_id: &meerkat_core::types::SessionId,
1885        run_id: meerkat_core::lifecycle::RunId,
1886        appends: Vec<meerkat_core::session::PendingSystemContextAppend>,
1887        contributing_input_ids: Vec<meerkat_core::lifecycle::InputId>,
1888    ) -> Result<meerkat_core::lifecycle::core_executor::CoreApplyOutput, SessionError> {
1889        self.inner
1890            .apply_runtime_context_appends(session_id, run_id, appends, contributing_input_ids)
1891            .await
1892    }
1893    async fn apply_runtime_context_appends_with_boundary(
1894        &self,
1895        session_id: &meerkat_core::types::SessionId,
1896        run_id: meerkat_core::lifecycle::RunId,
1897        appends: Vec<meerkat_core::session::PendingSystemContextAppend>,
1898        boundary: meerkat_core::lifecycle::run_primitive::RunApplyBoundary,
1899        contributing_input_ids: Vec<meerkat_core::lifecycle::InputId>,
1900    ) -> Result<meerkat_core::lifecycle::core_executor::CoreApplyOutput, SessionError> {
1901        self.inner
1902            .apply_runtime_context_appends_with_boundary(
1903                session_id,
1904                run_id,
1905                appends,
1906                boundary,
1907                contributing_input_ids,
1908            )
1909            .await
1910    }
1911    async fn apply_runtime_system_context_for_turn(
1912        &self,
1913        session_id: &meerkat_core::types::SessionId,
1914        appends: Vec<meerkat_core::session::PendingSystemContextAppend>,
1915    ) -> Result<(), SessionError> {
1916        self.inner
1917            .apply_runtime_system_context_for_turn(session_id, appends)
1918            .await
1919    }
1920    async fn stage_runtime_system_context_for_active_turn(
1921        &self,
1922        session_id: &meerkat_core::types::SessionId,
1923        expected_run_id: &meerkat_core::lifecycle::RunId,
1924        appends: Vec<meerkat_core::session::PendingSystemContextAppend>,
1925    ) -> Result<Option<Vec<u8>>, SessionError> {
1926        self.inner
1927            .stage_runtime_system_context_for_active_turn(session_id, expected_run_id, appends)
1928            .await
1929    }
1930    async fn discard_runtime_system_context_for_active_turn(
1931        &self,
1932        session_id: &meerkat_core::types::SessionId,
1933        expected_run_id: &meerkat_core::lifecycle::RunId,
1934        idempotency_keys: Vec<String>,
1935    ) -> Result<(), SessionError> {
1936        self.inner
1937            .discard_runtime_system_context_for_active_turn(
1938                session_id,
1939                expected_run_id,
1940                idempotency_keys,
1941            )
1942            .await
1943    }
1944    async fn active_turn_system_context_boundary_available(
1945        &self,
1946        session_id: &meerkat_core::types::SessionId,
1947    ) -> Result<Option<bool>, SessionError> {
1948        self.inner
1949            .active_turn_system_context_boundary_available(session_id)
1950            .await
1951    }
1952    async fn discard_live_session(
1953        &self,
1954        session_id: &meerkat_core::types::SessionId,
1955    ) -> Result<(), SessionError> {
1956        self.inner.discard_live_session(session_id).await
1957    }
1958    async fn checkpoint_committed_runtime_session_snapshot(
1959        &self,
1960        session_id: &meerkat_core::types::SessionId,
1961        session_snapshot: &[u8],
1962    ) -> Result<(), SessionError> {
1963        self.inner
1964            .checkpoint_committed_runtime_session_snapshot(session_id, session_snapshot)
1965            .await
1966    }
1967    async fn cancel_all_checkpointers(&self) {
1968        self.inner.cancel_all_checkpointers().await;
1969    }
1970    async fn rearm_all_checkpointers(&self) {
1971        self.inner.rearm_all_checkpointers().await;
1972    }
1973}
1974
1975/// Specification for bootstrapping a mob runtime from a definition, storage, and session service.
1976pub struct MobBootstrapSpec {
1977    pub definition: MobDefinition,
1978    pub storage: MobStorage,
1979    pub session_service: Arc<dyn MobSessionService>,
1980    pub binary_blob_store: Option<Arc<dyn BinaryBlobStore>>,
1981    pub(crate) agent_mob_mcp_state: Option<Arc<meerkat_mob_mcp::MobMcpState>>,
1982    pub(crate) implicit_delegate_retirement_overrides: Option<ImplicitDelegateRetirementOverrides>,
1983    pub(crate) agent_mob_default_llm_client_slot: Option<SharedDefaultLlmClientSlot>,
1984    pub options: MobBootstrapOptions,
1985    /// Explicit runtime adapter — bypasses `session_service.runtime_adapter()`.
1986    ///
1987    /// Used by `persistent()` to supply the adapter directly so the session
1988    /// service's `runtime_store` can stay `None` (keeping the checkpointer
1989    /// enabled). See meerkat-session#checkpointer-enabled-flag.
1990    pub runtime_adapter: Option<Arc<meerkat_runtime::MeerkatMachine>>,
1991    /// Holds the ephemeral temp directory alive for the lifetime of the spec.
1992    /// Only populated when the builder creates an ephemeral runtime.
1993    pub(crate) _ephemeral_dir: Option<Arc<tempfile::TempDir>>,
1994}
1995
1996impl MobBootstrapSpec {
1997    pub fn new(
1998        definition: MobDefinition,
1999        storage: MobStorage,
2000        session_service: Arc<dyn MobSessionService>,
2001    ) -> Self {
2002        let session_service = Arc::new(PreBuildMobSessionService {
2003            inner: session_service,
2004            hook: no_op_pre_build_hook(),
2005            after_create_hook: None,
2006            runtime_adapter_override: None,
2007        }) as Arc<dyn MobSessionService>;
2008        Self {
2009            definition,
2010            storage,
2011            session_service,
2012            binary_blob_store: None,
2013            agent_mob_mcp_state: None,
2014            implicit_delegate_retirement_overrides: None,
2015            agent_mob_default_llm_client_slot: None,
2016            options: MobBootstrapOptions {
2017                allow_ephemeral_sessions: true,
2018                notify_orchestrator_on_resume: true,
2019                default_llm_client: None,
2020            },
2021            runtime_adapter: None,
2022            _ephemeral_dir: None,
2023        }
2024    }
2025
2026    pub fn with_options(mut self, options: MobBootstrapOptions) -> Self {
2027        self.options = options;
2028        self
2029    }
2030
2031    /// Expose a runtime adapter through the session-service facade.
2032    ///
2033    /// Custom embedders that construct their own `MobSessionService` still need
2034    /// MobKit's session-service surface to report the same runtime authority
2035    /// that `MobBuilder::with_runtime_adapter(...)` receives. This keeps
2036    /// autonomous-host comms, runtime inspection, and control paths pointed at
2037    /// one machine without forcing embedders through the stock factory helpers.
2038    pub fn with_session_runtime_adapter(
2039        mut self,
2040        adapter: Arc<meerkat_runtime::MeerkatMachine>,
2041    ) -> Self {
2042        self.session_service = Arc::new(PreBuildMobSessionService {
2043            inner: self.session_service,
2044            hook: no_op_pre_build_hook(),
2045            after_create_hook: None,
2046            runtime_adapter_override: Some(adapter),
2047        });
2048        self
2049    }
2050
2051    /// Wrap the session service with an after-create hook that fires after
2052    /// each successful `create_session`. The hook is best-effort: errors are
2053    /// not propagated. Uses `AfterCreateMobSessionService` which wraps the
2054    /// inner service without a pre-build hook, so any pre-build mutations
2055    /// from inner wrappers are fully reflected in the context.
2056    pub fn with_after_create_hook(mut self, hook: AfterCreateHook) -> Self {
2057        self.session_service = Arc::new(AfterCreateMobSessionService {
2058            inner: self.session_service,
2059            after_hook: hook,
2060        });
2061        self
2062    }
2063
2064    /// Build an ephemeral session service with a correctly wired `AgentFactory`.
2065    ///
2066    /// If `session_store` is provided, it is set on the `FactoryAgentBuilder` so
2067    /// that agents use the given store instead of falling back to JSONL.
2068    pub fn ephemeral(
2069        definition: MobDefinition,
2070        storage: MobStorage,
2071        store_path: PathBuf,
2072        max_sessions: usize,
2073        session_store: Option<Arc<dyn AgentSessionStore>>,
2074    ) -> Self {
2075        Self::ephemeral_inner(
2076            definition,
2077            storage,
2078            store_path,
2079            max_sessions,
2080            session_store,
2081            None,
2082            CapabilityFlags::default(),
2083            None,
2084            None,
2085        )
2086    }
2087
2088    /// Like [`ephemeral`](Self::ephemeral), but with a pre-build hook that is
2089    /// called before each agent is constructed. Use this to inject external
2090    /// tools, augment system prompts, or set per-agent labels.
2091    pub fn ephemeral_with_hook(
2092        definition: MobDefinition,
2093        storage: MobStorage,
2094        store_path: PathBuf,
2095        max_sessions: usize,
2096        session_store: Option<Arc<dyn AgentSessionStore>>,
2097        hook: impl Fn(
2098            &mut CreateSessionRequest,
2099        ) -> std::pin::Pin<
2100            Box<dyn std::future::Future<Output = Result<(), SessionError>> + Send + '_>,
2101        > + Send
2102        + Sync
2103        + 'static,
2104    ) -> Self {
2105        Self::ephemeral_inner(
2106            definition,
2107            storage,
2108            store_path,
2109            max_sessions,
2110            session_store,
2111            Some(Arc::new(hook)),
2112            CapabilityFlags::default(),
2113            None,
2114            None,
2115        )
2116    }
2117
2118    #[allow(clippy::too_many_arguments)]
2119    pub(crate) fn ephemeral_inner(
2120        definition: MobDefinition,
2121        storage: MobStorage,
2122        store_path: PathBuf,
2123        max_sessions: usize,
2124        session_store: Option<Arc<dyn AgentSessionStore>>,
2125        hook: Option<PreBuildHook>,
2126        mut caps: CapabilityFlags,
2127        after_create_hook: Option<AfterCreateHook>,
2128        agent_config: Option<Config>,
2129    ) -> Self {
2130        caps.image_generation |= mob_definition_may_use_image_generation(&definition);
2131        let binary_blob_store: Arc<dyn BinaryBlobStore> = Arc::new(ObjectStoreBlobStore::memory());
2132        let blob_store: Arc<dyn meerkat_core::BlobStore> =
2133            Arc::new(Base64BlobStoreAdapter::new(binary_blob_store.clone()));
2134        let runtime_adapter = if caps.image_generation {
2135            let runtime_store: Arc<dyn meerkat_runtime::RuntimeStore> =
2136                Arc::new(meerkat_runtime::InMemoryRuntimeStore::new());
2137            Some(Arc::new(meerkat_runtime::MeerkatMachine::persistent(
2138                runtime_store,
2139                Arc::clone(&blob_store),
2140            )))
2141        } else {
2142            None
2143        };
2144        let mut factory = AgentFactory::new(&store_path)
2145            .builtins(caps.builtins)
2146            .shell(caps.shell)
2147            .mob(caps.mob)
2148            .comms(caps.comms)
2149            .memory(caps.memory);
2150        if let Some(machine) = runtime_adapter.clone() {
2151            factory = factory.with_image_generation_machine(machine);
2152        }
2153        let config = agent_config.unwrap_or_default();
2154        let mut builder = FactoryAgentBuilder::new(factory, config);
2155        builder.default_blob_store = Some(blob_store);
2156        if let Some(store) = session_store {
2157            builder.default_session_store = Some(store);
2158        }
2159        let mob_tools_slot = Arc::clone(&builder.default_mob_tools);
2160        let session_service: Arc<dyn MobSessionService> = Arc::new(
2161            meerkat_session::EphemeralSessionService::new(builder, max_sessions),
2162        );
2163        let hook = hook.unwrap_or_else(no_op_pre_build_hook);
2164        let after_create_hook = if let Some(runtime_adapter) = runtime_adapter.clone() {
2165            let user_after_create_hook = after_create_hook.clone();
2166            Some(Arc::new(
2167                move |session_id: meerkat_core::types::SessionId, ctx: SessionCreatedContext| {
2168                    let runtime_adapter = runtime_adapter.clone();
2169                    let user_after_create_hook = user_after_create_hook.clone();
2170                    Box::pin(async move {
2171                        runtime_adapter.register_session(session_id.clone()).await;
2172                        if let Some(user_after_create_hook) = user_after_create_hook {
2173                            user_after_create_hook(session_id, ctx).await;
2174                        }
2175                    })
2176                        as std::pin::Pin<Box<dyn std::future::Future<Output = ()> + Send>>
2177                },
2178            ) as AfterCreateHook)
2179        } else {
2180            after_create_hook
2181        };
2182        let session_service = Arc::new(PreBuildMobSessionService {
2183            inner: session_service,
2184            hook,
2185            after_create_hook,
2186            runtime_adapter_override: runtime_adapter.clone(),
2187        }) as Arc<dyn MobSessionService>;
2188        let (
2189            agent_mob_mcp_state,
2190            implicit_delegate_retirement_overrides,
2191            agent_mob_default_llm_client_slot,
2192        ) = install_agent_mob_tools(&definition, mob_tools_slot, Arc::clone(&session_service));
2193        let mut spec = Self::new(definition, storage, session_service);
2194        spec.agent_mob_mcp_state = Some(agent_mob_mcp_state);
2195        spec.implicit_delegate_retirement_overrides = Some(implicit_delegate_retirement_overrides);
2196        spec.agent_mob_default_llm_client_slot = Some(agent_mob_default_llm_client_slot);
2197        spec.runtime_adapter = runtime_adapter;
2198        spec.binary_blob_store = Some(binary_blob_store);
2199        spec
2200    }
2201
2202    /// Build a persistent session service with a correctly wired `AgentFactory`.
2203    ///
2204    /// The `session_store` is used in two places:
2205    /// 1. As the persistence backend for `PersistentSessionService` (checkpoint/restore).
2206    /// 2. Adapted via `StoreAdapter` and set on `FactoryAgentBuilder.default_session_store`
2207    ///    so that agents use it directly instead of falling back to JSONL.
2208    pub fn persistent(
2209        definition: MobDefinition,
2210        storage: MobStorage,
2211        store_path: PathBuf,
2212        max_sessions: usize,
2213        session_store: Arc<dyn SessionStore>,
2214    ) -> Self {
2215        Self::persistent_inner(
2216            definition,
2217            storage,
2218            store_path,
2219            max_sessions,
2220            session_store,
2221            None,
2222            None,
2223            CapabilityFlags::default(),
2224            None,
2225            None,
2226        )
2227    }
2228
2229    /// Like [`persistent`](Self::persistent), but with a pre-build hook that
2230    /// is called before each agent is constructed. Use this to inject external
2231    /// tools, augment system prompts, or set per-agent labels.
2232    pub fn persistent_with_hook(
2233        definition: MobDefinition,
2234        storage: MobStorage,
2235        store_path: PathBuf,
2236        max_sessions: usize,
2237        session_store: Arc<dyn SessionStore>,
2238        hook: impl Fn(
2239            &mut CreateSessionRequest,
2240        ) -> std::pin::Pin<
2241            Box<dyn std::future::Future<Output = Result<(), SessionError>> + Send + '_>,
2242        > + Send
2243        + Sync
2244        + 'static,
2245    ) -> Self {
2246        Self::persistent_inner(
2247            definition,
2248            storage,
2249            store_path,
2250            max_sessions,
2251            session_store,
2252            None,
2253            Some(Arc::new(hook)),
2254            CapabilityFlags::default(),
2255            None,
2256            None,
2257        )
2258    }
2259
2260    #[allow(clippy::too_many_arguments)]
2261    pub(crate) fn persistent_inner(
2262        definition: MobDefinition,
2263        storage: MobStorage,
2264        store_path: PathBuf,
2265        max_sessions: usize,
2266        session_store: Arc<dyn SessionStore>,
2267        custom_blob_store: Option<Arc<dyn meerkat_core::BlobStore>>,
2268        hook: Option<PreBuildHook>,
2269        mut caps: CapabilityFlags,
2270        after_create_hook: Option<AfterCreateHook>,
2271        agent_config: Option<Config>,
2272    ) -> Self {
2273        caps.image_generation |= mob_definition_may_use_image_generation(&definition);
2274        let (binary_blob_store, blob_store): (
2275            Arc<dyn BinaryBlobStore>,
2276            Arc<dyn meerkat_core::BlobStore>,
2277        ) = if let Some(blob_store) = custom_blob_store {
2278            (
2279                Arc::new(BinaryBlobStoreAdapter::new(blob_store.clone())),
2280                blob_store,
2281            )
2282        } else {
2283            let binary_blob_store: Arc<dyn BinaryBlobStore> = match ObjectStoreBlobStore::local(
2284                store_path.join("blobs"),
2285            ) {
2286                Ok(store) => Arc::new(store),
2287                Err(err) => {
2288                    tracing::warn!(
2289                        error = %err,
2290                        "failed to initialize persistent binary blob store; falling back to in-memory blobs"
2291                    );
2292                    Arc::new(ObjectStoreBlobStore::memory())
2293                }
2294            };
2295            let blob_store: Arc<dyn meerkat_core::BlobStore> =
2296                Arc::new(Base64BlobStoreAdapter::new(binary_blob_store.clone()));
2297            (binary_blob_store, blob_store)
2298        };
2299        // Use a SQLite-backed runtime store so we get BOTH durability across
2300        // process restart AND control-op authority (archive/retire). The
2301        // earlier 0.6.1 wiring used `Some(InMemoryRuntimeStore)`, which was
2302        // a half-fix: it kept the session-service's runtime_store path on
2303        // (so `load_authoritative_session` resolved through runtime_store —
2304        // good for control ops), but the in-memory store died on restart so
2305        // resume failed. Switching the in-memory store for a persistent one
2306        // satisfies both. The store lives at `store_path/runtime.sqlite`,
2307        // sibling to whatever path the caller's `session_store` uses.
2308        let runtime_store: Arc<dyn meerkat_runtime::RuntimeStore> =
2309            build_persistent_runtime_store(&store_path);
2310        let runtime_adapter = Arc::new(meerkat_runtime::MeerkatMachine::persistent(
2311            Arc::clone(&runtime_store),
2312            Arc::clone(&blob_store),
2313        ));
2314        let mut factory = AgentFactory::new(&store_path)
2315            .builtins(caps.builtins)
2316            .shell(caps.shell)
2317            .mob(caps.mob)
2318            .comms(caps.comms)
2319            .memory(caps.memory);
2320        if caps.image_generation {
2321            factory = factory.with_image_generation_machine(runtime_adapter.clone());
2322        }
2323        let config = agent_config.unwrap_or_default();
2324        let mut builder = FactoryAgentBuilder::new(factory, config);
2325        builder.default_session_store = Some(Arc::new(StoreAdapter::new(session_store.clone())));
2326        builder.default_blob_store = Some(blob_store.clone());
2327        let mob_tools_slot = Arc::clone(&builder.default_mob_tools);
2328        let session_service: Arc<dyn MobSessionService> =
2329            Arc::new(meerkat_session::PersistentSessionService::new(
2330                builder,
2331                max_sessions,
2332                session_store,
2333                Some(runtime_store),
2334                blob_store,
2335            ));
2336        let hook = hook.unwrap_or_else(no_op_pre_build_hook);
2337        let session_service = Arc::new(PreBuildMobSessionService {
2338            inner: session_service,
2339            hook,
2340            after_create_hook,
2341            runtime_adapter_override: None,
2342        }) as Arc<dyn MobSessionService>;
2343        let (
2344            agent_mob_mcp_state,
2345            implicit_delegate_retirement_overrides,
2346            agent_mob_default_llm_client_slot,
2347        ) = install_agent_mob_tools(&definition, mob_tools_slot, Arc::clone(&session_service));
2348        let mut spec = Self::new(definition, storage, session_service);
2349        spec.agent_mob_mcp_state = Some(agent_mob_mcp_state);
2350        spec.implicit_delegate_retirement_overrides = Some(implicit_delegate_retirement_overrides);
2351        spec.agent_mob_default_llm_client_slot = Some(agent_mob_default_llm_client_slot);
2352        spec.runtime_adapter = Some(runtime_adapter);
2353        spec.binary_blob_store = Some(binary_blob_store);
2354        spec
2355    }
2356
2357    #[allow(clippy::too_many_arguments)]
2358    pub(crate) fn ephemeral_runtime_backed_inner(
2359        definition: MobDefinition,
2360        storage: MobStorage,
2361        store_path: PathBuf,
2362        max_sessions: usize,
2363        custom_session_store: Option<Arc<dyn SessionStore>>,
2364        custom_blob_store: Option<Arc<dyn meerkat_core::BlobStore>>,
2365        hook: Option<PreBuildHook>,
2366        mut caps: CapabilityFlags,
2367        after_create_hook: Option<AfterCreateHook>,
2368        agent_config: Option<Config>,
2369    ) -> Self {
2370        caps.image_generation |= mob_definition_may_use_image_generation(&definition);
2371        let config = agent_config.unwrap_or_default();
2372        let session_store: Arc<dyn SessionStore> = custom_session_store
2373            .clone()
2374            .unwrap_or_else(|| Arc::new(meerkat_store::MemoryStore::new()));
2375        let (binary_blob_store, blob_store): (
2376            Arc<dyn BinaryBlobStore>,
2377            Arc<dyn meerkat_core::BlobStore>,
2378        ) = if let Some(blob_store) = custom_blob_store {
2379            (
2380                Arc::new(BinaryBlobStoreAdapter::new(blob_store.clone())),
2381                blob_store,
2382            )
2383        } else {
2384            let binary_blob_store: Arc<dyn BinaryBlobStore> =
2385                Arc::new(ObjectStoreBlobStore::memory());
2386            let blob_store: Arc<dyn meerkat_core::BlobStore> =
2387                Arc::new(Base64BlobStoreAdapter::new(binary_blob_store.clone()));
2388            (binary_blob_store, blob_store)
2389        };
2390        // Runtime-backed ephemeral mode keeps the live EphemeralSessionService
2391        // as the comms authority, but registers each created session with the
2392        // same in-memory machine used by image generation. Meerkat 0.6.4's
2393        // persistent runtime-backed create path does not expose member comms
2394        // handles early enough for mob edge reconciliation; this bounded bridge
2395        // preserves live comms while avoiding the old "image tool sees the
2396        // session as destroyed" split-machine bug.
2397        let base_runtime_store: Arc<dyn meerkat_runtime::RuntimeStore> =
2398            Arc::new(meerkat_runtime::InMemoryRuntimeStore::new());
2399        let runtime_store: Arc<dyn meerkat_runtime::RuntimeStore> =
2400            if let Some(custom_session_store) = custom_session_store.clone() {
2401                Arc::new(SessionStoreBackedRuntimeStore::new(
2402                    Arc::clone(&base_runtime_store),
2403                    custom_session_store,
2404                ))
2405            } else {
2406                Arc::clone(&base_runtime_store)
2407            };
2408        let runtime_adapter = Arc::new(meerkat_runtime::MeerkatMachine::persistent(
2409            Arc::clone(&runtime_store),
2410            Arc::clone(&blob_store),
2411        ));
2412        let mut factory = AgentFactory::new(&store_path)
2413            .builtins(caps.builtins)
2414            .shell(caps.shell)
2415            .mob(caps.mob)
2416            .comms(caps.comms)
2417            .memory(caps.memory);
2418        if caps.image_generation {
2419            factory = factory.with_image_generation_machine(runtime_adapter.clone());
2420        }
2421        let mut builder = FactoryAgentBuilder::new(factory, config);
2422        builder.default_session_store = Some(Arc::new(StoreAdapter::new(session_store.clone())));
2423        builder.default_blob_store = Some(blob_store.clone());
2424        let mob_tools_slot = Arc::clone(&builder.default_mob_tools);
2425        let session_service: Arc<dyn MobSessionService> =
2426            if let Some(custom_session_store) = custom_session_store {
2427                Arc::new(meerkat_session::PersistentSessionService::new(
2428                    builder,
2429                    max_sessions,
2430                    custom_session_store,
2431                    Some(runtime_store.clone()),
2432                    blob_store,
2433                ))
2434            } else {
2435                Arc::new(meerkat_session::EphemeralSessionService::new(
2436                    builder,
2437                    max_sessions,
2438                ))
2439            };
2440        let hook = hook.unwrap_or_else(no_op_pre_build_hook);
2441        let runtime_adapter_for_after_create = runtime_adapter.clone();
2442        let combined_after_create_hook: AfterCreateHook = Arc::new(move |session_id, ctx| {
2443            let runtime_adapter = runtime_adapter_for_after_create.clone();
2444            let after_create_hook = after_create_hook.clone();
2445            Box::pin(async move {
2446                runtime_adapter.register_session(session_id.clone()).await;
2447                if let Some(after_create_hook) = after_create_hook {
2448                    after_create_hook(session_id, ctx).await;
2449                }
2450            })
2451        });
2452        let session_service = Arc::new(PreBuildMobSessionService {
2453            inner: session_service,
2454            hook,
2455            after_create_hook: Some(combined_after_create_hook),
2456            runtime_adapter_override: Some(runtime_adapter.clone()),
2457        }) as Arc<dyn MobSessionService>;
2458        let (
2459            agent_mob_mcp_state,
2460            implicit_delegate_retirement_overrides,
2461            agent_mob_default_llm_client_slot,
2462        ) = install_agent_mob_tools(&definition, mob_tools_slot, Arc::clone(&session_service));
2463        let mut spec = Self::new(definition, storage, session_service);
2464        spec.agent_mob_mcp_state = Some(agent_mob_mcp_state);
2465        spec.implicit_delegate_retirement_overrides = Some(implicit_delegate_retirement_overrides);
2466        spec.agent_mob_default_llm_client_slot = Some(agent_mob_default_llm_client_slot);
2467        spec.runtime_adapter = Some(runtime_adapter);
2468        spec.binary_blob_store = Some(binary_blob_store);
2469        spec
2470    }
2471}
2472
2473/// Error returned by mob runtime operations.
2474#[derive(Debug)]
2475pub enum MobRuntimeError {
2476    Mob(MobError),
2477    InvalidInput(&'static str),
2478    InvalidConfig(String),
2479}
2480
2481impl std::fmt::Display for MobRuntimeError {
2482    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
2483        match self {
2484            Self::Mob(err) => write!(f, "{err}"),
2485            Self::InvalidInput(message) => write!(f, "{message}"),
2486            Self::InvalidConfig(message) => write!(f, "{message}"),
2487        }
2488    }
2489}
2490
2491impl std::error::Error for MobRuntimeError {}
2492
2493impl From<MobError> for MobRuntimeError {
2494    fn from(value: MobError) -> Self {
2495        Self::Mob(value)
2496    }
2497}
2498
2499// Mobkit's `MobMemberSnapshot`, `MobReconcileReport`, `MobReconcileOptions`
2500// wrapper types were removed as part of the meerkat 0.6 thin-shell cleanup.
2501// Consumers now use `meerkat_mob::runtime::MobMemberListEntry` and
2502// `meerkat_mob::runtime::reconcile::{ReconcileReport, ReconcileOptions,
2503// MemberFilter}` directly.
2504
2505/// Context delivered to [`SessionHook::after_create`] after a session is
2506/// successfully created.
2507#[derive(Clone, Debug)]
2508pub struct SessionCreatedContext {
2509    pub model: String,
2510    pub labels: std::collections::BTreeMap<String, String>,
2511    pub system_prompt: Option<String>,
2512}
2513
2514/// Hook trait for customising session lifecycle.
2515///
2516/// - `before_create` — runs before `create_session`. Returning `Err` aborts
2517///   session creation (both Rust-native and Python/TS boundary).
2518/// - `after_create` — runs after session creation succeeds. Best-effort: errors
2519///   are logged at `warn`, not propagated. The session is already live.
2520#[async_trait]
2521pub trait SessionHook: Send + Sync {
2522    /// Called before session creation. Mutate the request to inject tools,
2523    /// augment prompts, set labels, override model, etc. Return `Err` to
2524    /// abort session creation.
2525    async fn before_create(&self, _req: &mut CreateSessionRequest) -> Result<(), SessionError> {
2526        Ok(())
2527    }
2528
2529    /// Called after a session is successfully created. Best-effort — errors
2530    /// logged, not propagated.
2531    async fn after_create(
2532        &self,
2533        _session_id: &meerkat_core::types::SessionId,
2534        _ctx: &SessionCreatedContext,
2535    ) {
2536    }
2537}
2538
2539/// Capability flags controlling which agent capabilities are enabled.
2540#[derive(Clone, Copy, Debug)]
2541pub struct CapabilityFlags {
2542    pub builtins: bool,
2543    pub shell: bool,
2544    pub mob: bool,
2545    pub comms: bool,
2546    pub memory: bool,
2547    pub image_generation: bool,
2548}
2549
2550impl Default for CapabilityFlags {
2551    fn default() -> Self {
2552        Self {
2553            builtins: true,
2554            shell: true,
2555            mob: true,
2556            comms: true,
2557            memory: true,
2558            image_generation: false,
2559        }
2560    }
2561}
2562
2563/// Backward-compatible alias for [`MobRuntime`].
2564pub type RealMobRuntime = MobRuntime;
2565
2566/// Live mob runtime backed by a `MobHandle`.
2567#[derive(Clone)]
2568pub struct MobRuntime {
2569    handle: MobHandle,
2570    session_service: Option<Arc<dyn MobSessionService>>,
2571    agent_mob_mcp_state: Option<Arc<meerkat_mob_mcp::MobMcpState>>,
2572    implicit_delegate_retirement_overrides: Option<ImplicitDelegateRetirementOverrides>,
2573    binary_blob_store: Option<Arc<dyn BinaryBlobStore>>,
2574    baseline_member_specs: Arc<tokio::sync::RwLock<Vec<SpawnMemberSpec>>>,
2575    /// Keeps the ephemeral temp directory alive for the lifetime of the runtime.
2576    /// Dropped when the runtime is dropped, cleaning up the temp dir.
2577    _ephemeral_dir: Option<Arc<tempfile::TempDir>>,
2578}
2579
2580impl MobRuntime {
2581    pub async fn bootstrap(spec: MobBootstrapSpec) -> Result<Self, MobRuntimeError> {
2582        let ephemeral_dir = spec._ephemeral_dir.clone();
2583        let session_service = spec.session_service.clone();
2584        let binary_blob_store = spec.binary_blob_store.clone();
2585        let mob_id = spec.definition.id.clone();
2586        let agent_mob_mcp_state = spec.agent_mob_mcp_state.clone();
2587        let implicit_delegate_retirement_overrides =
2588            spec.implicit_delegate_retirement_overrides.clone();
2589        let default_llm_client = spec
2590            .options
2591            .default_llm_client
2592            .clone()
2593            .map(ReplaySanitizingLlmClient::wrap);
2594        if let Some(slot) = spec.agent_mob_default_llm_client_slot.as_ref() {
2595            *slot
2596                .write()
2597                .unwrap_or_else(std::sync::PoisonError::into_inner) = default_llm_client.clone();
2598        }
2599        let effective_runtime_adapter = spec
2600            .runtime_adapter
2601            .clone()
2602            .or_else(|| session_service.runtime_adapter());
2603
2604        let mut builder = MobBuilder::new(spec.definition, spec.storage);
2605
2606        // MobActor's autonomous readiness/comms-drain path consults the
2607        // builder-published runtime adapter directly. For session services
2608        // that already embed a runtime adapter (definition-based ephemeral
2609        // and persistent-with-runtime-backed-service), forward that adapter
2610        // explicitly so autonomous members do not come up session-backed but
2611        // runtime-unattached.
2612        if let Some(adapter) = effective_runtime_adapter {
2613            builder = builder.with_runtime_adapter(adapter);
2614        }
2615
2616        builder = builder
2617            .with_session_service(session_service.clone())
2618            .allow_ephemeral_sessions(spec.options.allow_ephemeral_sessions)
2619            .notify_orchestrator_on_resume(spec.options.notify_orchestrator_on_resume);
2620
2621        if let Some(client) = default_llm_client {
2622            builder = builder.with_default_llm_client(client);
2623        }
2624
2625        let handle = builder.create().await?;
2626        if let Some(state) = agent_mob_mcp_state.as_ref() {
2627            state.mob_insert_handle(mob_id, handle.clone()).await;
2628        }
2629        Ok(Self {
2630            handle,
2631            session_service: Some(session_service),
2632            agent_mob_mcp_state,
2633            implicit_delegate_retirement_overrides,
2634            binary_blob_store,
2635            baseline_member_specs: Arc::new(tokio::sync::RwLock::new(Vec::new())),
2636            _ephemeral_dir: ephemeral_dir,
2637        })
2638    }
2639
2640    pub fn from_handle(handle: MobHandle) -> Self {
2641        Self {
2642            handle,
2643            session_service: None,
2644            agent_mob_mcp_state: None,
2645            implicit_delegate_retirement_overrides: None,
2646            binary_blob_store: None,
2647            baseline_member_specs: Arc::new(tokio::sync::RwLock::new(Vec::new())),
2648            _ephemeral_dir: None,
2649        }
2650    }
2651
2652    pub fn handle(&self) -> MobHandle {
2653        self.handle.clone()
2654    }
2655
2656    pub fn agent_mob_mcp_state(&self) -> Option<Arc<meerkat_mob_mcp::MobMcpState>> {
2657        self.agent_mob_mcp_state.clone()
2658    }
2659
2660    pub(crate) fn implicit_delegate_retirement_overrides(
2661        &self,
2662    ) -> Option<ImplicitDelegateRetirementOverrides> {
2663        self.implicit_delegate_retirement_overrides.clone()
2664    }
2665
2666    pub async fn set_baseline_member_specs(&self, specs: Vec<SpawnMemberSpec>) {
2667        *self.baseline_member_specs.write().await = specs;
2668    }
2669
2670    pub async fn baseline_member_specs(&self) -> Vec<SpawnMemberSpec> {
2671        self.baseline_member_specs.read().await.clone()
2672    }
2673
2674    pub async fn read_session_history(
2675        &self,
2676        session_id_str: &str,
2677        offset: usize,
2678        limit: Option<usize>,
2679    ) -> Result<SessionHistoryPage, MobRuntimeError> {
2680        if session_id_str.trim().is_empty() {
2681            return Err(MobRuntimeError::InvalidInput(
2682                "session_id must not be empty",
2683            ));
2684        }
2685        let Some(session_service) = self.session_service.as_ref() else {
2686            return Err(MobRuntimeError::InvalidInput(
2687                "session history unavailable for this runtime",
2688            ));
2689        };
2690        let session_id = meerkat_core::types::SessionId::parse(session_id_str)
2691            .map_err(|_| MobRuntimeError::InvalidInput("invalid session_id format"))?;
2692        SessionServiceHistoryExt::read_history(
2693            session_service.as_ref(),
2694            &session_id,
2695            SessionHistoryQuery { offset, limit },
2696        )
2697        .await
2698        .map_err(|err| MobRuntimeError::Mob(MobError::Internal(err.to_string())))
2699    }
2700
2701    #[allow(dead_code)]
2702    pub(crate) async fn runtime_state_for_session(
2703        &self,
2704        session_id_str: &str,
2705    ) -> Result<Option<meerkat_runtime::RuntimeState>, MobRuntimeError> {
2706        if session_id_str.trim().is_empty() {
2707            return Err(MobRuntimeError::InvalidInput(
2708                "session_id must not be empty",
2709            ));
2710        }
2711        let Some(session_service) = self.session_service.as_ref() else {
2712            return Ok(None);
2713        };
2714        let Some(runtime_adapter) = session_service.runtime_adapter() else {
2715            return Ok(None);
2716        };
2717        let session_id = meerkat_core::types::SessionId::parse(session_id_str)
2718            .map_err(|_| MobRuntimeError::InvalidInput("invalid session_id format"))?;
2719        let state = meerkat_runtime::service_ext::SessionServiceRuntimeExt::runtime_state(
2720            runtime_adapter.as_ref(),
2721            &session_id,
2722        )
2723        .await
2724        .map_err(|err| MobRuntimeError::Mob(MobError::Internal(err.to_string())))?;
2725        Ok(Some(state))
2726    }
2727
2728    #[allow(dead_code)]
2729    pub(crate) async fn comms_runtime_for_session(
2730        &self,
2731        session_id_str: &str,
2732    ) -> Result<Option<Arc<dyn CommsRuntime>>, MobRuntimeError> {
2733        if session_id_str.trim().is_empty() {
2734            return Err(MobRuntimeError::InvalidInput(
2735                "session_id must not be empty",
2736            ));
2737        }
2738        let Some(session_service) = self.session_service.as_ref() else {
2739            return Ok(None);
2740        };
2741        let session_id = meerkat_core::types::SessionId::parse(session_id_str)
2742            .map_err(|_| MobRuntimeError::InvalidInput("invalid session_id format"))?;
2743        Ok(
2744            meerkat_core::service::SessionServiceCommsExt::comms_runtime(
2745                session_service.as_ref(),
2746                &session_id,
2747            )
2748            .await,
2749        )
2750    }
2751
2752    #[allow(dead_code)]
2753    pub(crate) async fn active_input_ids_for_session(
2754        &self,
2755        session_id_str: &str,
2756    ) -> Result<Option<Vec<String>>, MobRuntimeError> {
2757        if session_id_str.trim().is_empty() {
2758            return Err(MobRuntimeError::InvalidInput(
2759                "session_id must not be empty",
2760            ));
2761        }
2762        let Some(session_service) = self.session_service.as_ref() else {
2763            return Ok(None);
2764        };
2765        let Some(runtime_adapter) = session_service.runtime_adapter() else {
2766            return Ok(None);
2767        };
2768        let session_id = meerkat_core::types::SessionId::parse(session_id_str)
2769            .map_err(|_| MobRuntimeError::InvalidInput("invalid session_id format"))?;
2770        let input_ids = meerkat_runtime::service_ext::SessionServiceRuntimeExt::list_active_inputs(
2771            runtime_adapter.as_ref(),
2772            &session_id,
2773        )
2774        .await
2775        .map_err(|err| MobRuntimeError::Mob(MobError::Internal(err.to_string())))?;
2776        Ok(Some(
2777            input_ids.into_iter().map(|id| id.to_string()).collect(),
2778        ))
2779    }
2780
2781    #[allow(dead_code)]
2782    pub(crate) async fn ensure_comms_drain_for_session(
2783        &self,
2784        session_id_str: &str,
2785    ) -> Result<Option<bool>, MobRuntimeError> {
2786        if session_id_str.trim().is_empty() {
2787            return Err(MobRuntimeError::InvalidInput(
2788                "session_id must not be empty",
2789            ));
2790        }
2791        let Some(session_service) = self.session_service.as_ref() else {
2792            return Ok(None);
2793        };
2794        let Some(runtime_adapter) = session_service.runtime_adapter() else {
2795            return Ok(None);
2796        };
2797        let session_id = meerkat_core::types::SessionId::parse(session_id_str)
2798            .map_err(|_| MobRuntimeError::InvalidInput("invalid session_id format"))?;
2799        let comms_runtime = meerkat_core::service::SessionServiceCommsExt::comms_runtime(
2800            session_service.as_ref(),
2801            &session_id,
2802        )
2803        .await;
2804        if let Some(comms) = comms_runtime {
2805            let _handle = meerkat_runtime::comms_drain::spawn_comms_drain(
2806                runtime_adapter.clone(),
2807                session_id,
2808                comms,
2809                None,
2810            );
2811            Ok(Some(true))
2812        } else {
2813            Ok(Some(false))
2814        }
2815    }
2816
2817    /// Access the session service this runtime was bootstrapped with, if any.
2818    ///
2819    /// Present for `MobRuntime::bootstrap(...)`-produced runtimes; `None` for
2820    /// `MobRuntime::from_handle(...)`. HTTP handlers that need to read session
2821    /// history reach through this accessor.
2822    pub fn session_service(&self) -> Option<&Arc<dyn MobSessionService>> {
2823        self.session_service.as_ref()
2824    }
2825
2826    pub fn binary_blob_store(&self) -> Option<Arc<dyn BinaryBlobStore>> {
2827        self.binary_blob_store.clone()
2828    }
2829}
2830
2831/// Project a meerkat `MobMemberListEntry` into mobkit's HTTP JSON shape.
2832///
2833/// Aligns with meerkat 0.6's lightweight-roster design: list entries do
2834/// not carry a bridge `session_id`. Callers needing the realtime session
2835/// for a member must use `mobkit/member_status`, which serializes
2836/// `MobMemberSnapshot.current_session_id` natively.
2837pub fn member_entry_to_json(entry: &meerkat_mob::runtime::MobMemberListEntry) -> serde_json::Value {
2838    serde_json::to_value(entry).unwrap_or(serde_json::Value::Null)
2839}
2840
2841pub fn content_input_has_images(content: &meerkat_core::ContentInput) -> bool {
2842    match content {
2843        meerkat_core::ContentInput::Text(_) => false,
2844        meerkat_core::ContentInput::Blocks(blocks) => blocks
2845            .iter()
2846            .any(|block| matches!(block, meerkat_core::ContentBlock::Image { .. })),
2847    }
2848}
2849
2850pub fn model_capabilities_for_model(
2851    provider: Provider,
2852    model: &str,
2853) -> crate::runtime::ConsoleModelCapabilities {
2854    let image_input = meerkat_core::model_profile::profile_for(provider, model)
2855        .map(|profile| profile.vision)
2856        .unwrap_or(false);
2857    crate::runtime::ConsoleModelCapabilities { image_input }
2858}
2859
2860pub fn model_capabilities_for_profile(
2861    profile: &Profile,
2862) -> crate::runtime::ConsoleModelCapabilities {
2863    let image_input = Provider::infer_from_model(&profile.model)
2864        .and_then(|provider| meerkat_core::model_profile::profile_for(provider, &profile.model))
2865        .map(|profile| profile.vision)
2866        .unwrap_or(false);
2867    crate::runtime::ConsoleModelCapabilities { image_input }
2868}
2869
2870pub fn model_capabilities_for_role(
2871    definition: &MobDefinition,
2872    role: &str,
2873) -> crate::runtime::ConsoleModelCapabilities {
2874    let profile_name = ProfileName::from(role);
2875    definition
2876        .resolve_inline_profile(&profile_name)
2877        .map(model_capabilities_for_profile)
2878        .unwrap_or(crate::runtime::ConsoleModelCapabilities { image_input: false })
2879}
2880
2881pub fn model_capabilities_for_member_entry(
2882    definition: &MobDefinition,
2883    entry: &meerkat_mob::runtime::MobMemberListEntry,
2884) -> crate::runtime::ConsoleModelCapabilities {
2885    model_capabilities_for_role(definition, entry.role.as_str())
2886}
2887
2888pub async fn model_capabilities_for_member(
2889    handle: &MobHandle,
2890    session_service: Option<&Arc<dyn MobSessionService>>,
2891    member_id: &meerkat_mob::ids::MeerkatId,
2892) -> crate::runtime::ConsoleModelCapabilities {
2893    if let Some(service) = session_service
2894        && let Some(session_id) = handle.resolve_bridge_session_id(member_id).await
2895        && let Ok(view) = service.read(&session_id).await
2896    {
2897        return model_capabilities_for_model(view.state.provider, &view.state.model);
2898    }
2899
2900    handle
2901        .get_member(member_id)
2902        .await
2903        .map(|member| model_capabilities_for_role(handle.definition(), member.role.as_str()))
2904        .unwrap_or(crate::runtime::ConsoleModelCapabilities { image_input: false })
2905}
2906
2907pub async fn assert_member_accepts_images(
2908    handle: &MobHandle,
2909    session_service: Option<&Arc<dyn MobSessionService>>,
2910    member_id: &str,
2911    content: &meerkat_core::ContentInput,
2912) -> Result<(), MobRuntimeError> {
2913    if !content_input_has_images(content) {
2914        return Ok(());
2915    }
2916    let mid = meerkat_mob::ids::MeerkatId::from(member_id);
2917    let Some(member) = handle.get_member(&mid).await else {
2918        return Err(MobRuntimeError::InvalidInput("member not found"));
2919    };
2920    let caps = model_capabilities_for_member(handle, session_service, &member.agent_identity).await;
2921    if caps.image_input {
2922        Ok(())
2923    } else {
2924        Err(MobRuntimeError::InvalidInput(
2925            "target member model cannot accept image input",
2926        ))
2927    }
2928}
2929
2930/// Send content to a mob member and return the bridge session id that
2931/// accepted the injection.
2932///
2933/// Validates that `member_id` and `content` are non-empty, calls
2934/// `handle.member(&id).send(...)`, then queries the mob handle for the
2935/// currently-bound bridge session id. Meerkat 0.6 removed `session_id` from
2936/// `MemberDeliveryReceipt`; this helper is mobkit's glue for the
2937/// send-and-learn-what-session-took-it pattern used by HTTP/RPC handlers and
2938/// the scheduled-dispatch injection path.
2939pub async fn send_message_on_mob(
2940    handle: &MobHandle,
2941    member_id: &str,
2942    content: impl Into<meerkat_core::ContentInput>,
2943) -> Result<String, MobRuntimeError> {
2944    send_message_on_mob_with_mode(
2945        handle,
2946        member_id,
2947        content,
2948        meerkat_core::types::HandlingMode::Queue,
2949    )
2950    .await
2951}
2952
2953/// Variant that accepts the console's `Queue`/`Steer` wire contract while
2954/// delivering through MobKit's direct member-send path.
2955pub async fn send_message_on_mob_with_mode(
2956    handle: &MobHandle,
2957    member_id: &str,
2958    content: impl Into<meerkat_core::ContentInput>,
2959    handling_mode: meerkat_core::types::HandlingMode,
2960) -> Result<String, MobRuntimeError> {
2961    if member_id.trim().is_empty() {
2962        return Err(MobRuntimeError::InvalidInput("member_id must not be empty"));
2963    }
2964    let content = content.into();
2965    let is_empty = match &content {
2966        meerkat_core::ContentInput::Text(s) => s.trim().is_empty(),
2967        meerkat_core::ContentInput::Blocks(blocks) => blocks.is_empty(),
2968    };
2969    if is_empty {
2970        return Err(MobRuntimeError::InvalidInput("content must not be empty"));
2971    }
2972    let mid = meerkat_mob::ids::MeerkatId::from(member_id);
2973    let _receipt = handle
2974        .member(&mid)
2975        .await?
2976        .send(content, handling_mode)
2977        .await?;
2978    let session_id = handle
2979        .resolve_bridge_session_id(&mid)
2980        .await
2981        .ok_or_else(|| {
2982            MobRuntimeError::Mob(MobError::Internal(
2983                "member has no bridge session after send".to_string(),
2984            ))
2985        })?;
2986    Ok(session_id.to_string())
2987}
2988
2989#[cfg(test)]
2990#[allow(clippy::unwrap_used, clippy::expect_used, clippy::panic)]
2991mod tests {
2992    use super::*;
2993
2994    struct EmptyDispatcher;
2995
2996    #[async_trait::async_trait]
2997    impl meerkat_core::AgentToolDispatcher for EmptyDispatcher {
2998        fn tools(&self) -> Arc<[Arc<meerkat_core::types::ToolDef>]> {
2999            Vec::<Arc<meerkat_core::types::ToolDef>>::new().into()
3000        }
3001
3002        async fn dispatch(
3003            &self,
3004            call: meerkat_core::types::ToolCallView<'_>,
3005        ) -> Result<meerkat_core::ToolDispatchOutcome, meerkat_core::ToolError> {
3006            Err(meerkat_core::ToolError::not_found(call.name))
3007        }
3008
3009        fn capabilities(&self) -> meerkat_core::agent::DispatcherCapabilities {
3010            meerkat_core::agent::DispatcherCapabilities::default()
3011        }
3012    }
3013
3014    fn wrapper_with_overrides(
3015        overrides: ImplicitDelegateRetirementOverrides,
3016    ) -> AutoWireParentMobToolDispatcher {
3017        AutoWireParentMobToolDispatcher {
3018            inner: Arc::new(EmptyDispatcher),
3019            implicit_delegate_retirement_overrides: overrides,
3020        }
3021    }
3022
3023    #[test]
3024    fn delegate_tool_schema_exposes_idle_retire_secs() {
3025        let tool = meerkat_core::types::ToolDef::new(
3026            "delegate",
3027            "Delegate work",
3028            serde_json::json!({
3029                "type": "object",
3030                "properties": {
3031                    "task": {"type": "string"}
3032                },
3033                "required": ["task"]
3034            }),
3035        );
3036
3037        let patched = delegate_tool_def_with_idle_retire_secs(&tool);
3038        let idle_retire_secs = &patched.input_schema["properties"]["idle_retire_secs"];
3039
3040        assert!(patched.description.contains("IDLE RETIREMENT:"));
3041        assert_eq!(idle_retire_secs["anyOf"][0]["type"], "integer");
3042        assert_eq!(idle_retire_secs["anyOf"][0]["minimum"], 0);
3043        assert_eq!(idle_retire_secs["anyOf"][1]["type"], "null");
3044    }
3045
3046    #[test]
3047    fn mob_spawn_tool_schema_exposes_opt_in_idle_retire_secs() {
3048        let tool = meerkat_core::types::ToolDef::new(
3049            "mob_spawn_member",
3050            "Spawn member",
3051            serde_json::json!({
3052                "type": "object",
3053                "properties": {
3054                    "profile": {"type": "string"},
3055                    "member_id": {"type": "string"}
3056                },
3057                "required": ["profile", "member_id"]
3058            }),
3059        );
3060
3061        let patched = mob_spawn_tool_def_with_idle_retire_secs(&tool);
3062        let idle_retire_secs = &patched.input_schema["properties"]["idle_retire_secs"];
3063
3064        assert!(
3065            patched
3066                .description
3067                .contains("Omit idle_retire_secs to leave this spawned member out")
3068        );
3069        assert_eq!(idle_retire_secs["anyOf"][0]["type"], "integer");
3070        assert_eq!(idle_retire_secs["anyOf"][0]["minimum"], 0);
3071        assert_eq!(idle_retire_secs["anyOf"][1]["type"], "null");
3072    }
3073
3074    #[tokio::test]
3075    async fn auto_wire_wrapper_preserves_ops_lifecycle_binding() {
3076        use meerkat_core::AgentToolDispatcher;
3077        use std::sync::atomic::{AtomicBool, Ordering};
3078
3079        struct BindAwareDispatcher {
3080            bound: Arc<AtomicBool>,
3081        }
3082
3083        #[async_trait::async_trait]
3084        impl meerkat_core::AgentToolDispatcher for BindAwareDispatcher {
3085            fn tools(&self) -> Arc<[Arc<meerkat_core::types::ToolDef>]> {
3086                Vec::<Arc<meerkat_core::types::ToolDef>>::new().into()
3087            }
3088
3089            async fn dispatch(
3090                &self,
3091                call: meerkat_core::types::ToolCallView<'_>,
3092            ) -> Result<meerkat_core::ToolDispatchOutcome, meerkat_core::ToolError> {
3093                Err(meerkat_core::ToolError::not_found(call.name))
3094            }
3095
3096            fn capabilities(&self) -> meerkat_core::agent::DispatcherCapabilities {
3097                meerkat_core::agent::DispatcherCapabilities {
3098                    ops_lifecycle: true,
3099                }
3100            }
3101
3102            fn bind_ops_lifecycle(
3103                self: Arc<Self>,
3104                _registry: Arc<dyn meerkat_core::ops_lifecycle::OpsLifecycleRegistry>,
3105                _owner_bridge_session_id: meerkat_core::types::SessionId,
3106            ) -> Result<meerkat_core::agent::BindOutcome, meerkat_core::agent::OpsLifecycleBindError>
3107            {
3108                self.bound.store(true, Ordering::SeqCst);
3109                Ok(meerkat_core::agent::BindOutcome::Bound(self))
3110            }
3111        }
3112
3113        let bound = Arc::new(AtomicBool::new(false));
3114        let dispatcher = Arc::new(AutoWireParentMobToolDispatcher {
3115            inner: Arc::new(BindAwareDispatcher {
3116                bound: Arc::clone(&bound),
3117            }),
3118            implicit_delegate_retirement_overrides: ImplicitDelegateRetirementOverrides::default(),
3119        });
3120
3121        assert!(dispatcher.capabilities().ops_lifecycle);
3122        let outcome = dispatcher
3123            .bind_ops_lifecycle(
3124                Arc::new(meerkat_runtime::ops_lifecycle::RuntimeOpsLifecycleRegistry::new()),
3125                meerkat_core::types::SessionId::new(),
3126            )
3127            .expect("wrapper should delegate ops lifecycle binding");
3128
3129        assert!(outcome.was_bound());
3130        assert!(bound.load(Ordering::SeqCst));
3131        assert!(outcome.into_dispatcher().capabilities().ops_lifecycle);
3132    }
3133
3134    #[test]
3135    fn delegate_idle_retire_secs_arg_is_stripped_and_parsed() {
3136        let mut args = serde_json::json!({
3137            "task": "inspect",
3138            "idle_retire_secs": 42
3139        });
3140
3141        let parsed = delegate_idle_retire_override_from_args("delegate", &mut args)
3142            .expect("valid idle retire arg");
3143
3144        assert_eq!(parsed, Some(DelegateIdleRetireOverride::Seconds(42)));
3145        assert!(args.get("idle_retire_secs").is_none());
3146    }
3147
3148    #[test]
3149    fn delegate_idle_retire_secs_null_disables_member_retirement() {
3150        let mut args = serde_json::json!({
3151            "task": "inspect",
3152            "idle_retire_secs": null
3153        });
3154
3155        let parsed = delegate_idle_retire_override_from_args("delegate", &mut args)
3156            .expect("valid idle retire arg");
3157
3158        assert_eq!(parsed, Some(DelegateIdleRetireOverride::Disabled));
3159        assert!(args.get("idle_retire_secs").is_none());
3160    }
3161
3162    #[test]
3163    fn delegate_idle_retire_secs_omitted_inherits_runtime_default() {
3164        let mut args = serde_json::json!({"task": "inspect"});
3165
3166        let parsed = delegate_idle_retire_override_from_args("delegate", &mut args)
3167            .expect("omitted idle retire arg");
3168
3169        assert_eq!(parsed, None);
3170        assert_eq!(args, serde_json::json!({"task": "inspect"}));
3171    }
3172
3173    #[test]
3174    fn delegate_idle_retire_secs_rejects_negative_or_fractional_values() {
3175        let mut negative = serde_json::json!({"task": "inspect", "idle_retire_secs": -1});
3176        let mut fractional = serde_json::json!({"task": "inspect", "idle_retire_secs": 1.5});
3177
3178        assert!(delegate_idle_retire_override_from_args("delegate", &mut negative).is_err());
3179        assert!(delegate_idle_retire_override_from_args("delegate", &mut fractional).is_err());
3180    }
3181
3182    #[test]
3183    fn mob_spawn_idle_retire_targets_use_args_when_result_omits_mob_id() {
3184        let args = serde_json::json!({
3185            "mob_id": "ob3",
3186            "profile": "review-worker",
3187            "member_id": "review-worker-vibe-forward",
3188        });
3189        let fallback_targets = idle_retire_targets_from_spawn_args(&args);
3190
3191        assert_eq!(
3192            fallback_targets,
3193            vec![IdleRetireTarget {
3194                mob_id: "ob3".to_string(),
3195                member_id: "review-worker-vibe-forward".to_string(),
3196            }]
3197        );
3198        assert_eq!(
3199            idle_retire_targets_from_outcome_text(
3200                r#"{"agent_identity":"review-worker-vibe-forward","member_ref":"opaque"}"#,
3201                &fallback_targets,
3202            ),
3203            fallback_targets
3204        );
3205    }
3206
3207    #[test]
3208    fn mob_spawn_idle_retire_targets_support_canonical_specs_shape() {
3209        let args = serde_json::json!({
3210            "mob_id": "ob3",
3211            "specs": [
3212                {"profile": "person-worker", "agent_identity": "person-worker-a"},
3213                {"profile": "person-worker", "member_id": "person-worker-b", "mob_id": "other"}
3214            ]
3215        });
3216        let fallback_targets = idle_retire_targets_from_spawn_args(&args);
3217
3218        assert_eq!(
3219            fallback_targets,
3220            vec![
3221                IdleRetireTarget {
3222                    mob_id: "ob3".to_string(),
3223                    member_id: "person-worker-a".to_string(),
3224                },
3225                IdleRetireTarget {
3226                    mob_id: "other".to_string(),
3227                    member_id: "person-worker-b".to_string(),
3228                },
3229            ]
3230        );
3231        assert_eq!(
3232            idle_retire_targets_from_outcome_text(
3233                r#"{"members":[{"agent_identity":"person-worker-a"},{"agent_identity":"person-worker-b","mob_id":"other"}]}"#,
3234                &fallback_targets,
3235            ),
3236            fallback_targets
3237        );
3238    }
3239
3240    #[tokio::test]
3241    async fn implicit_delegate_retirement_overrides_round_trip_per_member() {
3242        let overrides = ImplicitDelegateRetirementOverrides::default();
3243
3244        overrides
3245            .set("mob-a", "worker-1", DelegateIdleRetireOverride::Seconds(12))
3246            .await;
3247        overrides
3248            .set("mob-a", "worker-2", DelegateIdleRetireOverride::Disabled)
3249            .await;
3250
3251        assert_eq!(
3252            overrides.get("mob-a", "worker-1").await,
3253            Some(DelegateIdleRetireOverride::Seconds(12))
3254        );
3255        assert_eq!(
3256            overrides.get("mob-a", "worker-2").await,
3257            Some(DelegateIdleRetireOverride::Disabled)
3258        );
3259        assert_eq!(overrides.get("mob-a", "worker-3").await, None);
3260    }
3261
3262    #[tokio::test]
3263    async fn mob_spawn_idle_retire_registration_uses_spawn_args_when_result_omits_mob_id() {
3264        let overrides = ImplicitDelegateRetirementOverrides::default();
3265        let dispatcher = wrapper_with_overrides(overrides.clone());
3266        let fallback_targets = idle_retire_targets_from_spawn_args(&serde_json::json!({
3267            "mob_id": "ob3",
3268            "member_id": "review-worker-vibe-forward",
3269        }));
3270        let outcome =
3271            meerkat_core::ToolDispatchOutcome::sync_result(meerkat_core::types::ToolResult::new(
3272                "spawn-1".to_string(),
3273                r#"{"agent_identity":"review-worker-vibe-forward","member_ref":"opaque"}"#
3274                    .to_string(),
3275                false,
3276            ));
3277
3278        dispatcher
3279            .register_idle_retire_override_from_outcome(
3280                &outcome,
3281                Some(DelegateIdleRetireOverride::Seconds(900)),
3282                &fallback_targets,
3283            )
3284            .await;
3285
3286        assert_eq!(
3287            overrides.get("ob3", "review-worker-vibe-forward").await,
3288            Some(DelegateIdleRetireOverride::Seconds(900))
3289        );
3290    }
3291
3292    #[test]
3293    fn image_generation_substrate_defaults_off_for_inline_profiles() {
3294        let definition = meerkat_mob::MobDefinition::from_toml(
3295            r#"
3296[mob]
3297id = "test"
3298
3299[profiles.worker]
3300model = "gpt-5.5"
3301
3302[profiles.worker.tools]
3303builtins = true
3304"#,
3305        )
3306        .unwrap_or_else(|e| panic!("{e}"));
3307
3308        assert!(
3309            !mob_definition_may_use_image_generation(&definition),
3310            "inline profiles should not wire the image substrate unless a profile opts in"
3311        );
3312    }
3313
3314    #[test]
3315    fn image_generation_substrate_follows_profile_tool_config() {
3316        let definition = meerkat_mob::MobDefinition::from_toml(
3317            r#"
3318[mob]
3319id = "test"
3320
3321[profiles.commander]
3322model = "gpt-5.5"
3323
3324[profiles.commander.tools]
3325builtins = true
3326image_generation = true
3327
3328[profiles.investigator]
3329model = "gpt-5.5"
3330
3331[profiles.investigator.tools]
3332builtins = true
3333image_generation = false
3334"#,
3335        )
3336        .unwrap_or_else(|e| panic!("{e}"));
3337
3338        let commander = definition.profiles["commander"].as_inline().unwrap();
3339        let investigator = definition.profiles["investigator"].as_inline().unwrap();
3340        assert!(commander.tools.image_generation);
3341        assert!(!investigator.tools.image_generation);
3342        assert!(
3343            mob_definition_may_use_image_generation(&definition),
3344            "one opt-in profile is enough to wire substrate; Meerkat gates visibility per profile"
3345        );
3346    }
3347
3348    #[test]
3349    fn image_generation_profiles_can_disable_builtins_with_meerkat_062() {
3350        let definition = meerkat_mob::MobDefinition::from_toml(
3351            r#"
3352[mob]
3353id = "test"
3354
3355[profiles.commander]
3356model = "gpt-5.5"
3357
3358[profiles.commander.tools]
3359builtins = false
3360image_generation = true
3361"#,
3362        )
3363        .unwrap_or_else(|e| panic!("{e}"));
3364
3365        let commander = definition.profiles["commander"].as_inline().unwrap();
3366        assert!(!commander.tools.builtins);
3367        assert!(commander.tools.image_generation);
3368        assert!(
3369            mob_definition_may_use_image_generation(&definition),
3370            "image generation now has its own Meerkat tool gate"
3371        );
3372    }
3373
3374    #[test]
3375    fn image_generation_substrate_is_conservative_for_realm_profile_refs() {
3376        let definition = meerkat_mob::MobDefinition::from_toml(
3377            r#"
3378[mob]
3379id = "test"
3380
3381[profiles.worker]
3382realm_profile = "worker-v2"
3383"#,
3384        )
3385        .unwrap_or_else(|e| panic!("{e}"));
3386
3387        assert!(
3388            mob_definition_may_use_image_generation(&definition),
3389            "realm profiles resolve at spawn time, so MobKit wires substrate and lets Meerkat enforce profile policy"
3390        );
3391    }
3392
3393    #[test]
3394    fn sanitize_llm_request_drops_replay_unsafe_server_tool_blocks() {
3395        let request = meerkat_client::LlmRequest::new(
3396            "gpt-5.5",
3397            vec![meerkat_core::Message::BlockAssistant(
3398                meerkat_core::BlockAssistantMessage::new(
3399                    vec![
3400                        meerkat_core::AssistantBlock::Text {
3401                            text: "done".to_string(),
3402                            meta: None,
3403                        },
3404                        meerkat_core::AssistantBlock::ServerToolContent {
3405                            id: Some("ws-stream".to_string()),
3406                            name: "web_search".to_string(),
3407                            content: serde_json::json!({
3408                                "type": "response.web_search_call.searching",
3409                                "item_id": "ws_123"
3410                            }),
3411                            meta: None,
3412                        },
3413                        meerkat_core::AssistantBlock::ServerToolContent {
3414                            id: Some("ws_123".to_string()),
3415                            name: "web_search_call".to_string(),
3416                            content: serde_json::json!({
3417                                "type": "web_search_call",
3418                                "id": "ws_123",
3419                                "status": "completed"
3420                            }),
3421                            meta: None,
3422                        },
3423                        meerkat_core::AssistantBlock::ServerToolContent {
3424                            id: None,
3425                            name: "web_search_annotations".to_string(),
3426                            content: serde_json::json!({
3427                                "type": "message_annotations",
3428                                "annotations": []
3429                            }),
3430                            meta: None,
3431                        },
3432                    ],
3433                    meerkat_core::StopReason::EndTurn,
3434                ),
3435            )],
3436        );
3437
3438        let sanitized = sanitize_llm_request_for_stateless_replay(&request);
3439        let meerkat_core::Message::BlockAssistant(assistant) = &sanitized.messages[0] else {
3440            panic!("expected block assistant");
3441        };
3442
3443        assert_eq!(assistant.blocks.len(), 2);
3444        assert!(matches!(
3445            assistant.blocks[0],
3446            meerkat_core::AssistantBlock::Text { .. }
3447        ));
3448        assert!(matches!(
3449            assistant.blocks[1],
3450            meerkat_core::AssistantBlock::ServerToolContent { ref name, .. }
3451                if name == "web_search_call"
3452        ));
3453    }
3454
3455    #[test]
3456    fn sanitize_llm_request_preserves_generated_images_for_meerkat_062() {
3457        let request = meerkat_client::LlmRequest::new(
3458            "gpt-5.5",
3459            vec![meerkat_core::Message::BlockAssistant(
3460                meerkat_core::BlockAssistantMessage::new(
3461                    vec![
3462                        meerkat_core::AssistantBlock::Text {
3463                            text: "visible".to_string(),
3464                            meta: None,
3465                        },
3466                        generated_image_block_for_test(),
3467                    ],
3468                    meerkat_core::StopReason::EndTurn,
3469                ),
3470            )],
3471        );
3472
3473        let sanitized = sanitize_llm_request_for_stateless_replay(&request);
3474
3475        let meerkat_core::Message::BlockAssistant(original_assistant) = &request.messages[0] else {
3476            panic!("expected original block assistant");
3477        };
3478        assert!(
3479            original_assistant
3480                .blocks
3481                .iter()
3482                .any(|block| matches!(block, meerkat_core::AssistantBlock::Image { .. })),
3483            "request-view sanitization must not rewrite canonical caller-owned messages"
3484        );
3485
3486        let meerkat_core::Message::BlockAssistant(sanitized_assistant) = &sanitized.messages[0]
3487        else {
3488            panic!("expected sanitized block assistant");
3489        };
3490        assert!(
3491            sanitized_assistant
3492                .blocks
3493                .iter()
3494                .any(|block| matches!(block, meerkat_core::AssistantBlock::Image { .. })),
3495            "Meerkat 0.6.2 owns provider replay projection for generated images"
3496        );
3497    }
3498
3499    #[derive(Default)]
3500    struct CapturingLlmClient {
3501        projected_messages: std::sync::Mutex<Vec<meerkat_core::Message>>,
3502    }
3503
3504    #[async_trait]
3505    impl LlmClient for CapturingLlmClient {
3506        fn project_replay_messages(
3507            &self,
3508            messages: &[meerkat_core::Message],
3509        ) -> Result<Vec<meerkat_core::Message>, meerkat_client::LlmError> {
3510            *self
3511                .projected_messages
3512                .lock()
3513                .unwrap_or_else(std::sync::PoisonError::into_inner) = messages.to_vec();
3514            Ok(messages.to_vec())
3515        }
3516
3517        fn stream<'a>(&'a self, _request: &'a LlmRequest) -> LlmStream<'a> {
3518            Box::pin(futures::stream::iter([Ok(
3519                meerkat_client::LlmEvent::Done {
3520                    outcome: meerkat_client::LlmDoneOutcome::Success {
3521                        stop_reason: meerkat_core::StopReason::EndTurn,
3522                    },
3523                },
3524            )]))
3525        }
3526
3527        fn provider(&self) -> &'static str {
3528            "openai"
3529        }
3530
3531        async fn health_check(&self) -> Result<(), meerkat_client::LlmError> {
3532            Ok(())
3533        }
3534    }
3535
3536    #[test]
3537    fn replay_sanitizing_llm_client_delegates_provider_projection() {
3538        let capture = Arc::new(CapturingLlmClient::default());
3539        let inner: Arc<dyn LlmClient> = capture.clone();
3540        let wrapped = ReplaySanitizingLlmClient::new(inner);
3541        let messages = vec![meerkat_core::Message::BlockAssistant(
3542            meerkat_core::BlockAssistantMessage::new(
3543                vec![
3544                    meerkat_core::AssistantBlock::Text {
3545                        text: "visible".to_string(),
3546                        meta: None,
3547                    },
3548                    meerkat_core::AssistantBlock::ServerToolContent {
3549                        id: Some("ws-stream".to_string()),
3550                        name: "web_search".to_string(),
3551                        content: serde_json::json!({
3552                            "type": "response.web_search_call.searching",
3553                            "item_id": "ws_123"
3554                        }),
3555                        meta: None,
3556                    },
3557                ],
3558                meerkat_core::StopReason::EndTurn,
3559            ),
3560        )];
3561
3562        let projected = wrapped
3563            .project_replay_messages(&messages)
3564            .expect("wrapped client should delegate provider projection");
3565
3566        let seen = capture
3567            .projected_messages
3568            .lock()
3569            .unwrap_or_else(std::sync::PoisonError::into_inner)
3570            .clone();
3571        let meerkat_core::Message::BlockAssistant(assistant) = &seen[0] else {
3572            panic!("expected block assistant");
3573        };
3574        assert_eq!(
3575            assistant.blocks.len(),
3576            1,
3577            "MobKit sanitization must happen before Meerkat provider projection"
3578        );
3579        assert!(matches!(
3580            assistant.blocks[0],
3581            meerkat_core::AssistantBlock::Text { .. }
3582        ));
3583        assert_eq!(
3584            serde_json::to_value(&projected).expect("projected messages serialize"),
3585            serde_json::to_value(&seen).expect("seen messages serialize")
3586        );
3587    }
3588
3589    #[derive(Default)]
3590    struct CapturingAgentLlmClient {
3591        seen_messages: std::sync::Mutex<Vec<meerkat_core::Message>>,
3592    }
3593
3594    #[async_trait]
3595    impl meerkat_core::AgentLlmClient for CapturingAgentLlmClient {
3596        async fn stream_response(
3597            &self,
3598            messages: &[meerkat_core::Message],
3599            _tools: &[Arc<meerkat_core::ToolDef>],
3600            _max_tokens: u32,
3601            _temperature: Option<f32>,
3602            _provider_params: Option<
3603                &meerkat_core::lifecycle::run_primitive::ProviderParamsOverride,
3604            >,
3605        ) -> Result<meerkat_core::agent::LlmStreamResult, meerkat_core::AgentError> {
3606            *self
3607                .seen_messages
3608                .lock()
3609                .unwrap_or_else(std::sync::PoisonError::into_inner) = messages.to_vec();
3610            Ok(meerkat_core::agent::LlmStreamResult::new(
3611                Vec::new(),
3612                meerkat_core::StopReason::EndTurn,
3613                meerkat_core::Usage::default(),
3614            ))
3615        }
3616
3617        fn provider(&self) -> &'static str {
3618            "openai"
3619        }
3620
3621        fn model(&self) -> &'static str {
3622            "gpt-5.5"
3623        }
3624    }
3625
3626    #[tokio::test]
3627    async fn sanitize_agent_llm_client_drops_replay_unsafe_server_tool_blocks() {
3628        let capture = Arc::new(CapturingAgentLlmClient::default());
3629        let inner: Arc<dyn meerkat_core::AgentLlmClient> = capture.clone();
3630        let wrapped = ReplaySanitizingAgentLlmClient::wrap(inner);
3631        let messages = vec![meerkat_core::Message::BlockAssistant(
3632            meerkat_core::BlockAssistantMessage::new(
3633                vec![
3634                    meerkat_core::AssistantBlock::Text {
3635                        text: "visible".to_string(),
3636                        meta: None,
3637                    },
3638                    meerkat_core::AssistantBlock::ServerToolContent {
3639                        id: Some("ws-stream".to_string()),
3640                        name: "web_search".to_string(),
3641                        content: serde_json::json!({
3642                            "type": "response.web_search_call.searching",
3643                            "item_id": "ws_123"
3644                        }),
3645                        meta: None,
3646                    },
3647                    meerkat_core::AssistantBlock::ServerToolContent {
3648                        id: Some("ok".to_string()),
3649                        name: "web_search_call".to_string(),
3650                        content: serde_json::json!({
3651                            "type": "web_search_call",
3652                            "id": "ws_123",
3653                            "status": "completed"
3654                        }),
3655                        meta: None,
3656                    },
3657                ],
3658                meerkat_core::StopReason::EndTurn,
3659            ),
3660        )];
3661        let tools: Vec<Arc<meerkat_core::ToolDef>> = Vec::new();
3662
3663        wrapped
3664            .stream_response(&messages, &tools, 512, None, None)
3665            .await
3666            .expect("wrapped client should delegate");
3667
3668        let seen = capture
3669            .seen_messages
3670            .lock()
3671            .unwrap_or_else(std::sync::PoisonError::into_inner)
3672            .clone();
3673        let meerkat_core::Message::BlockAssistant(assistant) = &seen[0] else {
3674            panic!("expected block assistant");
3675        };
3676        assert_eq!(assistant.blocks.len(), 2);
3677        assert!(matches!(
3678            assistant.blocks[0],
3679            meerkat_core::AssistantBlock::Text { .. }
3680        ));
3681        assert!(matches!(
3682            assistant.blocks[1],
3683            meerkat_core::AssistantBlock::ServerToolContent { ref name, .. }
3684                if name == "web_search_call"
3685        ));
3686    }
3687
3688    fn generated_image_block_for_test() -> meerkat_core::AssistantBlock {
3689        serde_json::from_value(serde_json::json!({
3690            "block_type": "image",
3691            "data": {
3692                "image_id": "00000000-0000-0000-0000-000000000051",
3693                "blob_ref": {
3694                    "blob_id": "sha256:test-generated-image",
3695                    "media_type": "image/png"
3696                },
3697                "media_type": "image/png",
3698                "width": 1024,
3699                "height": 1024,
3700                "revised_prompt": { "disposition": "not_requested" },
3701                "meta": { "provider": "not_emitted" }
3702            }
3703        }))
3704        .expect("test image block should deserialize")
3705    }
3706
3707    #[test]
3708    fn sanitize_message_preserves_assistant_image_blocks() {
3709        let message =
3710            meerkat_core::Message::BlockAssistant(meerkat_core::BlockAssistantMessage::new(
3711                vec![
3712                    meerkat_core::AssistantBlock::Text {
3713                        text: "Here is the image.".to_string(),
3714                        meta: None,
3715                    },
3716                    generated_image_block_for_test(),
3717                ],
3718                meerkat_core::StopReason::EndTurn,
3719            ));
3720
3721        let sanitized = sanitize_message_for_stateless_replay(message);
3722        let meerkat_core::Message::BlockAssistant(assistant) = sanitized else {
3723            panic!("expected block assistant");
3724        };
3725
3726        assert_eq!(assistant.blocks.len(), 2);
3727        assert!(matches!(
3728            assistant.blocks[0],
3729            meerkat_core::AssistantBlock::Text { .. }
3730        ));
3731        assert!(
3732            matches!(
3733                assistant.blocks[1],
3734                meerkat_core::AssistantBlock::Image { .. }
3735            ),
3736            "generated image blocks should reach Meerkat's provider projection"
3737        );
3738    }
3739
3740    /// Verify that persistent_with_hook wraps the session service with
3741    /// PreBuildMobSessionService (hook is Some).
3742    #[test]
3743    fn persistent_with_hook_wraps_session_service() {
3744        let dir = tempfile::tempdir().unwrap_or_else(|e| panic!("{e}"));
3745        let store_path = dir.path().to_path_buf();
3746        let Ok(sqlite) = meerkat_store::SqliteSessionStore::open(store_path.join("sessions.db"))
3747        else {
3748            panic!("failed to open sqlite session store");
3749        };
3750        let session_store: Arc<dyn SessionStore> = Arc::new(sqlite);
3751        let Ok(definition) = meerkat_mob::MobDefinition::from_toml("[mob]\nid = \"test\"\n") else {
3752            panic!("failed to parse minimal mob definition");
3753        };
3754
3755        let hook_called = Arc::new(std::sync::atomic::AtomicBool::new(false));
3756        let hook_called_clone = hook_called.clone();
3757
3758        let spec = MobBootstrapSpec::persistent_with_hook(
3759            definition,
3760            meerkat_mob::MobStorage::in_memory(),
3761            store_path.clone(),
3762            4,
3763            session_store,
3764            move |_req: &mut CreateSessionRequest| {
3765                hook_called_clone.store(true, std::sync::atomic::Ordering::Relaxed);
3766                Box::pin(async { Ok(()) })
3767            },
3768        );
3769
3770        // The session service is wired with a SqliteRuntimeStore so that
3771        // both `load_persisted_session` (resume) and
3772        // `load_persisted_session_for_control` (archive/retire) succeed
3773        // across process restart. spec.runtime_adapter is also set
3774        // explicitly so the bootstrap path uses the same store. See
3775        // `persistent_bootstrap_uses_sqlite_runtime_store` for the full
3776        // regression coverage.
3777        assert!(
3778            spec.runtime_adapter.is_some(),
3779            "persistent_with_hook must provide a runtime adapter via spec.runtime_adapter"
3780        );
3781        assert!(
3782            spec.session_service.runtime_adapter().is_some(),
3783            "session service must own a runtime_store so archive/retire don't \
3784             hit the store-only-projection rejection in meerkat-session"
3785        );
3786        assert!(
3787            store_path.join("runtime.sqlite").exists(),
3788            "persistent_inner must open a SqliteRuntimeStore at <store_path>/runtime.sqlite"
3789        );
3790
3791        // The hook isn't called until create_session — verify the wrapper exists
3792        // by checking the service is not the raw PersistentSessionService (it
3793        // wraps it). We can't call create_session without a full LLM stack, but
3794        // we can verify the hook_called flag is false (not prematurely invoked).
3795        assert!(
3796            !hook_called.load(std::sync::atomic::Ordering::Relaxed),
3797            "hook must not be called before create_session"
3798        );
3799    }
3800
3801    /// Verify that ephemeral_with_hook accepts and stores a hook.
3802    #[test]
3803    fn ephemeral_with_hook_creates_spec() {
3804        let dir = tempfile::tempdir().unwrap_or_else(|e| panic!("{e}"));
3805        let store_path = dir.path().to_path_buf();
3806        let Ok(definition) = meerkat_mob::MobDefinition::from_toml("[mob]\nid = \"test\"\n") else {
3807            panic!("failed to parse minimal mob definition");
3808        };
3809
3810        let hook_called = Arc::new(std::sync::atomic::AtomicBool::new(false));
3811        let hook_called_clone = hook_called.clone();
3812
3813        let spec = MobBootstrapSpec::ephemeral_with_hook(
3814            definition,
3815            meerkat_mob::MobStorage::in_memory(),
3816            store_path,
3817            4,
3818            None,
3819            move |_req: &mut CreateSessionRequest| {
3820                hook_called_clone.store(true, std::sync::atomic::Ordering::Relaxed);
3821                Box::pin(async { Ok(()) })
3822            },
3823        );
3824
3825        // Ephemeral specs don't have a runtime adapter.
3826        assert!(spec.runtime_adapter.is_none());
3827
3828        // Hook not yet called.
3829        assert!(
3830            !hook_called.load(std::sync::atomic::Ordering::Relaxed),
3831            "hook must not be called before create_session"
3832        );
3833    }
3834
3835    /// Verify that PreBuildMobSessionService applies the hook to the request
3836    /// in create_session. The hook mutates the model and adds labels; we
3837    /// verify by capturing the state inside the hook itself.
3838    #[tokio::test]
3839    async fn pre_build_hook_mutates_create_session_request() {
3840        use std::sync::Mutex;
3841
3842        let captured = Arc::new(Mutex::new(None::<(String, Option<String>)>));
3843        let captured_clone = captured.clone();
3844
3845        // Build a minimal ephemeral service as the inner.
3846        let dir = tempfile::tempdir().unwrap_or_else(|e| panic!("{e}"));
3847        let factory = AgentFactory::new(dir.path()).builtins(true);
3848        let config = Config::default();
3849        let builder = FactoryAgentBuilder::new(factory, config);
3850        let inner: Arc<dyn MobSessionService> =
3851            Arc::new(meerkat_session::EphemeralSessionService::new(builder, 4));
3852
3853        // Hook that mutates and captures the post-mutation state.
3854        let hook: PreBuildHook = Arc::new(move |req: &mut CreateSessionRequest| {
3855            req.model = "hooked-model".to_string();
3856            req.system_prompt = Some("injected-prompt".to_string());
3857            let labels = req.labels.get_or_insert_with(Default::default);
3858            labels.insert("hook_label".to_string(), "hook_value".to_string());
3859            // Capture to prove the hook ran and mutated the request.
3860            let mut lock = captured_clone
3861                .lock()
3862                .unwrap_or_else(std::sync::PoisonError::into_inner);
3863            *lock = Some((req.model.clone(), req.system_prompt.clone()));
3864            Box::pin(async { Ok(()) })
3865        });
3866        let wrapped = PreBuildMobSessionService {
3867            inner,
3868            hook,
3869            after_create_hook: None,
3870            runtime_adapter_override: None,
3871        };
3872
3873        let req = CreateSessionRequest {
3874            model: "original-model".to_string(),
3875            prompt: meerkat_core::ContentInput::Text("test".to_string()),
3876            render_metadata: None,
3877            system_prompt: None,
3878            max_tokens: None,
3879            event_tx: None,
3880            skill_references: None,
3881            initial_turn: meerkat_core::service::InitialTurnPolicy::Defer,
3882            build: None,
3883            labels: None,
3884            deferred_prompt_policy: meerkat_core::service::DeferredPromptPolicy::default(),
3885        };
3886
3887        // create_session will fail (no LLM) but the hook runs first.
3888        let _ = meerkat_core::service::SessionService::create_session(&wrapped, req).await;
3889
3890        let (model, prompt) = captured
3891            .lock()
3892            .unwrap_or_else(std::sync::PoisonError::into_inner)
3893            .clone()
3894            .expect("hook must have been called");
3895        assert_eq!(model, "hooked-model", "hook must mutate the model");
3896        assert_eq!(
3897            prompt.as_deref(),
3898            Some("injected-prompt"),
3899            "hook must set the system prompt"
3900        );
3901    }
3902
3903    #[derive(Default)]
3904    struct ForwardingProbe {
3905        calls: Mutex<Vec<&'static str>>,
3906    }
3907
3908    impl ForwardingProbe {
3909        fn record(&self, call: &'static str) {
3910            self.calls
3911                .lock()
3912                .unwrap_or_else(std::sync::PoisonError::into_inner)
3913                .push(call);
3914        }
3915
3916        fn calls(&self) -> Vec<&'static str> {
3917            self.calls
3918                .lock()
3919                .unwrap_or_else(std::sync::PoisonError::into_inner)
3920                .clone()
3921        }
3922    }
3923
3924    #[async_trait]
3925    impl meerkat_core::service::SessionService for ForwardingProbe {
3926        async fn create_session(
3927            &self,
3928            _req: CreateSessionRequest,
3929        ) -> Result<meerkat_core::types::RunResult, SessionError> {
3930            Err(SessionError::Unsupported("create_session".to_string()))
3931        }
3932
3933        async fn start_turn(
3934            &self,
3935            _id: &meerkat_core::types::SessionId,
3936            _req: meerkat_core::service::StartTurnRequest,
3937        ) -> Result<meerkat_core::types::RunResult, SessionError> {
3938            Err(SessionError::Unsupported("start_turn".to_string()))
3939        }
3940
3941        async fn interrupt(
3942            &self,
3943            _id: &meerkat_core::types::SessionId,
3944        ) -> Result<(), SessionError> {
3945            self.record("interrupt");
3946            Ok(())
3947        }
3948
3949        async fn read(
3950            &self,
3951            id: &meerkat_core::types::SessionId,
3952        ) -> Result<meerkat_core::service::SessionView, SessionError> {
3953            Err(SessionError::NotFound { id: id.clone() })
3954        }
3955
3956        async fn list(
3957            &self,
3958            _query: meerkat_core::service::SessionQuery,
3959        ) -> Result<Vec<meerkat_core::service::SessionSummary>, SessionError> {
3960            Ok(Vec::new())
3961        }
3962
3963        async fn archive(&self, _id: &meerkat_core::types::SessionId) -> Result<(), SessionError> {
3964            self.record("archive");
3965            Ok(())
3966        }
3967    }
3968
3969    #[async_trait]
3970    impl meerkat_core::service::SessionServiceCommsExt for ForwardingProbe {}
3971
3972    #[async_trait]
3973    impl meerkat_core::service::SessionServiceControlExt for ForwardingProbe {
3974        async fn append_system_context(
3975            &self,
3976            _id: &meerkat_core::types::SessionId,
3977            _req: meerkat_core::service::AppendSystemContextRequest,
3978        ) -> Result<
3979            meerkat_core::service::AppendSystemContextResult,
3980            meerkat_core::service::SessionControlError,
3981        > {
3982            self.record("append_system_context");
3983            Ok(meerkat_core::service::AppendSystemContextResult {
3984                status: meerkat_core::service::AppendSystemContextStatus::Applied,
3985            })
3986        }
3987
3988        async fn stage_tool_results(
3989            &self,
3990            _id: &meerkat_core::types::SessionId,
3991            _req: meerkat_core::service::StageToolResultsRequest,
3992        ) -> Result<meerkat_core::service::StageToolResultsResult, SessionError> {
3993            self.record("stage_tool_results");
3994            Ok(meerkat_core::service::StageToolResultsResult {
3995                accepted_result_count: 7,
3996            })
3997        }
3998    }
3999
4000    #[async_trait]
4001    impl meerkat_core::service::SessionServiceHistoryExt for ForwardingProbe {
4002        async fn read_history(
4003            &self,
4004            id: &meerkat_core::types::SessionId,
4005            _query: meerkat_core::service::SessionHistoryQuery,
4006        ) -> Result<meerkat_core::service::SessionHistoryPage, SessionError> {
4007            Err(SessionError::NotFound { id: id.clone() })
4008        }
4009    }
4010
4011    #[async_trait]
4012    impl MobSessionService for ForwardingProbe {
4013        fn supports_persistent_sessions(&self) -> bool {
4014            true
4015        }
4016
4017        fn runtime_adapter(&self) -> Option<Arc<meerkat_runtime::MeerkatMachine>> {
4018            Some(Arc::new(meerkat_runtime::MeerkatMachine::ephemeral()))
4019        }
4020
4021        async fn archive_with_mob_lifecycle_authority(
4022            &self,
4023            _session_id: &meerkat_core::types::SessionId,
4024        ) -> Result<(), SessionError> {
4025            self.record("archive_with_mob_lifecycle_authority");
4026            Ok(())
4027        }
4028
4029        async fn stage_runtime_system_context_for_active_turn(
4030            &self,
4031            _session_id: &meerkat_core::types::SessionId,
4032            _expected_run_id: &meerkat_core::lifecycle::RunId,
4033            _appends: Vec<meerkat_core::session::PendingSystemContextAppend>,
4034        ) -> Result<Option<Vec<u8>>, SessionError> {
4035            self.record("stage_runtime_system_context_for_active_turn");
4036            Ok(Some(b"snapshot".to_vec()))
4037        }
4038
4039        async fn discard_runtime_system_context_for_active_turn(
4040            &self,
4041            _session_id: &meerkat_core::types::SessionId,
4042            _expected_run_id: &meerkat_core::lifecycle::RunId,
4043            _idempotency_keys: Vec<String>,
4044        ) -> Result<(), SessionError> {
4045            self.record("discard_runtime_system_context_for_active_turn");
4046            Ok(())
4047        }
4048
4049        async fn active_turn_system_context_boundary_available(
4050            &self,
4051            _session_id: &meerkat_core::types::SessionId,
4052        ) -> Result<Option<bool>, SessionError> {
4053            self.record("active_turn_system_context_boundary_available");
4054            Ok(Some(true))
4055        }
4056    }
4057
4058    #[tokio::test]
4059    async fn pre_build_wrapper_forwards_mob_authority_and_control_extensions() {
4060        let probe = Arc::new(ForwardingProbe::default());
4061        let inner: Arc<dyn MobSessionService> = probe.clone();
4062        let wrapped = PreBuildMobSessionService {
4063            inner,
4064            hook: no_op_pre_build_hook(),
4065            after_create_hook: None,
4066            runtime_adapter_override: Some(Arc::new(meerkat_runtime::MeerkatMachine::ephemeral())),
4067        };
4068        let session_id = meerkat_core::types::SessionId::new();
4069
4070        MobSessionService::archive_with_mob_lifecycle_authority(&wrapped, &session_id)
4071            .await
4072            .expect("archive_with_mob_lifecycle_authority should forward to inner service");
4073        let staged = meerkat_core::service::SessionServiceControlExt::stage_tool_results(
4074            &wrapped,
4075            &session_id,
4076            meerkat_core::service::StageToolResultsRequest {
4077                results: Vec::new(),
4078            },
4079        )
4080        .await
4081        .expect("stage_tool_results should forward to inner service");
4082
4083        assert_eq!(staged.accepted_result_count, 7);
4084        let boundary_available = wrapped
4085            .active_turn_system_context_boundary_available(&session_id)
4086            .await
4087            .expect("active-turn boundary probe should forward");
4088        assert_eq!(boundary_available, Some(true));
4089        let snapshot = wrapped
4090            .stage_runtime_system_context_for_active_turn(
4091                &session_id,
4092                &meerkat_core::lifecycle::RunId::new(),
4093                vec![meerkat_core::session::PendingSystemContextAppend {
4094                    text: "steer".to_string(),
4095                    source: Some("test".to_string()),
4096                    idempotency_key: Some("test".to_string()),
4097                    accepted_at: meerkat_core::time_compat::SystemTime::now(),
4098                }],
4099            )
4100            .await
4101            .expect("active-turn staging should forward");
4102        assert_eq!(snapshot.as_deref(), Some(&b"snapshot"[..]));
4103        wrapped
4104            .discard_runtime_system_context_for_active_turn(
4105                &session_id,
4106                &meerkat_core::lifecycle::RunId::new(),
4107                vec!["test".to_string()],
4108            )
4109            .await
4110            .expect("active-turn rollback should forward");
4111        assert_eq!(
4112            probe.calls(),
4113            vec![
4114                "archive_with_mob_lifecycle_authority",
4115                "stage_tool_results",
4116                "active_turn_system_context_boundary_available",
4117                "stage_runtime_system_context_for_active_turn",
4118                "discard_runtime_system_context_for_active_turn",
4119            ]
4120        );
4121    }
4122
4123    /// Regression for two compounding bugs in the persistent wiring:
4124    ///
4125    /// 1. **0.6.0**: `persistent_inner` handed the
4126    ///    `PersistentSessionService` an `InMemoryRuntimeStore`. With the
4127    ///    runtime_store path active the `StoreCheckpointer` was disabled
4128    ///    (it's gated on `runtime_store.is_none()`), and the in-memory
4129    ///    store didn't survive process restart. Resume raised "missing
4130    ///    durable session snapshot for '<sid>'".
4131    ///
4132    /// 2. **0.6.1**: switching the session service to `runtime_store=None`
4133    ///    re-enabled the checkpointer (fixing #1) but broke archive/retire,
4134    ///    because `load_persisted_session_for_control` rejects mutations
4135    ///    when runtime_store is None and the session exists in the store
4136    ///    (the "store-only compatibility projection" error from
4137    ///    meerkat-session/src/persistent.rs:786).
4138    ///
4139    /// The 0.6.3 fix uses a **persistent** SqliteRuntimeStore — durable
4140    /// across restart AND control-op authoritative — at
4141    /// `<store_path>/runtime.sqlite`.
4142    #[test]
4143    fn persistent_bootstrap_uses_sqlite_runtime_store() {
4144        let dir = tempfile::tempdir().unwrap_or_else(|e| panic!("{e}"));
4145        let store_path = dir.path().to_path_buf();
4146        let Ok(sqlite) = meerkat_store::SqliteSessionStore::open(store_path.join("sessions.db"))
4147        else {
4148            panic!("failed to open sqlite session store");
4149        };
4150        let session_store: Arc<dyn SessionStore> = Arc::new(sqlite);
4151        let Ok(definition) = meerkat_mob::MobDefinition::from_toml("[mob]\nid = \"test\"\n") else {
4152            panic!("failed to parse minimal mob definition");
4153        };
4154        let spec = MobBootstrapSpec::persistent(
4155            definition,
4156            meerkat_mob::MobStorage::in_memory(),
4157            store_path.clone(),
4158            4,
4159            session_store,
4160        );
4161        assert!(
4162            spec.runtime_adapter.is_some(),
4163            "persistent bootstrap must provide its own runtime adapter via spec.runtime_adapter"
4164        );
4165        assert!(
4166            spec.session_service.runtime_adapter().is_some(),
4167            "session service must own a runtime_store so archive/retire don't \
4168             hit the store-only-projection rejection"
4169        );
4170        assert!(
4171            store_path.join("runtime.sqlite").exists(),
4172            "persistent_inner must open a SqliteRuntimeStore at <store_path>/runtime.sqlite"
4173        );
4174    }
4175
4176    /// Ephemeral counterpart: runtime-backed ephemeral builds must use a
4177    /// single in-memory machine authority for session service, comms, and
4178    /// image-generation tooling.
4179    #[test]
4180    fn ephemeral_runtime_backed_uses_session_service_runtime_adapter() {
4181        let dir = tempfile::tempdir().unwrap_or_else(|e| panic!("{e}"));
4182        let store_path = dir.path().to_path_buf();
4183        let Ok(definition) = meerkat_mob::MobDefinition::from_toml("[mob]\nid = \"test\"\n") else {
4184            panic!("failed to parse minimal mob definition");
4185        };
4186        let spec = MobBootstrapSpec::ephemeral_runtime_backed_inner(
4187            definition,
4188            meerkat_mob::MobStorage::in_memory(),
4189            store_path,
4190            4,
4191            None,
4192            None,
4193            None,
4194            CapabilityFlags::default(),
4195            None,
4196            None,
4197        );
4198        assert!(
4199            spec.runtime_adapter.is_some(),
4200            "ephemeral_runtime_backed_inner must expose the shared runtime authority"
4201        );
4202        assert!(
4203            spec.session_service.runtime_adapter().is_some(),
4204            "session service must still expose a runtime adapter so autonomous-host comms can wire"
4205        );
4206    }
4207
4208    #[tokio::test]
4209    async fn agent_mob_tools_expose_definition_profiles_as_realm_profiles() {
4210        let dir = tempfile::tempdir().unwrap_or_else(|e| panic!("{e}"));
4211        let store_path = dir.path().to_path_buf();
4212        let Ok(definition) = meerkat_mob::MobDefinition::from_toml(
4213            "[mob]\nid = \"test\"\n\n[profiles.investigation-worker]\nmodel = \"gpt-5.5\"\n[profiles.investigation-worker.tools]\ncomms = true\nmob = true\n\n[profiles.person-worker]\nmodel = \"gpt-5.5\"\n[profiles.person-worker.tools]\ncomms = true\n",
4214        ) else {
4215            panic!("failed to parse mob definition with worker profiles");
4216        };
4217
4218        let spec = MobBootstrapSpec::ephemeral_runtime_backed_inner(
4219            definition,
4220            meerkat_mob::MobStorage::in_memory(),
4221            store_path,
4222            4,
4223            None,
4224            None,
4225            None,
4226            CapabilityFlags::default(),
4227            None,
4228            None,
4229        );
4230        let state = spec
4231            .agent_mob_mcp_state
4232            .expect("agent mob MCP state should be installed");
4233
4234        let profiles = state
4235            .realm_profile_list()
4236            .await
4237            .expect("definition profiles should list through agent mob tools");
4238        let names = profiles
4239            .iter()
4240            .map(|profile| profile.name.as_str())
4241            .collect::<Vec<_>>();
4242        assert!(
4243            names.contains(&"investigation-worker"),
4244            "definition profiles must be visible to mob_profile_list so agents can create mobs that reference them"
4245        );
4246        assert!(names.contains(&"person-worker"));
4247
4248        let worker = state
4249            .realm_profile_get("investigation-worker")
4250            .await
4251            .expect("definition profile lookup should succeed")
4252            .expect("definition profile should exist");
4253        assert_eq!(worker.profile.model, "gpt-5.5");
4254        assert_eq!(
4255            worker.revision, 0,
4256            "definition-backed profiles are immutable runtime seeds, not persisted realm revisions"
4257        );
4258    }
4259
4260    #[tokio::test]
4261    async fn agent_created_mobs_can_spawn_definition_seeded_realm_profiles() {
4262        let dir = tempfile::tempdir().unwrap_or_else(|e| panic!("{e}"));
4263        let store_path = dir.path().to_path_buf();
4264        let Ok(parent_definition) = meerkat_mob::MobDefinition::from_toml(
4265            "[mob]\nid = \"parent\"\n\n[profiles.investigation-worker]\nmodel = \"gpt-5.5\"\n[profiles.investigation-worker.tools]\ncomms = true\nmob = true\n",
4266        ) else {
4267            panic!("failed to parse parent mob definition");
4268        };
4269
4270        let mut spec = MobBootstrapSpec::ephemeral_runtime_backed_inner(
4271            parent_definition,
4272            meerkat_mob::MobStorage::in_memory(),
4273            store_path,
4274            4,
4275            None,
4276            None,
4277            None,
4278            CapabilityFlags::default(),
4279            None,
4280            None,
4281        );
4282        spec.options.default_llm_client = Some(Arc::new(meerkat_client::TestClient::default()));
4283        let runtime = MobRuntime::bootstrap(spec)
4284            .await
4285            .unwrap_or_else(|e| panic!("{e}"));
4286        let state = runtime
4287            .agent_mob_mcp_state
4288            .clone()
4289            .expect("agent mob MCP state should be installed");
4290        let Ok(child_definition) = meerkat_mob::MobDefinition::from_toml(
4291            "[mob]\nid = \"child\"\n\n[profiles.investigation-worker]\nrealm_profile = \"investigation-worker\"\n",
4292        ) else {
4293            panic!("failed to parse child mob definition");
4294        };
4295
4296        let mob_id = state
4297            .mob_create_definition(child_definition)
4298            .await
4299            .expect("child mob should be created");
4300        state
4301            .mob_spawn_spec(
4302                &mob_id,
4303                SpawnMemberSpec::new(
4304                    ProfileName::from("investigation-worker"),
4305                    meerkat_mob::AgentIdentity::from("investigation-worker:one"),
4306                ),
4307            )
4308            .await
4309            .expect("created mob should resolve definition-seeded realm profile at spawn time");
4310    }
4311
4312    #[tokio::test]
4313    async fn ephemeral_runtime_backed_custom_session_store_persists_created_session() {
4314        let dir = tempfile::tempdir().unwrap_or_else(|e| panic!("{e}"));
4315        let store_path = dir.path().to_path_buf();
4316        let Ok(definition) = meerkat_mob::MobDefinition::from_toml(
4317            "[mob]\nid = \"test\"\n\n[profiles.worker]\nmodel = \"gpt-5.5\"\n[profiles.worker.tools]\ncomms = true\n",
4318        ) else {
4319            panic!("failed to parse minimal mob definition");
4320        };
4321        let custom_store: Arc<dyn SessionStore> = Arc::new(meerkat_store::MemoryStore::new());
4322        let mut spec = MobBootstrapSpec::ephemeral_runtime_backed_inner(
4323            definition,
4324            meerkat_mob::MobStorage::in_memory(),
4325            store_path,
4326            4,
4327            Some(custom_store.clone()),
4328            None,
4329            None,
4330            CapabilityFlags::default(),
4331            None,
4332            None,
4333        );
4334        spec.options.default_llm_client = Some(Arc::new(meerkat_client::TestClient::default()));
4335
4336        let runtime = MobRuntime::bootstrap(spec)
4337            .await
4338            .unwrap_or_else(|e| panic!("{e}"));
4339        let mid = meerkat_mob::ids::MeerkatId::from("worker:one");
4340        runtime
4341            .handle
4342            .spawn_spec(SpawnMemberSpec::new(
4343                ProfileName::from("worker"),
4344                mid.clone(),
4345            ))
4346            .await
4347            .unwrap_or_else(|e| panic!("{e}"));
4348        let session_id = runtime
4349            .handle
4350            .resolve_bridge_session_id(&mid)
4351            .await
4352            .unwrap_or_else(|| panic!("spawned worker has no bridge session id"));
4353
4354        let stored = custom_store
4355            .load(&session_id)
4356            .await
4357            .unwrap_or_else(|e| panic!("{e}"));
4358        assert!(
4359            stored.is_some(),
4360            "ephemeral runtime-backed builds with a custom store must persist through that store"
4361        );
4362    }
4363
4364    #[tokio::test]
4365    async fn ephemeral_runtime_backed_custom_session_store_resumes_after_runtime_restart() {
4366        let dir = tempfile::tempdir().unwrap_or_else(|e| panic!("{e}"));
4367        let store_path = dir.path().to_path_buf();
4368        let definition_toml = "[mob]\nid = \"test\"\n\n[profiles.worker]\nmodel = \"gpt-5.5\"\n[profiles.worker.tools]\ncomms = true\n";
4369        let Ok(definition) = meerkat_mob::MobDefinition::from_toml(definition_toml) else {
4370            panic!("failed to parse minimal mob definition");
4371        };
4372        let custom_store: Arc<dyn SessionStore> = Arc::new(meerkat_store::MemoryStore::new());
4373        let mut spec = MobBootstrapSpec::ephemeral_runtime_backed_inner(
4374            definition,
4375            meerkat_mob::MobStorage::in_memory(),
4376            store_path.clone(),
4377            4,
4378            Some(custom_store.clone()),
4379            None,
4380            None,
4381            CapabilityFlags::default(),
4382            None,
4383            None,
4384        );
4385        spec.options.default_llm_client = Some(Arc::new(meerkat_client::TestClient::default()));
4386
4387        let runtime = MobRuntime::bootstrap(spec)
4388            .await
4389            .unwrap_or_else(|e| panic!("{e}"));
4390        let mid = meerkat_mob::ids::MeerkatId::from("worker:one");
4391        runtime
4392            .handle
4393            .spawn_spec(SpawnMemberSpec::new(
4394                ProfileName::from("worker"),
4395                mid.clone(),
4396            ))
4397            .await
4398            .unwrap_or_else(|e| panic!("{e}"));
4399        let session_id = runtime
4400            .handle
4401            .resolve_bridge_session_id(&mid)
4402            .await
4403            .unwrap_or_else(|| panic!("spawned worker has no bridge session id"));
4404        drop(runtime);
4405
4406        let Ok(definition) = meerkat_mob::MobDefinition::from_toml(definition_toml) else {
4407            panic!("failed to parse minimal mob definition");
4408        };
4409        let mut restarted_spec = MobBootstrapSpec::ephemeral_runtime_backed_inner(
4410            definition,
4411            meerkat_mob::MobStorage::in_memory(),
4412            store_path,
4413            4,
4414            Some(custom_store),
4415            None,
4416            None,
4417            CapabilityFlags::default(),
4418            None,
4419            None,
4420        );
4421        restarted_spec.options.default_llm_client =
4422            Some(Arc::new(meerkat_client::TestClient::default()));
4423
4424        let restarted = MobRuntime::bootstrap(restarted_spec)
4425            .await
4426            .unwrap_or_else(|e| panic!("{e}"));
4427        let mut resume_spec = SpawnMemberSpec::new(ProfileName::from("worker"), mid.clone());
4428        resume_spec.launch_mode = meerkat_mob::MemberLaunchMode::Resume {
4429            bridge_session_id: session_id.clone(),
4430        };
4431        restarted
4432            .handle
4433            .spawn_spec(resume_spec)
4434            .await
4435            .unwrap_or_else(|e| panic!("resume should load the external session snapshot: {e}"));
4436
4437        let resumed_session_id = restarted
4438            .handle
4439            .resolve_bridge_session_id(&mid)
4440            .await
4441            .unwrap_or_else(|| panic!("resumed worker has no bridge session id"));
4442        assert_eq!(resumed_session_id, session_id);
4443    }
4444
4445    /// Regression: public ephemeral builds without image generation stay on the
4446    /// lighter direct session-service path.
4447    #[test]
4448    fn ephemeral_bootstrap_without_image_generation_stays_direct() {
4449        let dir = tempfile::tempdir().unwrap_or_else(|e| panic!("{e}"));
4450        let store_path = dir.path().to_path_buf();
4451        let Ok(definition) = meerkat_mob::MobDefinition::from_toml(
4452            "[mob]\nid = \"test\"\n\n[profiles.worker]\nmodel = \"gpt-5.5\"\nruntime_mode = \"autonomous_host\"\n[profiles.worker.tools]\ncomms = true\n",
4453        ) else {
4454            panic!("failed to parse definition");
4455        };
4456        let spec = MobBootstrapSpec::ephemeral_inner(
4457            definition,
4458            meerkat_mob::MobStorage::in_memory(),
4459            store_path,
4460            4,
4461            None,
4462            None,
4463            CapabilityFlags::default(),
4464            None,
4465            None,
4466        );
4467        assert!(
4468            spec.runtime_adapter.is_none(),
4469            "public ephemeral builds only need a runtime adapter when the definition may use image generation"
4470        );
4471    }
4472
4473    /// Regression: public ephemeral image-generation builds must expose the same
4474    /// runtime adapter through the spec and the session service. The generated
4475    /// image tool consults runtime session/image-operation state by session id,
4476    /// so a fresh, tool-only MeerkatMachine cannot be used here.
4477    #[test]
4478    fn ephemeral_bootstrap_with_image_generation_shares_runtime_adapter() {
4479        let dir = tempfile::tempdir().unwrap_or_else(|e| panic!("{e}"));
4480        let store_path = dir.path().to_path_buf();
4481        let Ok(definition) = meerkat_mob::MobDefinition::from_toml(
4482            r#"
4483[mob]
4484id = "test"
4485
4486[profiles.commander]
4487model = "gpt-5.5"
4488
4489[profiles.commander.tools]
4490builtins = true
4491image_generation = true
4492"#,
4493        ) else {
4494            panic!("failed to parse image-generation definition");
4495        };
4496        let spec = MobBootstrapSpec::ephemeral(
4497            definition,
4498            meerkat_mob::MobStorage::in_memory(),
4499            store_path,
4500            4,
4501            None,
4502        );
4503        let spec_adapter = spec
4504            .runtime_adapter
4505            .as_ref()
4506            .expect("image-generation ephemeral builds must expose a runtime adapter");
4507        let service_adapter = spec
4508            .session_service
4509            .runtime_adapter()
4510            .expect("session service must expose the same runtime adapter");
4511        assert!(
4512            spec_adapter.shares_runtime_persistence_with(&service_adapter),
4513            "image-generation tool state and session state must share one runtime authority"
4514        );
4515    }
4516
4517    /// Runtime-owned handling/routing semantics must be stripped before a
4518    /// runtime-applied turn reaches the direct session-service path.
4519    #[test]
4520    fn normalize_runtime_turn_request_strips_runtime_owned_semantics() {
4521        let req = meerkat_core::service::StartTurnRequest {
4522            prompt: meerkat_core::ContentInput::Text("checkpoint".to_string()),
4523            system_prompt: Some("system".to_string()),
4524            event_tx: None,
4525            runtime: meerkat_core::service::StartTurnRuntimeSemantics {
4526                render_metadata: Some(meerkat_core::types::RenderMetadata {
4527                    class: meerkat_core::types::RenderClass::OpsProgress,
4528                    salience: meerkat_core::types::RenderSalience::Urgent,
4529                }),
4530                handling_mode: meerkat_core::types::HandlingMode::Steer,
4531                skill_references: None,
4532                flow_tool_overlay: None,
4533                pre_turn_context_appends: Vec::new(),
4534                typed_turn_appends: Vec::new(),
4535                turn_metadata: None,
4536            },
4537        };
4538
4539        let expected_prompt = req.prompt.clone();
4540        let expected_system_prompt = req.system_prompt.clone();
4541
4542        let normalized = normalize_runtime_turn_request(req);
4543
4544        assert_eq!(
4545            normalized.runtime.handling_mode,
4546            meerkat_core::types::HandlingMode::Queue,
4547            "runtime-applied turns must downgrade Steer before reaching direct session services"
4548        );
4549        assert!(
4550            normalized.runtime.render_metadata.is_none(),
4551            "runtime-owned render metadata must not be forwarded through the direct agent path"
4552        );
4553        assert_eq!(normalized.prompt, expected_prompt);
4554        assert_eq!(normalized.system_prompt, expected_system_prompt);
4555    }
4556
4557    /// SessionCreatedContext must carry model, labels, and optional system_prompt.
4558    #[test]
4559    fn session_created_context_fields() {
4560        let ctx = SessionCreatedContext {
4561            model: "claude-sonnet-4-5".to_string(),
4562            labels: std::collections::BTreeMap::from([(
4563                "agent_type".to_string(),
4564                "lead".to_string(),
4565            )]),
4566            system_prompt: Some("You are a lead agent.".to_string()),
4567        };
4568        assert_eq!(ctx.model, "claude-sonnet-4-5");
4569        assert_eq!(ctx.labels["agent_type"], "lead");
4570        assert_eq!(ctx.system_prompt.as_deref(), Some("You are a lead agent."));
4571    }
4572
4573    /// SessionHook default implementations are no-ops — calling them must not panic.
4574    #[tokio::test]
4575    async fn session_hook_default_impls_are_noop() {
4576        struct EmptyHook;
4577        #[async_trait]
4578        impl SessionHook for EmptyHook {}
4579
4580        let hook = EmptyHook;
4581        let mut req = CreateSessionRequest {
4582            model: "test".to_string(),
4583            prompt: meerkat_core::ContentInput::Text("test".to_string()),
4584            render_metadata: None,
4585            system_prompt: None,
4586            max_tokens: None,
4587            event_tx: None,
4588            skill_references: None,
4589            initial_turn: meerkat_core::service::InitialTurnPolicy::Defer,
4590            build: None,
4591            labels: None,
4592            deferred_prompt_policy: meerkat_core::service::DeferredPromptPolicy::default(),
4593        };
4594        // before_create must succeed with default impl.
4595        hook.before_create(&mut req).await.unwrap();
4596        // after_create must not panic.
4597        let ctx = SessionCreatedContext {
4598            model: "test".to_string(),
4599            labels: Default::default(),
4600            system_prompt: None,
4601        };
4602        hook.after_create(&meerkat_core::types::SessionId::new(), &ctx)
4603            .await;
4604    }
4605
4606    /// before_create returning Err must abort (the caller decides how).
4607    #[tokio::test]
4608    async fn session_hook_before_create_can_abort() {
4609        struct AbortHook;
4610        #[async_trait]
4611        impl SessionHook for AbortHook {
4612            async fn before_create(
4613                &self,
4614                _req: &mut CreateSessionRequest,
4615            ) -> Result<(), SessionError> {
4616                Err(SessionError::Unsupported("hook abort".into()))
4617            }
4618        }
4619
4620        let hook = AbortHook;
4621        let mut req = CreateSessionRequest {
4622            model: "test".to_string(),
4623            prompt: meerkat_core::ContentInput::Text("test".to_string()),
4624            render_metadata: None,
4625            system_prompt: None,
4626            max_tokens: None,
4627            event_tx: None,
4628            skill_references: None,
4629            initial_turn: meerkat_core::service::InitialTurnPolicy::Defer,
4630            build: None,
4631            labels: None,
4632            deferred_prompt_policy: meerkat_core::service::DeferredPromptPolicy::default(),
4633        };
4634        let result = hook.before_create(&mut req).await;
4635        assert!(result.is_err());
4636    }
4637
4638    /// before_create mutations must be visible in the request.
4639    #[tokio::test]
4640    async fn session_hook_before_create_mutates_request() {
4641        struct MutatingHook;
4642        #[async_trait]
4643        impl SessionHook for MutatingHook {
4644            async fn before_create(
4645                &self,
4646                req: &mut CreateSessionRequest,
4647            ) -> Result<(), SessionError> {
4648                req.model = "hook-overridden".to_string();
4649                req.system_prompt = Some("injected by hook".to_string());
4650                Ok(())
4651            }
4652        }
4653
4654        let hook = MutatingHook;
4655        let mut req = CreateSessionRequest {
4656            model: "original".to_string(),
4657            prompt: meerkat_core::ContentInput::Text("test".to_string()),
4658            render_metadata: None,
4659            system_prompt: None,
4660            max_tokens: None,
4661            event_tx: None,
4662            skill_references: None,
4663            initial_turn: meerkat_core::service::InitialTurnPolicy::Defer,
4664            build: None,
4665            labels: None,
4666            deferred_prompt_policy: meerkat_core::service::DeferredPromptPolicy::default(),
4667        };
4668        hook.before_create(&mut req).await.unwrap();
4669        assert_eq!(req.model, "hook-overridden");
4670        assert_eq!(req.system_prompt.as_deref(), Some("injected by hook"));
4671    }
4672}