Skip to main content

meerkat_mobkit/
mob_handle_runtime.rs

1//! Mob member lifecycle management — bootstrap, spawn, reconcile, and roster queries.
2
3use std::collections::{BTreeMap, BTreeSet};
4use std::path::{Path, PathBuf};
5use std::sync::Arc;
6
7use async_trait::async_trait;
8use futures::StreamExt;
9use meerkat::{AgentFactory, Config, FactoryAgentBuilder, SessionStore};
10use meerkat_client::types::LlmStream;
11use meerkat_client::{LlmClient, LlmRequest};
12use meerkat_core::agent::CommsRuntime;
13use meerkat_core::service::{
14    CreateSessionRequest, SessionError, SessionHistoryPage, SessionHistoryQuery,
15    SessionServiceHistoryExt,
16};
17use meerkat_core::{AgentSessionStore, AssistantBlock, Message, Provider};
18use meerkat_mob::{
19    MobBuilder, MobDefinition, MobError, MobHandle, MobSessionService, MobStorage, Profile,
20    ProfileName, SpawnMemberSpec,
21};
22use meerkat_runtime::input_state::StoredInputState;
23use meerkat_runtime::store::MachineLifecycleCommit;
24use meerkat_store::StoreAdapter;
25use serde_json::Value;
26
27use crate::blob_store::{
28    Base64BlobStoreAdapter, BinaryBlobStore, BinaryBlobStoreAdapter, ObjectStoreBlobStore,
29};
30
/// Label string `"implicit_delegate_idle_retire_secs"`.
///
/// NOTE(review): presumably the label key under which an implicit delegate's
/// idle-retire setting is recorded — confirm against the call sites.
pub(crate) const DELEGATE_IDLE_RETIRE_SECS_LABEL: &str = "implicit_delegate_idle_retire_secs";
/// Label string `"disabled"`.
///
/// NOTE(review): presumably the value stored under
/// [`DELEGATE_IDLE_RETIRE_SECS_LABEL`] when idle retirement is turned off —
/// confirm against the call sites.
pub(crate) const DELEGATE_IDLE_RETIRE_DISABLED_LABEL: &str = "disabled";
33
/// Reports whether `error` carries the "previous member cleanup ambiguous"
/// marker anywhere in its text.
///
/// The marker includes the trailing `"for member "` fragment, so a message
/// that stops short of naming a member does not match.
pub(crate) fn is_previous_member_cleanup_ambiguous_error(error: &str) -> bool {
    const MARKER: &str = "previous member cleanup ambiguous for member ";
    error.find(MARKER).is_some()
}
37
38pub(crate) fn is_recoverable_lifecycle_cleanup_error(error: &str) -> bool {
39    is_previous_member_cleanup_ambiguous_error(error)
40        || (error.contains("disposal completed but ArchiveSession failed")
41            && error.contains("cancel-before-retire failed")
42            && error.contains("Runtime not ready: running"))
43}
44
45#[cfg(test)]
46use std::sync::{Mutex, OnceLock};
47
48/// Member state constant for active members.
49pub const MEMBER_STATE_ACTIVE: &str = "active";
50/// Member state constant for members transitioning to retired.
51pub const MEMBER_STATE_RETIRING: &str = "retiring";
52
/// Options for bootstrapping a mob runtime.
///
/// The derived `Default` leaves both flags `false` and supplies no default
/// LLM client.
#[derive(Clone, Default)]
pub struct MobBootstrapOptions {
    /// NOTE(review): presumably allows sessions that are not backed by
    /// persistent storage — confirm against the bootstrap path.
    pub allow_ephemeral_sessions: bool,
    /// NOTE(review): presumably asks the runtime to notify the orchestrator
    /// when a mob is resumed — confirm against the bootstrap path.
    pub notify_orchestrator_on_resume: bool,
    /// Optional shared LLM client.
    /// NOTE(review): presumably used when a session does not bring its own
    /// client — confirm against the bootstrap path.
    pub default_llm_client: Option<Arc<dyn LlmClient>>,
}
60
61/// Wraps an LLM client and strips provider-emitted evidence blocks that are
62/// useful for UI/citation projection but unsafe to replay into the next
63/// stateless provider request.
64pub struct ReplaySanitizingLlmClient {
65    inner: Arc<dyn LlmClient>,
66}
67
/// Shared, `std::sync::RwLock`-guarded slot holding an optional default LLM
/// client; cloning the outer `Arc` shares the same slot.
type SharedDefaultLlmClientSlot = Arc<std::sync::RwLock<Option<Arc<dyn LlmClient>>>>;
69
70impl ReplaySanitizingLlmClient {
71    pub fn new(inner: Arc<dyn LlmClient>) -> Self {
72        Self { inner }
73    }
74
75    pub fn wrap(inner: Arc<dyn LlmClient>) -> Arc<dyn LlmClient> {
76        Arc::new(Self::new(inner))
77    }
78}
79
80/// Agent-layer companion to [`ReplaySanitizingLlmClient`].
81///
82/// Meerkat session services can also receive already-adapted
83/// `AgentLlmClient`s through live replacement and hot-swap APIs. Sanitize at
84/// that boundary too so provider-emitted server tool telemetry is never
85/// replayed into the next stateless model request just because the client
86/// entered below the raw `LlmClient` adapter seam.
87pub struct ReplaySanitizingAgentLlmClient {
88    inner: Arc<dyn meerkat_core::AgentLlmClient>,
89}
90
91impl ReplaySanitizingAgentLlmClient {
92    pub fn new(inner: Arc<dyn meerkat_core::AgentLlmClient>) -> Self {
93        Self { inner }
94    }
95
96    pub fn wrap(
97        inner: Arc<dyn meerkat_core::AgentLlmClient>,
98    ) -> Arc<dyn meerkat_core::AgentLlmClient> {
99        Arc::new(Self::new(inner))
100    }
101}
102
103#[async_trait]
104impl meerkat_core::AgentLlmClient for ReplaySanitizingAgentLlmClient {
105    async fn stream_response(
106        &self,
107        messages: &[Message],
108        tools: &[Arc<meerkat_core::ToolDef>],
109        max_tokens: u32,
110        temperature: Option<f32>,
111        provider_params: Option<&meerkat_core::lifecycle::run_primitive::ProviderParamsOverride>,
112    ) -> Result<meerkat_core::agent::LlmStreamResult, meerkat_core::AgentError> {
113        let sanitized: Vec<Message> = messages
114            .iter()
115            .cloned()
116            .map(sanitize_message_for_stateless_replay)
117            .collect();
118        self.inner
119            .stream_response(&sanitized, tools, max_tokens, temperature, provider_params)
120            .await
121    }
122
123    fn provider(&self) -> &'static str {
124        self.inner.provider()
125    }
126
127    fn model(&self) -> &str {
128        self.inner.model()
129    }
130
131    fn compile_schema(
132        &self,
133        output_schema: &meerkat_core::OutputSchema,
134    ) -> Result<meerkat_core::schema::CompiledSchema, meerkat_core::schema::SchemaError> {
135        self.inner.compile_schema(output_schema)
136    }
137}
138
139#[async_trait]
140impl LlmClient for ReplaySanitizingLlmClient {
141    fn project_replay_messages(
142        &self,
143        messages: &[Message],
144    ) -> Result<Vec<Message>, meerkat_client::LlmError> {
145        let sanitized: Vec<Message> = messages
146            .iter()
147            .cloned()
148            .map(sanitize_message_for_stateless_replay)
149            .collect();
150        self.inner.project_replay_messages(&sanitized)
151    }
152
153    fn stream<'a>(&'a self, request: &'a LlmRequest) -> LlmStream<'a> {
154        let inner = Arc::clone(&self.inner);
155        let sanitized = sanitize_llm_request_for_stateless_replay(request);
156        Box::pin(async_stream::stream! {
157            let mut stream = inner.stream(&sanitized);
158            while let Some(event) = stream.next().await {
159                if runtime_turn_diagnostics_enabled()
160                    && let Err(error) = &event
161                {
162                    tracing::error!(
163                        error = %error,
164                        error_debug = ?error,
165                        "mobkit llm client stream error"
166                    );
167                }
168                yield event;
169            }
170        })
171    }
172
173    fn provider(&self) -> &'static str {
174        self.inner.provider()
175    }
176
177    async fn health_check(&self) -> Result<(), meerkat_client::LlmError> {
178        self.inner.health_check().await
179    }
180
181    fn compile_schema(
182        &self,
183        output_schema: &meerkat_core::OutputSchema,
184    ) -> Result<meerkat_core::schema::CompiledSchema, meerkat_core::schema::SchemaError> {
185        self.inner.compile_schema(output_schema)
186    }
187}
188
189/// Async hook called before each session is created. Receives the mutable
190/// `CreateSessionRequest` so the app can inject external tools, augment the
191/// system prompt, set labels, override the model, load session resume data
192/// from external stores, etc.
193///
194/// The hook runs **before** `create_session` captures labels and LLM identity,
195/// so all mutations are reflected in session metadata, not just the agent build.
196///
197/// ```rust,ignore
198/// let spec = MobBootstrapSpec::persistent_with_hook(
199///     definition, storage, store_path, 64, session_store,
200///     |req: &mut CreateSessionRequest| {
201///         Box::pin(async move {
202///             // Async: load session from external store
203///             let session = my_store.load_by_owner(&owner_id).await;
204///             if let Some(s) = session {
205///                 let build = req.build.get_or_insert_with(SessionBuildOptions::default);
206///                 build.resume_session = Some(s);
207///             }
208///             // Sync: inject tools, augment prompt
209///             let build = req.build.get_or_insert_with(SessionBuildOptions::default);
210///             build.external_tools = Some(my_tools());
211///             Ok(())
212///         })
213///     },
214/// );
215/// ```
216pub(crate) type PreBuildHook = Arc<
217    dyn Fn(
218            &mut CreateSessionRequest,
219        ) -> std::pin::Pin<
220            Box<dyn std::future::Future<Output = Result<(), SessionError>> + Send + '_>,
221        > + Send
222        + Sync,
223>;
224
/// Optional post-creation hook invoked after `create_session` succeeds.
///
/// The hook is called with the new session's id and a
/// `SessionCreatedContext`, and returns a boxed `Send` future that resolves to
/// `()` — it has no way to report failure back to the caller.
pub type AfterCreateHook = Arc<
    dyn Fn(
            meerkat_core::types::SessionId,
            SessionCreatedContext,
        ) -> std::pin::Pin<Box<dyn std::future::Future<Output = ()> + Send>>
        + Send
        + Sync,
>;
234
/// Wraps a `MobSessionService`, applying a `PreBuildHook` to the
/// `CreateSessionRequest` in `create_session()` before delegating.
///
/// The hook runs before labels and LLM identity are captured by the inner
/// session service, so mutations to `req.labels`, `req.model`, `req.build`,
/// and `req.system_prompt` are fully reflected in session metadata.
struct PreBuildMobSessionService {
    /// The session service that requests are delegated to.
    inner: Arc<dyn MobSessionService>,
    /// Hook applied to each `CreateSessionRequest` before delegation.
    hook: PreBuildHook,
    /// Optional hook for the post-creation step; `None` means no hook is set.
    after_create_hook: Option<AfterCreateHook>,
    /// NOTE(review): presumably replaces the runtime adapter the inner service
    /// would otherwise use — confirm against the trait implementation.
    runtime_adapter_override: Option<Arc<meerkat_runtime::MeerkatMachine>>,
}
247
248fn no_op_pre_build_hook() -> PreBuildHook {
249    Arc::new(|_req: &mut CreateSessionRequest| Box::pin(async { Ok(()) }))
250}
251
/// Per-member override of the idle auto-retirement policy, as requested
/// through the `idle_retire_secs` tool argument.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub(crate) enum DelegateIdleRetireOverride {
    /// Produced when `idle_retire_secs` is explicitly `null`.
    Disabled,
    /// Produced when `idle_retire_secs` is a non-negative integer; carries
    /// that number of seconds.
    Seconds(u64),
}
257
/// Shared registry of idle-retirement overrides.
///
/// Entries are keyed by `(mob_id, member_id)`. Clones share the same
/// underlying map because the map lives behind an `Arc`.
#[derive(Clone, Default)]
pub(crate) struct ImplicitDelegateRetirementOverrides {
    /// `(mob_id, member_id)` -> override, guarded by an async read/write lock.
    inner: Arc<tokio::sync::RwLock<BTreeMap<(String, String), DelegateIdleRetireOverride>>>,
}
262
263impl ImplicitDelegateRetirementOverrides {
264    pub(crate) async fn set(
265        &self,
266        mob_id: impl Into<String>,
267        member_id: impl Into<String>,
268        override_policy: DelegateIdleRetireOverride,
269    ) {
270        self.inner
271            .write()
272            .await
273            .insert((mob_id.into(), member_id.into()), override_policy);
274    }
275
276    pub(crate) async fn get(
277        &self,
278        mob_id: &str,
279        member_id: &str,
280    ) -> Option<DelegateIdleRetireOverride> {
281        self.inner
282            .read()
283            .await
284            .get(&(mob_id.to_string(), member_id.to_string()))
285            .copied()
286    }
287}
288
/// `MobToolsFactory` decorator: every dispatcher built by `inner` is wrapped
/// in an `AutoWireParentMobToolDispatcher` that shares this factory's
/// override registry.
struct AutoWireParentMobToolsFactory {
    /// Factory that builds the underlying mob tool dispatcher.
    inner: Arc<dyn meerkat_core::service::MobToolsFactory>,
    /// Registry handed (by clone) to every dispatcher this factory produces.
    implicit_delegate_retirement_overrides: ImplicitDelegateRetirementOverrides,
}
293
294#[cfg_attr(target_arch = "wasm32", async_trait(?Send))]
295#[cfg_attr(not(target_arch = "wasm32"), async_trait)]
296impl meerkat_core::service::MobToolsFactory for AutoWireParentMobToolsFactory {
297    async fn build_mob_tools(
298        &self,
299        args: meerkat_core::service::MobToolsBuildArgs,
300    ) -> Result<Arc<dyn meerkat_core::AgentToolDispatcher>, Box<dyn std::error::Error + Send + Sync>>
301    {
302        let inner = self.inner.build_mob_tools(args).await?;
303        Ok(Arc::new(AutoWireParentMobToolDispatcher {
304            inner,
305            implicit_delegate_retirement_overrides: self
306                .implicit_delegate_retirement_overrides
307                .clone(),
308        }))
309    }
310}
311
/// Tool dispatcher decorator that intercepts the `delegate` and
/// `mob_spawn_member` tools and forwards every other call to `inner`.
struct AutoWireParentMobToolDispatcher {
    /// Dispatcher that ultimately executes every tool call.
    inner: Arc<dyn meerkat_core::AgentToolDispatcher>,
    /// Registry that receives idle-retirement overrides extracted from
    /// intercepted calls.
    implicit_delegate_retirement_overrides: ImplicitDelegateRetirementOverrides,
}
316
317#[cfg_attr(target_arch = "wasm32", async_trait(?Send))]
318#[cfg_attr(not(target_arch = "wasm32"), async_trait)]
319impl meerkat_core::AgentToolDispatcher for AutoWireParentMobToolDispatcher {
320    fn tools(&self) -> Arc<[Arc<meerkat_core::types::ToolDef>]> {
321        self.inner
322            .tools()
323            .iter()
324            .map(|tool| {
325                if tool.name == "delegate" {
326                    Arc::new(delegate_tool_def_with_idle_retire_secs(tool))
327                } else if tool.name == "mob_spawn_member" {
328                    Arc::new(mob_spawn_tool_def_with_idle_retire_secs(tool))
329                } else {
330                    Arc::clone(tool)
331                }
332            })
333            .collect::<Vec<_>>()
334            .into()
335    }
336
337    async fn dispatch(
338        &self,
339        call: meerkat_core::types::ToolCallView<'_>,
340    ) -> Result<meerkat_core::ToolDispatchOutcome, meerkat_core::ToolError> {
341        if call.name == "delegate" {
342            return self.dispatch_delegate(call).await;
343        }
344        if call.name != "mob_spawn_member" {
345            return self.inner.dispatch(call).await;
346        }
347        self.dispatch_mob_spawn_member(call).await
348    }
349
350    fn capabilities(&self) -> meerkat_core::agent::DispatcherCapabilities {
351        self.inner.capabilities()
352    }
353
354    fn bind_ops_lifecycle(
355        self: Arc<Self>,
356        registry: Arc<dyn meerkat_core::ops_lifecycle::OpsLifecycleRegistry>,
357        owner_bridge_session_id: meerkat_core::types::SessionId,
358    ) -> Result<meerkat_core::agent::BindOutcome, meerkat_core::agent::OpsLifecycleBindError> {
359        let owned = Arc::try_unwrap(self)
360            .map_err(|_| meerkat_core::agent::OpsLifecycleBindError::SharedOwnership)?;
361        let outcome = owned
362            .inner
363            .bind_ops_lifecycle(registry, owner_bridge_session_id)?;
364        let was_bound = outcome.was_bound();
365        let dispatcher = Arc::new(Self {
366            inner: outcome.into_dispatcher(),
367            implicit_delegate_retirement_overrides: owned.implicit_delegate_retirement_overrides,
368        });
369        Ok(if was_bound {
370            meerkat_core::agent::BindOutcome::Bound(dispatcher)
371        } else {
372            meerkat_core::agent::BindOutcome::Skipped(dispatcher)
373        })
374    }
375}
376
377impl AutoWireParentMobToolDispatcher {
378    async fn dispatch_mob_spawn_member(
379        &self,
380        call: meerkat_core::types::ToolCallView<'_>,
381    ) -> Result<meerkat_core::ToolDispatchOutcome, meerkat_core::ToolError> {
382        let mut args = serde_json::from_str::<Value>(call.args.get()).map_err(|error| {
383            meerkat_core::ToolError::invalid_arguments(call.name, error.to_string())
384        })?;
385        let idle_retire_override = delegate_idle_retire_override_from_args(call.name, &mut args)?;
386        let idle_retire_targets = idle_retire_targets_from_spawn_args(&args);
387        if let Some(object) = args.as_object_mut() {
388            object
389                .entry("auto_wire_parent".to_string())
390                .or_insert(Value::Bool(true));
391        }
392        let args = serde_json::value::RawValue::from_string(args.to_string()).map_err(|error| {
393            meerkat_core::ToolError::invalid_arguments(call.name, error.to_string())
394        })?;
395        let call = meerkat_core::types::ToolCallView {
396            id: call.id,
397            name: call.name,
398            args: &args,
399        };
400        let outcome = self.inner.dispatch(call).await?;
401        self.register_idle_retire_override_from_outcome(
402            &outcome,
403            idle_retire_override,
404            &idle_retire_targets,
405        )
406        .await;
407
408        Ok(outcome)
409    }
410
411    async fn dispatch_delegate(
412        &self,
413        call: meerkat_core::types::ToolCallView<'_>,
414    ) -> Result<meerkat_core::ToolDispatchOutcome, meerkat_core::ToolError> {
415        let mut args = serde_json::from_str::<Value>(call.args.get()).map_err(|error| {
416            meerkat_core::ToolError::invalid_arguments(call.name, error.to_string())
417        })?;
418        let idle_retire_override = delegate_idle_retire_override_from_args(call.name, &mut args)?;
419        let args = serde_json::value::RawValue::from_string(args.to_string()).map_err(|error| {
420            meerkat_core::ToolError::invalid_arguments(call.name, error.to_string())
421        })?;
422        let call = meerkat_core::types::ToolCallView {
423            id: call.id,
424            name: call.name,
425            args: &args,
426        };
427        let outcome = self.inner.dispatch(call).await?;
428
429        self.register_idle_retire_override_from_outcome(&outcome, idle_retire_override, &[])
430            .await;
431
432        Ok(outcome)
433    }
434
435    async fn register_idle_retire_override_from_outcome(
436        &self,
437        outcome: &meerkat_core::ToolDispatchOutcome,
438        idle_retire_override: Option<DelegateIdleRetireOverride>,
439        fallback_targets: &[IdleRetireTarget],
440    ) {
441        if outcome.result.is_error {
442            return;
443        }
444        let Some(override_policy) = idle_retire_override else {
445            return;
446        };
447        for target in
448            idle_retire_targets_from_outcome_text(&outcome.result.text_content(), fallback_targets)
449        {
450            self.implicit_delegate_retirement_overrides
451                .set(&target.mob_id, &target.member_id, override_policy)
452                .await;
453        }
454    }
455}
456
/// Identifies one mob member that an idle-retirement override applies to.
///
/// The derived ordering lets targets be deduplicated in a `BTreeSet`.
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord)]
struct IdleRetireTarget {
    /// Id of the mob the member belongs to.
    mob_id: String,
    /// Identity of the member inside that mob.
    member_id: String,
}
462
463fn text_field<'a>(value: &'a Value, key: &str) -> Option<&'a str> {
464    value
465        .get(key)
466        .and_then(Value::as_str)
467        .filter(|text| !text.is_empty())
468}
469
470fn member_identity_field(value: &Value) -> Option<&str> {
471    text_field(value, "agent_identity")
472        .or_else(|| text_field(value, "member_id"))
473        .or_else(|| text_field(value, "identity"))
474}
475
476fn target_from_value(value: &Value, default_mob_id: Option<&str>) -> Option<IdleRetireTarget> {
477    let mob_id = text_field(value, "mob_id").or(default_mob_id)?;
478    let member_id = member_identity_field(value)?;
479    Some(IdleRetireTarget {
480        mob_id: mob_id.to_string(),
481        member_id: member_id.to_string(),
482    })
483}
484
485fn idle_retire_targets_from_spawn_args(args: &Value) -> Vec<IdleRetireTarget> {
486    let default_mob_id = text_field(args, "mob_id");
487    let mut targets = BTreeSet::new();
488    if let Some(target) = target_from_value(args, default_mob_id) {
489        targets.insert(target);
490    }
491    for key in ["specs", "members"] {
492        let Some(values) = args.get(key).and_then(Value::as_array) else {
493            continue;
494        };
495        for value in values {
496            if let Some(target) = target_from_value(value, default_mob_id) {
497                targets.insert(target);
498            }
499        }
500    }
501    targets.into_iter().collect()
502}
503
504fn target_from_result_value(
505    value: &Value,
506    fallback_targets: &[IdleRetireTarget],
507) -> Option<IdleRetireTarget> {
508    if let Some(target) = target_from_value(value, None) {
509        return Some(target);
510    }
511    let member_id = member_identity_field(value)?;
512    let mut matches = fallback_targets
513        .iter()
514        .filter(|target| target.member_id == member_id);
515    let target = matches.next()?;
516    if matches.next().is_some() {
517        return None;
518    }
519    Some(target.clone())
520}
521
522fn collect_idle_retire_result_targets(
523    value: &Value,
524    fallback_targets: &[IdleRetireTarget],
525    targets: &mut BTreeSet<IdleRetireTarget>,
526) {
527    if let Some(target) = target_from_result_value(value, fallback_targets) {
528        targets.insert(target);
529    }
530    for key in ["members", "specs", "spawned", "results"] {
531        let Some(values) = value.get(key).and_then(Value::as_array) else {
532            continue;
533        };
534        for value in values {
535            collect_idle_retire_result_targets(value, fallback_targets, targets);
536        }
537    }
538}
539
540fn idle_retire_targets_from_outcome_text(
541    text: &str,
542    fallback_targets: &[IdleRetireTarget],
543) -> Vec<IdleRetireTarget> {
544    let Ok(payload) = serde_json::from_str::<Value>(text) else {
545        return fallback_targets.to_vec();
546    };
547    let mut targets = BTreeSet::new();
548    collect_idle_retire_result_targets(&payload, fallback_targets, &mut targets);
549    if targets.is_empty() {
550        targets.extend(fallback_targets.iter().cloned());
551    }
552    targets.into_iter().collect()
553}
554
/// `RealmProfileStore` decorator that overlays the inline profiles of a mob
/// definition on top of another store.
///
/// Names present in `profiles` are answered from the overlay and cannot be
/// created, updated, or deleted through this store; every other name is
/// delegated to `inner`.
struct DefinitionSeededRealmProfileStore {
    /// Store consulted for every name that is not seeded by the definition.
    inner: Arc<dyn meerkat_mob::RealmProfileStore>,
    /// Inline profiles taken from the mob definition, keyed by profile name.
    profiles: BTreeMap<String, Profile>,
}
559
560impl DefinitionSeededRealmProfileStore {
561    fn new(
562        definition: &MobDefinition,
563        inner: Arc<dyn meerkat_mob::RealmProfileStore>,
564    ) -> Option<Self> {
565        let profiles = definition
566            .profiles
567            .iter()
568            .filter_map(|(name, binding)| {
569                binding
570                    .as_inline()
571                    .cloned()
572                    .map(|profile| (name.to_string(), profile))
573            })
574            .collect::<BTreeMap<_, _>>();
575
576        (!profiles.is_empty()).then_some(Self { inner, profiles })
577    }
578
579    fn stored(&self, name: &str, profile: &Profile) -> meerkat_mob::StoredRealmProfile {
580        let now = chrono::Utc::now();
581        meerkat_mob::StoredRealmProfile {
582            name: name.to_string(),
583            profile: profile.clone(),
584            revision: 0,
585            created_at: now,
586            updated_at: now,
587        }
588    }
589}
590
591#[cfg_attr(target_arch = "wasm32", async_trait(?Send))]
592#[cfg_attr(not(target_arch = "wasm32"), async_trait)]
593impl meerkat_mob::RealmProfileStore for DefinitionSeededRealmProfileStore {
594    async fn create(
595        &self,
596        name: &str,
597        profile: &Profile,
598    ) -> Result<meerkat_mob::StoredRealmProfile, meerkat_mob::MobStoreError> {
599        if self.profiles.contains_key(name) {
600            return Err(meerkat_mob::MobStoreError::CasConflict(format!(
601                "realm profile already exists: {name}"
602            )));
603        }
604        self.inner.create(name, profile).await
605    }
606
607    async fn get(
608        &self,
609        name: &str,
610    ) -> Result<Option<meerkat_mob::StoredRealmProfile>, meerkat_mob::MobStoreError> {
611        if let Some(profile) = self.profiles.get(name) {
612            return Ok(Some(self.stored(name, profile)));
613        }
614        self.inner.get(name).await
615    }
616
617    async fn list(
618        &self,
619    ) -> Result<Vec<meerkat_mob::StoredRealmProfile>, meerkat_mob::MobStoreError> {
620        let mut merged = self.inner.list().await?;
621        merged.retain(|profile| !self.profiles.contains_key(profile.name.as_str()));
622        merged.extend(
623            self.profiles
624                .iter()
625                .map(|(name, profile)| self.stored(name, profile)),
626        );
627        merged.sort_by(|a, b| a.name.cmp(&b.name));
628        Ok(merged)
629    }
630
631    async fn update(
632        &self,
633        name: &str,
634        profile: &Profile,
635        expected_revision: u64,
636    ) -> Result<meerkat_mob::StoredRealmProfile, meerkat_mob::MobStoreError> {
637        if self.profiles.contains_key(name) {
638            return Err(meerkat_mob::MobStoreError::CasConflict(format!(
639                "realm profile '{name}' is provided by the mob definition"
640            )));
641        }
642        self.inner.update(name, profile, expected_revision).await
643    }
644
645    async fn delete(
646        &self,
647        name: &str,
648        expected_revision: u64,
649    ) -> Result<meerkat_mob::StoredRealmProfile, meerkat_mob::MobStoreError> {
650        if self.profiles.contains_key(name) {
651            return Err(meerkat_mob::MobStoreError::CasConflict(format!(
652                "realm profile '{name}' is provided by the mob definition"
653            )));
654        }
655        self.inner.delete(name, expected_revision).await
656    }
657}
658
659fn delegate_idle_retire_override_from_args(
660    tool_name: &str,
661    args: &mut Value,
662) -> Result<Option<DelegateIdleRetireOverride>, meerkat_core::ToolError> {
663    let Some(object) = args.as_object_mut() else {
664        return Ok(None);
665    };
666    let Some(value) = object.remove("idle_retire_secs") else {
667        return Ok(None);
668    };
669    if value.is_null() {
670        return Ok(Some(DelegateIdleRetireOverride::Disabled));
671    }
672    value
673        .as_u64()
674        .map(DelegateIdleRetireOverride::Seconds)
675        .map(Some)
676        .ok_or_else(|| {
677            meerkat_core::ToolError::invalid_arguments(
678                tool_name,
679                "idle_retire_secs must be a non-negative integer or null",
680            )
681        })
682}
683
684fn delegate_tool_def_with_idle_retire_secs(
685    tool: &meerkat_core::types::ToolDef,
686) -> meerkat_core::types::ToolDef {
687    let mut patched = tool.clone();
688    if !patched.description.contains("IDLE RETIREMENT:") {
689        patched.description.push_str(
690            "\n\nIDLE RETIREMENT:\n\
691             Omit idle_retire_secs to use the runtime default. Pass an integer \
692             number of seconds to override idle auto-retirement for this helper. \
693             Pass null to disable auto-retirement for this helper.",
694        );
695    }
696    if let Some(properties) = patched
697        .input_schema
698        .get_mut("properties")
699        .and_then(Value::as_object_mut)
700    {
701        properties
702            .entry("idle_retire_secs".to_string())
703            .or_insert_with(|| {
704                serde_json::json!({
705                    "description": "Override idle auto-retirement for this helper. Omit to use the runtime default, use an integer number of seconds to override, or null to disable auto-retirement for this helper.",
706                    "anyOf": [
707                        {"type": "integer", "minimum": 0},
708                        {"type": "null"}
709                    ]
710                })
711            });
712    }
713    patched
714}
715
716fn mob_spawn_tool_def_with_idle_retire_secs(
717    tool: &meerkat_core::types::ToolDef,
718) -> meerkat_core::types::ToolDef {
719    let mut patched = tool.clone();
720    if !patched.description.contains("IDLE RETIREMENT:") {
721        patched.description.push_str(
722            "\n\nIDLE RETIREMENT:\n\
723             Omit idle_retire_secs to leave this spawned member out of auto-retirement. \
724             Pass an integer number of seconds to retire the member after it has been \
725             idle for that long. Pass null to explicitly disable auto-retirement.",
726        );
727    }
728    if let Some(properties) = patched
729        .input_schema
730        .get_mut("properties")
731        .and_then(Value::as_object_mut)
732    {
733        properties
734            .entry("idle_retire_secs".to_string())
735            .or_insert_with(|| {
736                serde_json::json!({
737                    "description": "Opt this spawned member into idle auto-retirement. Omit to keep the member indefinitely, use an integer number of seconds to retire after that much idle time, or null to explicitly disable auto-retirement.",
738                    "anyOf": [
739                        {"type": "integer", "minimum": 0},
740                        {"type": "null"}
741                    ]
742                })
743            });
744    }
745    patched
746}
747
748fn install_agent_mob_tools(
749    definition: &MobDefinition,
750    slot: Arc<std::sync::RwLock<Option<Arc<dyn meerkat_core::service::MobToolsFactory>>>>,
751    session_service: Arc<dyn MobSessionService>,
752) -> (
753    Arc<meerkat_mob_mcp::MobMcpState>,
754    ImplicitDelegateRetirementOverrides,
755    SharedDefaultLlmClientSlot,
756) {
757    let default_llm_client_slot = Arc::new(std::sync::RwLock::new(None::<Arc<dyn LlmClient>>));
758    let default_llm_client_provider_slot = Arc::clone(&default_llm_client_slot);
759    let mut state = meerkat_mob_mcp::MobMcpState::new(session_service);
760    if let Some(base_store) = state.realm_profile_store().cloned()
761        && let Some(store) = DefinitionSeededRealmProfileStore::new(definition, base_store)
762    {
763        state = state.with_realm_profile_store(Some(Arc::new(store)));
764    }
765    state = state
766        .with_realm_skill_sources(definition.skills.clone())
767        .with_default_llm_client_provider(Some(Arc::new(move || {
768            default_llm_client_provider_slot
769                .read()
770                .unwrap_or_else(std::sync::PoisonError::into_inner)
771                .clone()
772        })));
773    let state = Arc::new(state);
774    let implicit_delegate_retirement_overrides = ImplicitDelegateRetirementOverrides::default();
775    let inner = Arc::new(meerkat_mob_mcp::AgentMobToolSurfaceFactory::new(
776        Arc::clone(&state),
777    ));
778    let factory = Arc::new(AutoWireParentMobToolsFactory {
779        inner,
780        implicit_delegate_retirement_overrides: implicit_delegate_retirement_overrides.clone(),
781    });
782    *slot
783        .write()
784        .unwrap_or_else(std::sync::PoisonError::into_inner) = Some(factory);
785    (
786        state,
787        implicit_delegate_retirement_overrides,
788        default_llm_client_slot,
789    )
790}
791
/// Test-only record describing one runtime turn.
///
/// NOTE(review): field meanings below are inferred from their names — confirm
/// against the code that produces these records.
#[cfg(test)]
#[allow(dead_code)]
#[derive(Debug, Clone)]
pub(crate) struct RuntimeTurnTrace {
    /// Presumably the id of the session the turn ran in.
    pub(crate) session_id: String,
    /// Presumably the name of the boundary at which the turn was applied.
    pub(crate) boundary: String,
    /// Presumably the number of inputs that contributed to the turn.
    pub(crate) contributing_input_count: usize,
    /// Presumably a textual description of how the turn ended.
    pub(crate) outcome: String,
}
801
802fn is_replay_unsafe_server_tool_content(name: &str, content: &Value) -> bool {
803    name == "web_search_annotations"
804        || content
805            .get("type")
806            .and_then(Value::as_str)
807            .is_some_and(|kind| kind.starts_with("response."))
808}
809
810fn sanitize_llm_request_for_stateless_replay(request: &LlmRequest) -> LlmRequest {
811    let mut sanitized = request.clone();
812    sanitized.messages = request
813        .messages
814        .iter()
815        .cloned()
816        .map(sanitize_message_for_stateless_replay)
817        .collect();
818    sanitized
819}
820
821fn sanitize_create_session_request_llm_override(req: &mut CreateSessionRequest) {
822    let Some(build) = req.build.as_mut() else {
823        return;
824    };
825    let Some(client) = build
826        .llm_client_override
827        .as_ref()
828        .and_then(meerkat::decode_llm_client_override_from_service)
829    else {
830        return;
831    };
832    build.llm_client_override = Some(meerkat::encode_llm_client_override_for_service(
833        ReplaySanitizingLlmClient::wrap(client),
834    ));
835}
836
837fn sanitize_message_for_stateless_replay(message: Message) -> Message {
838    match message {
839        Message::BlockAssistant(mut assistant) => {
840            assistant.blocks = assistant
841                .blocks
842                .into_iter()
843                .filter_map(|block| match block {
844                    AssistantBlock::ServerToolContent { name, content, .. }
845                        if is_replay_unsafe_server_tool_content(&name, &content) =>
846                    {
847                        None
848                    }
849                    other => Some(other),
850                })
851                .collect();
852            Message::BlockAssistant(assistant)
853        }
854        other => other,
855    }
856}
857
858/// Open the persistent runtime store that holds the authoritative
859/// session snapshot used by `load_persisted_session` (resume path) and
860/// `load_persisted_session_for_control` (archive/retire path). Lives at
861/// `<store_path>/runtime.sqlite` — separate file from the session
862/// store so we don't depend on the session_store's concrete type. If
863/// the SQLite open fails (rare: disk full, permissions), fall back to
864/// `InMemoryRuntimeStore` so the runtime can still bootstrap. In that
865/// degraded mode resume across restart and archive operations will
866/// fail; the warning makes the cause visible in operator logs.
867fn build_persistent_runtime_store(store_path: &Path) -> Arc<dyn meerkat_runtime::RuntimeStore> {
868    let runtime_db = store_path.join("runtime.sqlite");
869    match meerkat_runtime::store::SqliteRuntimeStore::new(&runtime_db) {
870        Ok(store) => Arc::new(store),
871        Err(err) => {
872            tracing::warn!(
873                path = %runtime_db.display(),
874                error = %err,
875                "failed to open SqliteRuntimeStore; falling back to InMemoryRuntimeStore. \
876                 Sessions will not survive process restart and archive operations may fail.",
877            );
878            Arc::new(meerkat_runtime::InMemoryRuntimeStore::new())
879        }
880    }
881}
882
/// RuntimeStore facade for external-authoritative identity-first apps.
///
/// `PersistentSessionService` treats `RuntimeStore` as the authoritative
/// session snapshot source whenever one is installed. External apps such as
/// OB3 supply a durable `SessionStore` through `ContinuitySessionStoreAdapter`,
/// so this bridge is meant to make that store visible to the runtime snapshot
/// path while delegating non-session runtime bookkeeping to the process-local
/// store.
///
/// NOTE(review): the struct currently holds only `inner` and retains no
/// `SessionStore`, so as written it can only reach the wrapped runtime store.
/// Confirm whether the session-store bridging described above is still
/// expected to live here.
struct SessionStoreBackedRuntimeStore {
    /// Runtime store that this facade wraps.
    inner: Arc<dyn meerkat_runtime::RuntimeStore>,
}
893
impl SessionStoreBackedRuntimeStore {
    /// Wrap `inner` in the facade.
    ///
    /// `_session_store` is accepted but not stored: the constructed value keeps
    /// only `inner`.
    /// NOTE(review): presumably kept for call-site compatibility — confirm
    /// whether the session store should be retained and consulted.
    fn new(
        inner: Arc<dyn meerkat_runtime::RuntimeStore>,
        _session_store: Arc<dyn SessionStore>,
    ) -> Self {
        Self { inner }
    }
}
902
// `RuntimeStore` implementation for the facade.
//
// Every method below forwards its arguments unchanged to `self.inner` and
// returns the inner store's result as-is; no method adds, filters, or rewrites
// data on the way through.
#[async_trait]
impl meerkat_runtime::RuntimeStore for SessionStoreBackedRuntimeStore {
    // --- Auth / OAuth flow snapshot (synchronous) ---

    fn auth_authority_key(&self) -> Option<String> {
        self.inner.auth_authority_key()
    }

    fn persist_auth_oauth_flow_snapshot(
        &self,
        snapshot_json: &[u8],
    ) -> Result<(), meerkat_runtime::store::RuntimeStoreError> {
        self.inner.persist_auth_oauth_flow_snapshot(snapshot_json)
    }

    fn load_auth_oauth_flow_snapshot(
        &self,
    ) -> Result<Option<Vec<u8>>, meerkat_runtime::store::RuntimeStoreError> {
        self.inner.load_auth_oauth_flow_snapshot()
    }

    fn update_auth_oauth_flow_snapshot(
        &self,
        update: &mut meerkat_runtime::store::AuthOAuthFlowSnapshotUpdate<'_>,
    ) -> Result<(), meerkat_runtime::store::RuntimeStoreError> {
        self.inner.update_auth_oauth_flow_snapshot(update)
    }

    // --- Session snapshot commits ---

    async fn commit_session_snapshot(
        &self,
        runtime_id: &meerkat_runtime::LogicalRuntimeId,
        session_delta: meerkat_runtime::store::SessionDelta,
    ) -> Result<(), meerkat_runtime::store::RuntimeStoreError> {
        self.inner
            .commit_session_snapshot(runtime_id, session_delta)
            .await
    }

    async fn commit_session_transcript_rewrite_snapshot(
        &self,
        runtime_id: &meerkat_runtime::LogicalRuntimeId,
        session_delta: meerkat_runtime::store::SessionDelta,
        commit: &meerkat_core::TranscriptRewriteCommit,
    ) -> Result<(), meerkat_runtime::store::RuntimeStoreError> {
        self.inner
            .commit_session_transcript_rewrite_snapshot(runtime_id, session_delta, commit)
            .await
    }

    async fn atomic_apply(
        &self,
        runtime_id: &meerkat_runtime::LogicalRuntimeId,
        session_delta: Option<meerkat_runtime::store::SessionDelta>,
        receipt: meerkat_core::lifecycle::RunBoundaryReceipt,
        input_updates: Vec<StoredInputState>,
        session_store_key: Option<meerkat_core::types::SessionId>,
    ) -> Result<(), meerkat_runtime::store::RuntimeStoreError> {
        self.inner
            .atomic_apply(
                runtime_id,
                session_delta,
                receipt,
                input_updates,
                session_store_key,
            )
            .await
    }

    // --- Input state and boundary receipt reads ---

    async fn load_input_states(
        &self,
        runtime_id: &meerkat_runtime::LogicalRuntimeId,
    ) -> Result<Vec<StoredInputState>, meerkat_runtime::store::RuntimeStoreError> {
        self.inner.load_input_states(runtime_id).await
    }

    async fn load_boundary_receipt(
        &self,
        runtime_id: &meerkat_runtime::LogicalRuntimeId,
        run_id: &meerkat_core::lifecycle::RunId,
        sequence: u64,
    ) -> Result<
        Option<meerkat_core::lifecycle::RunBoundaryReceipt>,
        meerkat_runtime::store::RuntimeStoreError,
    > {
        self.inner
            .load_boundary_receipt(runtime_id, run_id, sequence)
            .await
    }

    // --- Session snapshot load / clear / conditional replace ---

    async fn load_session_snapshot(
        &self,
        runtime_id: &meerkat_runtime::LogicalRuntimeId,
    ) -> Result<Option<Vec<u8>>, meerkat_runtime::store::RuntimeStoreError> {
        self.inner.load_session_snapshot(runtime_id).await
    }

    async fn clear_session_snapshot(
        &self,
        runtime_id: &meerkat_runtime::LogicalRuntimeId,
    ) -> Result<(), meerkat_runtime::store::RuntimeStoreError> {
        self.inner.clear_session_snapshot(runtime_id).await
    }

    async fn replace_session_snapshot_if_current(
        &self,
        runtime_id: &meerkat_runtime::LogicalRuntimeId,
        expected_current: &[u8],
        replacement: Vec<u8>,
    ) -> Result<bool, meerkat_runtime::store::RuntimeStoreError> {
        self.inner
            .replace_session_snapshot_if_current(runtime_id, expected_current, replacement)
            .await
    }

    async fn clear_session_snapshot_if_current(
        &self,
        runtime_id: &meerkat_runtime::LogicalRuntimeId,
        expected_current: &[u8],
    ) -> Result<bool, meerkat_runtime::store::RuntimeStoreError> {
        self.inner
            .clear_session_snapshot_if_current(runtime_id, expected_current)
            .await
    }

    // --- Per-input state ---

    async fn persist_input_state(
        &self,
        runtime_id: &meerkat_runtime::LogicalRuntimeId,
        state: &StoredInputState,
    ) -> Result<(), meerkat_runtime::store::RuntimeStoreError> {
        self.inner.persist_input_state(runtime_id, state).await
    }

    async fn load_input_state(
        &self,
        runtime_id: &meerkat_runtime::LogicalRuntimeId,
        input_id: &meerkat_core::lifecycle::InputId,
    ) -> Result<Option<StoredInputState>, meerkat_runtime::store::RuntimeStoreError> {
        self.inner.load_input_state(runtime_id, input_id).await
    }

    // --- Runtime state and machine lifecycle ---

    async fn load_runtime_state(
        &self,
        runtime_id: &meerkat_runtime::LogicalRuntimeId,
    ) -> Result<Option<meerkat_runtime::RuntimeState>, meerkat_runtime::store::RuntimeStoreError>
    {
        self.inner.load_runtime_state(runtime_id).await
    }

    async fn commit_machine_lifecycle(
        &self,
        runtime_id: &meerkat_runtime::LogicalRuntimeId,
        commit: MachineLifecycleCommit,
        input_states: &[StoredInputState],
    ) -> Result<(), meerkat_runtime::store::RuntimeStoreError> {
        self.inner
            .commit_machine_lifecycle(runtime_id, commit, input_states)
            .await
    }

    // --- Ops lifecycle snapshot ---

    async fn persist_ops_lifecycle(
        &self,
        runtime_id: &meerkat_runtime::LogicalRuntimeId,
        snapshot: &meerkat_runtime::ops_lifecycle::PersistedOpsSnapshot,
    ) -> Result<(), meerkat_runtime::store::RuntimeStoreError> {
        self.inner.persist_ops_lifecycle(runtime_id, snapshot).await
    }

    async fn load_ops_lifecycle(
        &self,
        runtime_id: &meerkat_runtime::LogicalRuntimeId,
    ) -> Result<
        Option<meerkat_runtime::ops_lifecycle::PersistedOpsSnapshot>,
        meerkat_runtime::store::RuntimeStoreError,
    > {
        self.inner.load_ops_lifecycle(runtime_id).await
    }
}
1078
/// Process-wide, lazily initialized buffer of runtime turn traces (test builds only).
#[cfg(test)]
static RUNTIME_TURN_TRACES: OnceLock<Mutex<Vec<RuntimeTurnTrace>>> = OnceLock::new();
1081
/// Access the shared trace buffer, creating an empty one on first use.
#[cfg(test)]
fn runtime_turn_traces() -> &'static Mutex<Vec<RuntimeTurnTrace>> {
    // A default `Mutex<Vec<_>>` wraps an empty vector.
    RUNTIME_TURN_TRACES.get_or_init(Mutex::default)
}
1086
/// Append `trace` to the shared trace buffer.
///
/// # Panics
///
/// Panics if the buffer's mutex is poisoned.
#[cfg(test)]
#[allow(clippy::expect_used)]
fn record_runtime_turn_trace(trace: RuntimeTurnTrace) {
    let mut traces = runtime_turn_traces()
        .lock()
        .expect("runtime turn traces mutex");
    traces.push(trace);
}
1095
/// Drain the shared trace buffer, returning everything recorded so far and
/// leaving the buffer empty.
///
/// # Panics
///
/// Panics if the buffer's mutex is poisoned.
#[cfg(test)]
#[allow(dead_code)]
#[allow(clippy::expect_used)]
pub(crate) fn take_runtime_turn_traces() -> Vec<RuntimeTurnTrace> {
    let mut traces = runtime_turn_traces()
        .lock()
        .expect("runtime turn traces mutex");
    std::mem::take(&mut *traces)
}
1106
/// No-op stand-in for the trace recorder in non-test builds.
///
/// It takes `()` rather than a `RuntimeTurnTrace` and does nothing.
/// NOTE(review): the call site visible in this file is itself `#[cfg(test)]`,
/// so this stub appears to be unused (hence `dead_code`) — confirm whether it
/// is still needed.
#[cfg(not(test))]
#[allow(dead_code)]
fn record_runtime_turn_trace(_trace: ()) {}
1110
/// Report whether verbose runtime-turn diagnostics are switched on.
///
/// Diagnostics are enabled whenever the `MOBKIT_TRACE_RUNTIME_TURNS`
/// environment variable is present in the process environment; its value is
/// not inspected.
fn runtime_turn_diagnostics_enabled() -> bool {
    const TRACE_FLAG: &str = "MOBKIT_TRACE_RUNTIME_TURNS";
    std::env::var_os(TRACE_FLAG).is_some()
}
1114
1115fn summarize_runtime_prompt(prompt: &meerkat_core::ContentInput) -> String {
1116    match prompt {
1117        meerkat_core::ContentInput::Text(text) => {
1118            text.lines().take(6).collect::<Vec<_>>().join(" ")
1119        }
1120        meerkat_core::ContentInput::Blocks(blocks) => blocks
1121            .iter()
1122            .map(|block| block.text_projection().to_string())
1123            .collect::<Vec<_>>()
1124            .join(" ")
1125            .lines()
1126            .take(6)
1127            .collect::<Vec<_>>()
1128            .join(" "),
1129    }
1130}
1131
1132/// Whether the session factory should wire the image-generation substrate for
1133/// this definition. Meerkat owns the per-profile visibility decision via
1134/// `profile.tools.image_generation`; MobKit only needs to make the runtime
1135/// machine available when a profile opts in, or when a realm profile may resolve
1136/// to an opt-in profile at spawn time.
1137pub fn mob_definition_may_use_image_generation(definition: &MobDefinition) -> bool {
1138    definition.profiles.values().any(|binding| {
1139        binding
1140            .as_inline()
1141            .is_none_or(|profile| profile.tools.image_generation)
1142    })
1143}
1144
1145fn normalize_runtime_turn_request(
1146    mut req: meerkat_core::service::StartTurnRequest,
1147) -> meerkat_core::service::StartTurnRequest {
1148    // Queue/Steer and render metadata are runtime-owned semantics. By the
1149    // time apply_runtime_turn() invokes the session service, the runtime
1150    // has already chosen the boundary and recorded the metadata it needs.
1151    // The direct agent/session path is queue-only, so forward a normalized
1152    // turn request to avoid re-injecting runtime-only semantics.
1153    req.runtime.handling_mode = meerkat_core::types::HandlingMode::Queue;
1154    req.runtime.render_metadata = None;
1155    req
1156}
1157
/// Implement `MobSessionService` and its super-traits (`SessionService`,
/// `SessionServiceCommsExt`, `SessionServiceControlExt`,
/// `SessionServiceHistoryExt`) for `$wrapper` by delegating to `self.inner`.
///
/// The wrapper type must expose the fields `inner`, `hook`,
/// `after_create_hook`, and `runtime_adapter_override`. Most methods are plain
/// pass-throughs; the exceptions are:
///
/// - `create_session` runs the pre-build hook, wraps any LLM client override in
///   the replay-sanitizing client, and fires the optional after-create hook.
/// - `set_session_client` and `hot_swap_session_llm_identity` wrap the supplied
///   client in `ReplaySanitizingAgentLlmClient` before delegating.
/// - `runtime_adapter` prefers `runtime_adapter_override` over the inner
///   service's adapter.
/// - `apply_runtime_turn` normalizes the request, optionally logs diagnostics,
///   and records a trace in test builds.
macro_rules! delegate_mob_session_service {
    ($wrapper:ty) => {
        #[async_trait]
        impl meerkat_core::service::SessionService for $wrapper {
            async fn create_session(
                &self,
                mut req: CreateSessionRequest,
            ) -> Result<meerkat_core::types::RunResult, SessionError> {
                // The pre-build hook may mutate the request; its error aborts creation.
                (self.hook)(&mut req).await?;
                // Ensure any client override goes through the replay sanitizer.
                sanitize_create_session_request_llm_override(&mut req);

                // Capture context before create_session consumes the request.
                let ctx = SessionCreatedContext {
                    model: req.model.clone(),
                    labels: req.labels.clone().unwrap_or_default(),
                    system_prompt: req.system_prompt.clone(),
                };
                let result = self.inner.create_session(req).await?;

                // Best-effort after_create — errors logged, not propagated.
                if let Some(ref after_hook) = self.after_create_hook {
                    after_hook(result.session_id.clone(), ctx).await;
                }

                Ok(result)
            }
            async fn start_turn(
                &self,
                id: &meerkat_core::types::SessionId,
                req: meerkat_core::service::StartTurnRequest,
            ) -> Result<meerkat_core::types::RunResult, SessionError> {
                self.inner.start_turn(id, req).await
            }
            async fn interrupt(
                &self,
                id: &meerkat_core::types::SessionId,
            ) -> Result<(), SessionError> {
                self.inner.interrupt(id).await
            }
            async fn cancel_after_boundary(
                &self,
                id: &meerkat_core::types::SessionId,
            ) -> Result<(), SessionError> {
                self.inner.cancel_after_boundary(id).await
            }
            // Client installs are wrapped so the replaced client is also
            // replay-sanitizing.
            async fn set_session_client(
                &self,
                id: &meerkat_core::types::SessionId,
                client: Arc<dyn meerkat_core::AgentLlmClient>,
            ) -> Result<(), SessionError> {
                self.inner
                    .set_session_client(id, ReplaySanitizingAgentLlmClient::wrap(client))
                    .await
            }
            // Same wrapping as `set_session_client`; identity and policy pass
            // through unchanged.
            async fn hot_swap_session_llm_identity(
                &self,
                id: &meerkat_core::types::SessionId,
                client: Arc<dyn meerkat_core::AgentLlmClient>,
                identity: meerkat_core::session::SessionLlmIdentity,
                request_policy: meerkat_core::SessionLlmRequestPolicy,
            ) -> Result<(), SessionError> {
                self.inner
                    .hot_swap_session_llm_identity(
                        id,
                        ReplaySanitizingAgentLlmClient::wrap(client),
                        identity,
                        request_policy,
                    )
                    .await
            }
            async fn update_session_keep_alive(
                &self,
                id: &meerkat_core::types::SessionId,
                keep_alive: bool,
            ) -> Result<(), SessionError> {
                self.inner.update_session_keep_alive(id, keep_alive).await
            }
            async fn update_session_mob_authority_context(
                &self,
                id: &meerkat_core::types::SessionId,
                authority_context: Option<meerkat_core::service::MobToolAuthorityContext>,
            ) -> Result<(), SessionError> {
                self.inner
                    .update_session_mob_authority_context(id, authority_context)
                    .await
            }
            async fn has_live_session(
                &self,
                id: &meerkat_core::types::SessionId,
            ) -> Result<bool, SessionError> {
                self.inner.has_live_session(id).await
            }
            async fn set_session_tool_visibility_state(
                &self,
                id: &meerkat_core::types::SessionId,
                state: Option<meerkat_core::SessionToolVisibilityState>,
            ) -> Result<(), SessionError> {
                self.inner
                    .set_session_tool_visibility_state(id, state)
                    .await
            }
            async fn set_session_tool_filter(
                &self,
                id: &meerkat_core::types::SessionId,
                filter: meerkat_core::ToolFilter,
            ) -> Result<(), SessionError> {
                self.inner.set_session_tool_filter(id, filter).await
            }
            async fn read(
                &self,
                id: &meerkat_core::types::SessionId,
            ) -> Result<meerkat_core::service::SessionView, SessionError> {
                self.inner.read(id).await
            }
            async fn list(
                &self,
                query: meerkat_core::service::SessionQuery,
            ) -> Result<Vec<meerkat_core::service::SessionSummary>, SessionError> {
                self.inner.list(query).await
            }
            async fn archive(
                &self,
                id: &meerkat_core::types::SessionId,
            ) -> Result<(), SessionError> {
                self.inner.archive(id).await
            }
            // `subscribe_session_events` is also implemented for
            // `MobSessionService` below, so the call names the trait explicitly.
            async fn subscribe_session_events(
                &self,
                id: &meerkat_core::types::SessionId,
            ) -> Result<meerkat_core::comms::EventStream, meerkat_core::comms::StreamError> {
                meerkat_core::service::SessionService::subscribe_session_events(
                    self.inner.as_ref(),
                    id,
                )
                .await
            }
        }

        // Comms accessors: straight pass-through to the inner service.
        #[async_trait]
        impl meerkat_core::service::SessionServiceCommsExt for $wrapper {
            async fn comms_runtime(
                &self,
                id: &meerkat_core::types::SessionId,
            ) -> Option<Arc<dyn meerkat_core::agent::CommsRuntime>> {
                self.inner.comms_runtime(id).await
            }

            async fn event_injector(
                &self,
                id: &meerkat_core::types::SessionId,
            ) -> Option<Arc<dyn meerkat_core::EventInjector>> {
                self.inner.event_injector(id).await
            }

            async fn interaction_event_injector(
                &self,
                id: &meerkat_core::types::SessionId,
            ) -> Option<Arc<dyn meerkat_core::event_injector::SubscribableInjector>> {
                self.inner.interaction_event_injector(id).await
            }
        }

        // Control operations: straight pass-through to the inner service.
        #[async_trait]
        impl meerkat_core::service::SessionServiceControlExt for $wrapper {
            async fn append_system_context(
                &self,
                id: &meerkat_core::types::SessionId,
                req: meerkat_core::service::AppendSystemContextRequest,
            ) -> Result<
                meerkat_core::service::AppendSystemContextResult,
                meerkat_core::service::SessionControlError,
            > {
                self.inner.append_system_context(id, req).await
            }
            async fn stage_tool_results(
                &self,
                id: &meerkat_core::types::SessionId,
                req: meerkat_core::service::StageToolResultsRequest,
            ) -> Result<meerkat_core::service::StageToolResultsResult, SessionError> {
                self.inner.stage_tool_results(id, req).await
            }
        }

        // History reads: straight pass-through to the inner service.
        #[async_trait]
        impl meerkat_core::service::SessionServiceHistoryExt for $wrapper {
            async fn read_history(
                &self,
                id: &meerkat_core::types::SessionId,
                query: meerkat_core::service::SessionHistoryQuery,
            ) -> Result<meerkat_core::service::SessionHistoryPage, SessionError> {
                self.inner.read_history(id, query).await
            }
        }

        #[async_trait]
        impl MobSessionService for $wrapper {
            fn supports_persistent_sessions(&self) -> bool {
                self.inner.supports_persistent_sessions()
            }
            // An explicitly configured adapter wins; otherwise fall back to
            // whatever the inner service exposes.
            fn runtime_adapter(&self) -> Option<Arc<meerkat_runtime::MeerkatMachine>> {
                self.runtime_adapter_override
                    .clone()
                    .or_else(|| self.inner.runtime_adapter())
            }
            async fn interrupt_with_machine_authority(
                &self,
                session_id: &meerkat_core::types::SessionId,
                authority: meerkat_runtime::MachineSessionControlAuthority,
            ) -> Result<(), SessionError> {
                self.inner
                    .interrupt_with_machine_authority(session_id, authority)
                    .await
            }
            async fn cancel_after_boundary_with_machine_authority(
                &self,
                session_id: &meerkat_core::types::SessionId,
                authority: meerkat_runtime::MachineSessionControlAuthority,
            ) -> Result<(), SessionError> {
                self.inner
                    .cancel_after_boundary_with_machine_authority(session_id, authority)
                    .await
            }
            async fn session_belongs_to_mob(
                &self,
                session_id: &meerkat_core::types::SessionId,
                mob_id: &meerkat_mob::MobId,
            ) -> bool {
                self.inner.session_belongs_to_mob(session_id, mob_id).await
            }
            async fn load_persisted_session(
                &self,
                session_id: &meerkat_core::types::SessionId,
            ) -> Result<Option<meerkat_core::session::Session>, SessionError> {
                self.inner.load_persisted_session(session_id).await
            }
            // Named via the `MobSessionService` trait because `SessionService`
            // has a method of the same name.
            async fn subscribe_session_events(
                &self,
                session_id: &meerkat_core::types::SessionId,
            ) -> Result<meerkat_core::comms::EventStream, meerkat_core::comms::StreamError> {
                meerkat_mob::MobSessionService::subscribe_session_events(
                    self.inner.as_ref(),
                    session_id,
                )
                .await
            }
            async fn archive_with_mob_lifecycle_authority(
                &self,
                session_id: &meerkat_core::types::SessionId,
            ) -> Result<(), SessionError> {
                self.inner
                    .archive_with_mob_lifecycle_authority(session_id)
                    .await
            }
            async fn execution_snapshot(
                &self,
                session_id: &meerkat_core::types::SessionId,
            ) -> Result<Option<meerkat_core::agent::AgentExecutionSnapshot>, SessionError> {
                self.inner.execution_snapshot(session_id).await
            }
            async fn tool_scope_snapshot(
                &self,
                session_id: &meerkat_core::types::SessionId,
            ) -> Result<Option<meerkat_core::ToolScopeSnapshot>, SessionError> {
                self.inner.tool_scope_snapshot(session_id).await
            }
            async fn external_tool_surface_snapshot(
                &self,
                session_id: &meerkat_core::types::SessionId,
            ) -> Result<Option<meerkat_core::ExternalToolSurfaceSnapshot>, SessionError> {
                self.inner.external_tool_surface_snapshot(session_id).await
            }
            async fn peer_ingress_runtime_snapshot(
                &self,
                session_id: &meerkat_core::types::SessionId,
            ) -> Result<Option<meerkat_core::PeerIngressRuntimeSnapshot>, SessionError> {
                self.inner.peer_ingress_runtime_snapshot(session_id).await
            }
            // Forwards the turn with a normalized request. When
            // MOBKIT_TRACE_RUNTIME_TURNS is set, the start and outcome are
            // logged; in test builds a `RuntimeTurnTrace` is recorded too.
            async fn apply_runtime_turn(
                &self,
                session_id: &meerkat_core::types::SessionId,
                run_id: meerkat_core::lifecycle::RunId,
                req: meerkat_core::service::StartTurnRequest,
                boundary: meerkat_core::lifecycle::run_primitive::RunApplyBoundary,
                contributing_input_ids: Vec<meerkat_core::lifecycle::InputId>,
            ) -> Result<meerkat_core::lifecycle::core_executor::CoreApplyOutput, SessionError> {
                // Snapshot the values the trace needs before `boundary` and
                // `contributing_input_ids` are moved into the inner call.
                #[cfg(test)]
                let boundary_name = format!("{boundary:?}");
                #[cfg(test)]
                let contributing_count = contributing_input_ids.len();
                // `run_id` is moved into the inner call, so keep a string copy for logging.
                let run_id_for_log = run_id.to_string();
                // Only build the prompt summary when diagnostics are enabled.
                let prompt_summary = if runtime_turn_diagnostics_enabled() {
                    Some(summarize_runtime_prompt(&req.prompt))
                } else {
                    None
                };
                if let Some(summary) = prompt_summary.as_ref() {
                    tracing::warn!(
                        session_id = %session_id,
                        run_id = %run_id_for_log,
                        boundary = ?boundary,
                        contributing_inputs = contributing_input_ids.len(),
                        prompt = %summary,
                        runtime = ?req.runtime,
                        "mobkit runtime turn start"
                    );
                }
                let result = self
                    .inner
                    .apply_runtime_turn(
                        session_id,
                        run_id,
                        normalize_runtime_turn_request(req),
                        boundary,
                        contributing_input_ids,
                    )
                    .await;
                #[cfg(test)]
                record_runtime_turn_trace(RuntimeTurnTrace {
                    session_id: session_id.to_string(),
                    boundary: boundary_name,
                    contributing_input_count: contributing_count,
                    outcome: match &result {
                        Ok(_) => "ok".to_string(),
                        Err(error) => format!("err:{error}"),
                    },
                });
                // The environment flag is re-read here rather than reusing
                // `prompt_summary`.
                if runtime_turn_diagnostics_enabled() {
                    match &result {
                        Ok(_) => tracing::warn!(
                            session_id = %session_id,
                            run_id = %run_id_for_log,
                            "mobkit runtime turn ok"
                        ),
                        Err(error) => tracing::error!(
                            session_id = %session_id,
                            run_id = %run_id_for_log,
                            error = %error,
                            error_debug = ?error,
                            "mobkit runtime turn error"
                        ),
                    }
                }
                result
            }
            async fn apply_runtime_context_appends(
                &self,
                session_id: &meerkat_core::types::SessionId,
                run_id: meerkat_core::lifecycle::RunId,
                appends: Vec<meerkat_core::session::PendingSystemContextAppend>,
                contributing_input_ids: Vec<meerkat_core::lifecycle::InputId>,
            ) -> Result<meerkat_core::lifecycle::core_executor::CoreApplyOutput, SessionError> {
                self.inner
                    .apply_runtime_context_appends(
                        session_id,
                        run_id,
                        appends,
                        contributing_input_ids,
                    )
                    .await
            }
            async fn apply_runtime_context_appends_with_boundary(
                &self,
                session_id: &meerkat_core::types::SessionId,
                run_id: meerkat_core::lifecycle::RunId,
                appends: Vec<meerkat_core::session::PendingSystemContextAppend>,
                boundary: meerkat_core::lifecycle::run_primitive::RunApplyBoundary,
                contributing_input_ids: Vec<meerkat_core::lifecycle::InputId>,
            ) -> Result<meerkat_core::lifecycle::core_executor::CoreApplyOutput, SessionError> {
                self.inner
                    .apply_runtime_context_appends_with_boundary(
                        session_id,
                        run_id,
                        appends,
                        boundary,
                        contributing_input_ids,
                    )
                    .await
            }
            async fn apply_runtime_system_context_for_turn(
                &self,
                session_id: &meerkat_core::types::SessionId,
                appends: Vec<meerkat_core::session::PendingSystemContextAppend>,
            ) -> Result<(), SessionError> {
                self.inner
                    .apply_runtime_system_context_for_turn(session_id, appends)
                    .await
            }
            async fn stage_runtime_system_context_for_active_turn(
                &self,
                session_id: &meerkat_core::types::SessionId,
                expected_run_id: &meerkat_core::lifecycle::RunId,
                appends: Vec<meerkat_core::session::PendingSystemContextAppend>,
            ) -> Result<Option<Vec<u8>>, SessionError> {
                self.inner
                    .stage_runtime_system_context_for_active_turn(
                        session_id,
                        expected_run_id,
                        appends,
                    )
                    .await
            }
            async fn discard_runtime_system_context_for_active_turn(
                &self,
                session_id: &meerkat_core::types::SessionId,
                expected_run_id: &meerkat_core::lifecycle::RunId,
                idempotency_keys: Vec<String>,
            ) -> Result<(), SessionError> {
                self.inner
                    .discard_runtime_system_context_for_active_turn(
                        session_id,
                        expected_run_id,
                        idempotency_keys,
                    )
                    .await
            }
            async fn active_turn_system_context_boundary_available(
                &self,
                session_id: &meerkat_core::types::SessionId,
            ) -> Result<Option<bool>, SessionError> {
                self.inner
                    .active_turn_system_context_boundary_available(session_id)
                    .await
            }
            async fn discard_live_session(
                &self,
                session_id: &meerkat_core::types::SessionId,
            ) -> Result<(), SessionError> {
                self.inner.discard_live_session(session_id).await
            }
            async fn checkpoint_committed_runtime_session_snapshot(
                &self,
                session_id: &meerkat_core::types::SessionId,
                session_snapshot: &[u8],
            ) -> Result<(), SessionError> {
                self.inner
                    .checkpoint_committed_runtime_session_snapshot(session_id, session_snapshot)
                    .await
            }
            async fn cancel_all_checkpointers(&self) {
                self.inner.cancel_all_checkpointers().await;
            }
            async fn rearm_all_checkpointers(&self) {
                self.inner.rearm_all_checkpointers().await;
            }
        }
    };
}
1607
// Expand the delegation macro defined above for `PreBuildMobSessionService`;
// the macro body emits trait methods that forward to `self.inner`.
delegate_mob_session_service!(PreBuildMobSessionService);
1609
/// Wraps a `MobSessionService` to fire an `AfterCreateHook` after each
/// successful `create_session`.
///
/// The `SessionCreatedContext` handed to the hook is captured from the request
/// as this wrapper receives it, **before** the request is forwarded to `inner`.
/// Mutations applied by hooks running inside `inner` are therefore not
/// reflected in the context; only the session id comes from the completed call.
struct AfterCreateMobSessionService {
    /// Service that every call is forwarded to.
    inner: Arc<dyn MobSessionService>,
    /// Callback awaited after `inner.create_session` succeeds.
    after_hook: AfterCreateHook,
}
1618
/// `SessionService` implementation that forwards every call to `inner`.
///
/// Two behaviors are added on top of plain forwarding: `create_session`
/// awaits the after-create hook on success, and the client-setting methods
/// wrap the supplied LLM client in `ReplaySanitizingAgentLlmClient`.
#[async_trait]
impl meerkat_core::service::SessionService for AfterCreateMobSessionService {
    /// Creates a session through `inner` and, on success, awaits the
    /// after-create hook with the new session id and the captured context.
    ///
    /// # Errors
    /// Returns the `SessionError` from `inner.create_session` unchanged; the
    /// hook is not invoked in that case.
    async fn create_session(
        &self,
        mut req: CreateSessionRequest,
    ) -> Result<meerkat_core::types::RunResult, SessionError> {
        sanitize_create_session_request_llm_override(&mut req);
        // Snapshot the hook context now: `req` is moved into
        // `inner.create_session` below and cannot be read afterwards.
        //
        // The snapshot therefore matches the request as seen by this layer
        // (after the sanitization above and any mutations made by outer
        // wrappers). Mutations made by hooks running inside `inner` are not
        // visible here; that is the accepted trade-off.
        let ctx = SessionCreatedContext {
            model: req.model.clone(),
            labels: req.labels.clone().unwrap_or_default(),
            system_prompt: req.system_prompt.clone(),
        };
        let result = self.inner.create_session(req).await?;
        (self.after_hook)(result.session_id.clone(), ctx).await;
        Ok(result)
    }
    /// Forwards to `inner.start_turn`.
    async fn start_turn(
        &self,
        id: &meerkat_core::types::SessionId,
        req: meerkat_core::service::StartTurnRequest,
    ) -> Result<meerkat_core::types::RunResult, SessionError> {
        self.inner.start_turn(id, req).await
    }
    /// Forwards to `inner.interrupt`.
    async fn interrupt(&self, id: &meerkat_core::types::SessionId) -> Result<(), SessionError> {
        self.inner.interrupt(id).await
    }
    /// Forwards to `inner.cancel_after_boundary`.
    async fn cancel_after_boundary(
        &self,
        id: &meerkat_core::types::SessionId,
    ) -> Result<(), SessionError> {
        self.inner.cancel_after_boundary(id).await
    }
    /// Forwards to `inner.set_session_client`, first wrapping `client` in
    /// `ReplaySanitizingAgentLlmClient`.
    async fn set_session_client(
        &self,
        id: &meerkat_core::types::SessionId,
        client: Arc<dyn meerkat_core::AgentLlmClient>,
    ) -> Result<(), SessionError> {
        self.inner
            .set_session_client(id, ReplaySanitizingAgentLlmClient::wrap(client))
            .await
    }
    /// Forwards to `inner.hot_swap_session_llm_identity`, first wrapping
    /// `client` in `ReplaySanitizingAgentLlmClient`; `identity` and
    /// `request_policy` are passed through untouched.
    async fn hot_swap_session_llm_identity(
        &self,
        id: &meerkat_core::types::SessionId,
        client: Arc<dyn meerkat_core::AgentLlmClient>,
        identity: meerkat_core::session::SessionLlmIdentity,
        request_policy: meerkat_core::SessionLlmRequestPolicy,
    ) -> Result<(), SessionError> {
        self.inner
            .hot_swap_session_llm_identity(
                id,
                ReplaySanitizingAgentLlmClient::wrap(client),
                identity,
                request_policy,
            )
            .await
    }
    /// Forwards to `inner.update_session_keep_alive`.
    async fn update_session_keep_alive(
        &self,
        id: &meerkat_core::types::SessionId,
        keep_alive: bool,
    ) -> Result<(), SessionError> {
        self.inner.update_session_keep_alive(id, keep_alive).await
    }
    /// Forwards to `inner.update_session_mob_authority_context`.
    async fn update_session_mob_authority_context(
        &self,
        id: &meerkat_core::types::SessionId,
        authority_context: Option<meerkat_core::service::MobToolAuthorityContext>,
    ) -> Result<(), SessionError> {
        self.inner
            .update_session_mob_authority_context(id, authority_context)
            .await
    }
    /// Forwards to `inner.has_live_session`.
    async fn has_live_session(
        &self,
        id: &meerkat_core::types::SessionId,
    ) -> Result<bool, SessionError> {
        self.inner.has_live_session(id).await
    }
    /// Forwards to `inner.set_session_tool_visibility_state`.
    async fn set_session_tool_visibility_state(
        &self,
        id: &meerkat_core::types::SessionId,
        state: Option<meerkat_core::SessionToolVisibilityState>,
    ) -> Result<(), SessionError> {
        self.inner
            .set_session_tool_visibility_state(id, state)
            .await
    }
    /// Forwards to `inner.set_session_tool_filter`.
    async fn set_session_tool_filter(
        &self,
        id: &meerkat_core::types::SessionId,
        filter: meerkat_core::ToolFilter,
    ) -> Result<(), SessionError> {
        self.inner.set_session_tool_filter(id, filter).await
    }
    /// Forwards to `inner.read`.
    async fn read(
        &self,
        id: &meerkat_core::types::SessionId,
    ) -> Result<meerkat_core::service::SessionView, SessionError> {
        self.inner.read(id).await
    }
    /// Forwards to `inner.list`.
    async fn list(
        &self,
        query: meerkat_core::service::SessionQuery,
    ) -> Result<Vec<meerkat_core::service::SessionSummary>, SessionError> {
        self.inner.list(query).await
    }
    /// Forwards to `inner.archive`.
    async fn archive(&self, id: &meerkat_core::types::SessionId) -> Result<(), SessionError> {
        self.inner.archive(id).await
    }
    /// Forwards to the `SessionService` flavor of `subscribe_session_events`
    /// on `inner`. The fully-qualified call selects this trait's method
    /// explicitly, since `MobSessionService` declares a method with the
    /// same name.
    async fn subscribe_session_events(
        &self,
        id: &meerkat_core::types::SessionId,
    ) -> Result<meerkat_core::comms::EventStream, meerkat_core::comms::StreamError> {
        meerkat_core::service::SessionService::subscribe_session_events(self.inner.as_ref(), id)
            .await
    }
}
1746
/// Comms extension: every accessor is answered by `inner` unchanged.
#[async_trait]
impl meerkat_core::service::SessionServiceCommsExt for AfterCreateMobSessionService {
    /// Returns whatever `inner.comms_runtime` yields for `id`.
    async fn comms_runtime(
        &self,
        id: &meerkat_core::types::SessionId,
    ) -> Option<Arc<dyn meerkat_core::agent::CommsRuntime>> {
        self.inner.comms_runtime(id).await
    }

    /// Returns whatever `inner.event_injector` yields for `id`.
    async fn event_injector(
        &self,
        id: &meerkat_core::types::SessionId,
    ) -> Option<Arc<dyn meerkat_core::EventInjector>> {
        self.inner.event_injector(id).await
    }

    /// Returns whatever `inner.interaction_event_injector` yields for `id`.
    async fn interaction_event_injector(
        &self,
        id: &meerkat_core::types::SessionId,
    ) -> Option<Arc<dyn meerkat_core::event_injector::SubscribableInjector>> {
        self.inner.interaction_event_injector(id).await
    }
}
1770
/// Control extension: both operations are forwarded to `inner` unchanged.
#[async_trait]
impl meerkat_core::service::SessionServiceControlExt for AfterCreateMobSessionService {
    /// Forwards to `inner.append_system_context`, returning its result or
    /// `SessionControlError` as-is.
    async fn append_system_context(
        &self,
        id: &meerkat_core::types::SessionId,
        req: meerkat_core::service::AppendSystemContextRequest,
    ) -> Result<
        meerkat_core::service::AppendSystemContextResult,
        meerkat_core::service::SessionControlError,
    > {
        self.inner.append_system_context(id, req).await
    }
    /// Forwards to `inner.stage_tool_results`.
    async fn stage_tool_results(
        &self,
        id: &meerkat_core::types::SessionId,
        req: meerkat_core::service::StageToolResultsRequest,
    ) -> Result<meerkat_core::service::StageToolResultsResult, SessionError> {
        self.inner.stage_tool_results(id, req).await
    }
}
1791
/// History extension: reads are served by `inner` unchanged.
#[async_trait]
impl meerkat_core::service::SessionServiceHistoryExt for AfterCreateMobSessionService {
    /// Forwards to `inner.read_history` with the same `id` and `query`.
    async fn read_history(
        &self,
        id: &meerkat_core::types::SessionId,
        query: meerkat_core::service::SessionHistoryQuery,
    ) -> Result<meerkat_core::service::SessionHistoryPage, SessionError> {
        self.inner.read_history(id, query).await
    }
}
1802
/// `MobSessionService` implementation that is a pure pass-through: each
/// method calls the method of the same name on `inner` with the same
/// arguments and returns its result unchanged.
#[async_trait]
impl MobSessionService for AfterCreateMobSessionService {
    /// Reports `inner`'s answer.
    fn supports_persistent_sessions(&self) -> bool {
        self.inner.supports_persistent_sessions()
    }
    /// Reports `inner`'s runtime adapter, if it exposes one.
    fn runtime_adapter(&self) -> Option<Arc<meerkat_runtime::MeerkatMachine>> {
        self.inner.runtime_adapter()
    }
    // --- Control operations carrying an explicit machine authority ---
    async fn interrupt_with_machine_authority(
        &self,
        session_id: &meerkat_core::types::SessionId,
        authority: meerkat_runtime::MachineSessionControlAuthority,
    ) -> Result<(), SessionError> {
        self.inner
            .interrupt_with_machine_authority(session_id, authority)
            .await
    }
    async fn cancel_after_boundary_with_machine_authority(
        &self,
        session_id: &meerkat_core::types::SessionId,
        authority: meerkat_runtime::MachineSessionControlAuthority,
    ) -> Result<(), SessionError> {
        self.inner
            .cancel_after_boundary_with_machine_authority(session_id, authority)
            .await
    }
    // --- Session ownership, persistence, and events ---
    async fn session_belongs_to_mob(
        &self,
        session_id: &meerkat_core::types::SessionId,
        mob_id: &meerkat_mob::MobId,
    ) -> bool {
        self.inner.session_belongs_to_mob(session_id, mob_id).await
    }
    async fn load_persisted_session(
        &self,
        session_id: &meerkat_core::types::SessionId,
    ) -> Result<Option<meerkat_core::session::Session>, SessionError> {
        self.inner.load_persisted_session(session_id).await
    }
    /// Uses fully-qualified syntax to pick the `MobSessionService` flavor of
    /// this method, since `SessionService` declares one with the same name.
    async fn subscribe_session_events(
        &self,
        session_id: &meerkat_core::types::SessionId,
    ) -> Result<meerkat_core::comms::EventStream, meerkat_core::comms::StreamError> {
        meerkat_mob::MobSessionService::subscribe_session_events(self.inner.as_ref(), session_id)
            .await
    }
    async fn archive_with_mob_lifecycle_authority(
        &self,
        session_id: &meerkat_core::types::SessionId,
    ) -> Result<(), SessionError> {
        self.inner
            .archive_with_mob_lifecycle_authority(session_id)
            .await
    }
    // --- Read-only snapshots ---
    async fn execution_snapshot(
        &self,
        session_id: &meerkat_core::types::SessionId,
    ) -> Result<Option<meerkat_core::agent::AgentExecutionSnapshot>, SessionError> {
        self.inner.execution_snapshot(session_id).await
    }
    async fn tool_scope_snapshot(
        &self,
        session_id: &meerkat_core::types::SessionId,
    ) -> Result<Option<meerkat_core::ToolScopeSnapshot>, SessionError> {
        self.inner.tool_scope_snapshot(session_id).await
    }
    async fn external_tool_surface_snapshot(
        &self,
        session_id: &meerkat_core::types::SessionId,
    ) -> Result<Option<meerkat_core::ExternalToolSurfaceSnapshot>, SessionError> {
        self.inner.external_tool_surface_snapshot(session_id).await
    }
    async fn peer_ingress_runtime_snapshot(
        &self,
        session_id: &meerkat_core::types::SessionId,
    ) -> Result<Option<meerkat_core::PeerIngressRuntimeSnapshot>, SessionError> {
        self.inner.peer_ingress_runtime_snapshot(session_id).await
    }
    // --- Runtime apply operations ---
    async fn apply_runtime_turn(
        &self,
        session_id: &meerkat_core::types::SessionId,
        run_id: meerkat_core::lifecycle::RunId,
        req: meerkat_core::service::StartTurnRequest,
        boundary: meerkat_core::lifecycle::run_primitive::RunApplyBoundary,
        contributing_input_ids: Vec<meerkat_core::lifecycle::InputId>,
    ) -> Result<meerkat_core::lifecycle::core_executor::CoreApplyOutput, SessionError> {
        self.inner
            .apply_runtime_turn(session_id, run_id, req, boundary, contributing_input_ids)
            .await
    }
    async fn apply_runtime_context_appends(
        &self,
        session_id: &meerkat_core::types::SessionId,
        run_id: meerkat_core::lifecycle::RunId,
        appends: Vec<meerkat_core::session::PendingSystemContextAppend>,
        contributing_input_ids: Vec<meerkat_core::lifecycle::InputId>,
    ) -> Result<meerkat_core::lifecycle::core_executor::CoreApplyOutput, SessionError> {
        self.inner
            .apply_runtime_context_appends(session_id, run_id, appends, contributing_input_ids)
            .await
    }
    async fn apply_runtime_context_appends_with_boundary(
        &self,
        session_id: &meerkat_core::types::SessionId,
        run_id: meerkat_core::lifecycle::RunId,
        appends: Vec<meerkat_core::session::PendingSystemContextAppend>,
        boundary: meerkat_core::lifecycle::run_primitive::RunApplyBoundary,
        contributing_input_ids: Vec<meerkat_core::lifecycle::InputId>,
    ) -> Result<meerkat_core::lifecycle::core_executor::CoreApplyOutput, SessionError> {
        self.inner
            .apply_runtime_context_appends_with_boundary(
                session_id,
                run_id,
                appends,
                boundary,
                contributing_input_ids,
            )
            .await
    }
    // --- System-context staging for turns ---
    async fn apply_runtime_system_context_for_turn(
        &self,
        session_id: &meerkat_core::types::SessionId,
        appends: Vec<meerkat_core::session::PendingSystemContextAppend>,
    ) -> Result<(), SessionError> {
        self.inner
            .apply_runtime_system_context_for_turn(session_id, appends)
            .await
    }
    async fn stage_runtime_system_context_for_active_turn(
        &self,
        session_id: &meerkat_core::types::SessionId,
        expected_run_id: &meerkat_core::lifecycle::RunId,
        appends: Vec<meerkat_core::session::PendingSystemContextAppend>,
    ) -> Result<Option<Vec<u8>>, SessionError> {
        self.inner
            .stage_runtime_system_context_for_active_turn(session_id, expected_run_id, appends)
            .await
    }
    async fn discard_runtime_system_context_for_active_turn(
        &self,
        session_id: &meerkat_core::types::SessionId,
        expected_run_id: &meerkat_core::lifecycle::RunId,
        idempotency_keys: Vec<String>,
    ) -> Result<(), SessionError> {
        self.inner
            .discard_runtime_system_context_for_active_turn(
                session_id,
                expected_run_id,
                idempotency_keys,
            )
            .await
    }
    async fn active_turn_system_context_boundary_available(
        &self,
        session_id: &meerkat_core::types::SessionId,
    ) -> Result<Option<bool>, SessionError> {
        self.inner
            .active_turn_system_context_boundary_available(session_id)
            .await
    }
    // --- Live-session and checkpointer management ---
    async fn discard_live_session(
        &self,
        session_id: &meerkat_core::types::SessionId,
    ) -> Result<(), SessionError> {
        self.inner.discard_live_session(session_id).await
    }
    async fn checkpoint_committed_runtime_session_snapshot(
        &self,
        session_id: &meerkat_core::types::SessionId,
        session_snapshot: &[u8],
    ) -> Result<(), SessionError> {
        self.inner
            .checkpoint_committed_runtime_session_snapshot(session_id, session_snapshot)
            .await
    }
    async fn cancel_all_checkpointers(&self) {
        self.inner.cancel_all_checkpointers().await;
    }
    async fn rearm_all_checkpointers(&self) {
        self.inner.rearm_all_checkpointers().await;
    }
}
1985
/// Specification for bootstrapping a mob runtime from a definition, storage, and session service.
pub struct MobBootstrapSpec {
    /// Mob definition supplied to the constructor.
    pub definition: MobDefinition,
    /// Mob storage supplied to the constructor.
    pub storage: MobStorage,
    /// Session service for the mob. `new` stores the supplied service wrapped
    /// in a `PreBuildMobSessionService` with a no-op pre-build hook.
    pub session_service: Arc<dyn MobSessionService>,
    /// Binary blob store created by the factory constructors; `None` when the
    /// spec is built with `new`.
    pub binary_blob_store: Option<Arc<dyn BinaryBlobStore>>,
    // The three fields below are filled from `install_agent_mob_tools` by the
    // factory constructors and are `None` when the spec is built with `new`.
    pub(crate) agent_mob_mcp_state: Option<Arc<meerkat_mob_mcp::MobMcpState>>,
    pub(crate) implicit_delegate_retirement_overrides: Option<ImplicitDelegateRetirementOverrides>,
    pub(crate) agent_mob_default_llm_client_slot: Option<SharedDefaultLlmClientSlot>,
    /// Bootstrap options. `new` enables ephemeral sessions and
    /// orchestrator notification on resume, with no default LLM client.
    pub options: MobBootstrapOptions,
    /// Explicit runtime adapter — bypasses `session_service.runtime_adapter()`.
    ///
    /// Used by `persistent()` to supply the adapter directly so the session
    /// service's `runtime_store` can stay `None` (keeping the checkpointer
    /// enabled). See meerkat-session#checkpointer-enabled-flag.
    pub runtime_adapter: Option<Arc<meerkat_runtime::MeerkatMachine>>,
    /// Holds the ephemeral temp directory alive for the lifetime of the spec.
    /// Only populated when the builder creates an ephemeral runtime.
    pub(crate) _ephemeral_dir: Option<Arc<tempfile::TempDir>>,
}
2006
2007impl MobBootstrapSpec {
2008    pub fn new(
2009        definition: MobDefinition,
2010        storage: MobStorage,
2011        session_service: Arc<dyn MobSessionService>,
2012    ) -> Self {
2013        let session_service = Arc::new(PreBuildMobSessionService {
2014            inner: session_service,
2015            hook: no_op_pre_build_hook(),
2016            after_create_hook: None,
2017            runtime_adapter_override: None,
2018        }) as Arc<dyn MobSessionService>;
2019        Self {
2020            definition,
2021            storage,
2022            session_service,
2023            binary_blob_store: None,
2024            agent_mob_mcp_state: None,
2025            implicit_delegate_retirement_overrides: None,
2026            agent_mob_default_llm_client_slot: None,
2027            options: MobBootstrapOptions {
2028                allow_ephemeral_sessions: true,
2029                notify_orchestrator_on_resume: true,
2030                default_llm_client: None,
2031            },
2032            runtime_adapter: None,
2033            _ephemeral_dir: None,
2034        }
2035    }
2036
2037    pub fn with_options(mut self, options: MobBootstrapOptions) -> Self {
2038        self.options = options;
2039        self
2040    }
2041
2042    /// Expose a runtime adapter through the session-service facade.
2043    ///
2044    /// Custom embedders that construct their own `MobSessionService` still need
2045    /// MobKit's session-service surface to report the same runtime authority
2046    /// that `MobBuilder::with_runtime_adapter(...)` receives. This keeps
2047    /// autonomous-host comms, runtime inspection, and control paths pointed at
2048    /// one machine without forcing embedders through the stock factory helpers.
2049    pub fn with_session_runtime_adapter(
2050        mut self,
2051        adapter: Arc<meerkat_runtime::MeerkatMachine>,
2052    ) -> Self {
2053        self.session_service = Arc::new(PreBuildMobSessionService {
2054            inner: self.session_service,
2055            hook: no_op_pre_build_hook(),
2056            after_create_hook: None,
2057            runtime_adapter_override: Some(adapter),
2058        });
2059        self
2060    }
2061
    /// Wrap the session service with an after-create hook that fires after
    /// each successful `create_session`. The hook is best-effort: it is only
    /// awaited, and nothing it does is propagated to the caller.
    ///
    /// Uses `AfterCreateMobSessionService`, which snapshots the hook context
    /// from the request as it arrives at this outermost wrapper, before the
    /// request is forwarded to the wrapped service. Mutations made by
    /// pre-build hooks of inner wrappers are therefore NOT reflected in the
    /// context passed to `hook`.
    pub fn with_after_create_hook(mut self, hook: AfterCreateHook) -> Self {
        self.session_service = Arc::new(AfterCreateMobSessionService {
            inner: self.session_service,
            after_hook: hook,
        });
        self
    }
2074
2075    /// Build an ephemeral session service with a correctly wired `AgentFactory`.
2076    ///
2077    /// If `session_store` is provided, it is set on the `FactoryAgentBuilder` so
2078    /// that agents use the given store instead of falling back to JSONL.
2079    pub fn ephemeral(
2080        definition: MobDefinition,
2081        storage: MobStorage,
2082        store_path: PathBuf,
2083        max_sessions: usize,
2084        session_store: Option<Arc<dyn AgentSessionStore>>,
2085    ) -> Self {
2086        Self::ephemeral_inner(
2087            definition,
2088            storage,
2089            store_path,
2090            max_sessions,
2091            session_store,
2092            None,
2093            CapabilityFlags::default(),
2094            None,
2095            None,
2096        )
2097    }
2098
2099    /// Like [`ephemeral`](Self::ephemeral), but with a pre-build hook that is
2100    /// called before each agent is constructed. Use this to inject external
2101    /// tools, augment system prompts, or set per-agent labels.
2102    pub fn ephemeral_with_hook(
2103        definition: MobDefinition,
2104        storage: MobStorage,
2105        store_path: PathBuf,
2106        max_sessions: usize,
2107        session_store: Option<Arc<dyn AgentSessionStore>>,
2108        hook: impl Fn(
2109            &mut CreateSessionRequest,
2110        ) -> std::pin::Pin<
2111            Box<dyn std::future::Future<Output = Result<(), SessionError>> + Send + '_>,
2112        > + Send
2113        + Sync
2114        + 'static,
2115    ) -> Self {
2116        Self::ephemeral_inner(
2117            definition,
2118            storage,
2119            store_path,
2120            max_sessions,
2121            session_store,
2122            Some(Arc::new(hook)),
2123            CapabilityFlags::default(),
2124            None,
2125            None,
2126        )
2127    }
2128
    /// Shared implementation behind the ephemeral constructors.
    ///
    /// Builds an `EphemeralSessionService` over an in-memory blob store,
    /// wraps it in a `PreBuildMobSessionService` carrying `hook` (or a no-op)
    /// and the after-create hook, installs the agent mob tools, and returns a
    /// spec with the resulting state filled in.
    ///
    /// A runtime adapter is only created when image generation is enabled,
    /// either through `caps` or because the definition may use it.
    #[allow(clippy::too_many_arguments)]
    pub(crate) fn ephemeral_inner(
        definition: MobDefinition,
        storage: MobStorage,
        store_path: PathBuf,
        max_sessions: usize,
        session_store: Option<Arc<dyn AgentSessionStore>>,
        hook: Option<PreBuildHook>,
        mut caps: CapabilityFlags,
        after_create_hook: Option<AfterCreateHook>,
        agent_config: Option<Config>,
    ) -> Self {
        // Image generation is on if the caller asked for it or the definition may need it.
        caps.image_generation |= mob_definition_may_use_image_generation(&definition);
        // Blobs live in memory; the base64 adapter exposes the same store as a core `BlobStore`.
        let binary_blob_store: Arc<dyn BinaryBlobStore> = Arc::new(ObjectStoreBlobStore::memory());
        let blob_store: Arc<dyn meerkat_core::BlobStore> =
            Arc::new(Base64BlobStoreAdapter::new(binary_blob_store.clone()));
        // Only image generation needs a machine here; it is backed by an in-memory runtime store.
        let runtime_adapter = if caps.image_generation {
            let runtime_store: Arc<dyn meerkat_runtime::RuntimeStore> =
                Arc::new(meerkat_runtime::InMemoryRuntimeStore::new());
            Some(Arc::new(meerkat_runtime::MeerkatMachine::persistent(
                runtime_store,
                Arc::clone(&blob_store),
            )))
        } else {
            None
        };
        let mut factory = AgentFactory::new(&store_path)
            .builtins(caps.builtins)
            .shell(caps.shell)
            .mob(caps.mob)
            .comms(caps.comms)
            .memory(caps.memory);
        if let Some(machine) = runtime_adapter.clone() {
            factory = factory.with_image_generation_machine(machine);
        }
        let config = agent_config.unwrap_or_default();
        let mut builder = FactoryAgentBuilder::new(factory, config);
        builder.default_blob_store = Some(blob_store);
        if let Some(store) = session_store {
            builder.default_session_store = Some(store);
        }
        // Keep a handle to the mob-tools slot before `builder` is moved into the service.
        let mob_tools_slot = Arc::clone(&builder.default_mob_tools);
        let session_service: Arc<dyn MobSessionService> = Arc::new(
            meerkat_session::EphemeralSessionService::new(builder, max_sessions),
        );
        let hook = hook.unwrap_or_else(no_op_pre_build_hook);
        // With a runtime adapter, compose the after-create hook: register the
        // new session with the adapter first, then run the caller's hook (if
        // any). Without an adapter, the caller's hook is used as-is.
        let after_create_hook = if let Some(runtime_adapter) = runtime_adapter.clone() {
            let user_after_create_hook = after_create_hook.clone();
            Some(Arc::new(
                move |session_id: meerkat_core::types::SessionId, ctx: SessionCreatedContext| {
                    // Clone per invocation so the returned future owns its captures.
                    let runtime_adapter = runtime_adapter.clone();
                    let user_after_create_hook = user_after_create_hook.clone();
                    Box::pin(async move {
                        runtime_adapter.register_session(session_id.clone()).await;
                        if let Some(user_after_create_hook) = user_after_create_hook {
                            user_after_create_hook(session_id, ctx).await;
                        }
                    })
                        as std::pin::Pin<Box<dyn std::future::Future<Output = ()> + Send>>
                },
            ) as AfterCreateHook)
        } else {
            after_create_hook
        };
        let session_service = Arc::new(PreBuildMobSessionService {
            inner: session_service,
            hook,
            after_create_hook,
            runtime_adapter_override: runtime_adapter.clone(),
        }) as Arc<dyn MobSessionService>;
        // The mob tools receive the wrapped service, i.e. the one with the hooks applied.
        let (
            agent_mob_mcp_state,
            implicit_delegate_retirement_overrides,
            agent_mob_default_llm_client_slot,
        ) = install_agent_mob_tools(&definition, mob_tools_slot, Arc::clone(&session_service));
        // `new` adds its own no-op `PreBuildMobSessionService` layer around `session_service`.
        let mut spec = Self::new(definition, storage, session_service);
        spec.agent_mob_mcp_state = Some(agent_mob_mcp_state);
        spec.implicit_delegate_retirement_overrides = Some(implicit_delegate_retirement_overrides);
        spec.agent_mob_default_llm_client_slot = Some(agent_mob_default_llm_client_slot);
        spec.runtime_adapter = runtime_adapter;
        spec.binary_blob_store = Some(binary_blob_store);
        spec
    }
2212
2213    /// Build a persistent session service with a correctly wired `AgentFactory`.
2214    ///
2215    /// The `session_store` is used in two places:
2216    /// 1. As the persistence backend for `PersistentSessionService` (checkpoint/restore).
2217    /// 2. Adapted via `StoreAdapter` and set on `FactoryAgentBuilder.default_session_store`
2218    ///    so that agents use it directly instead of falling back to JSONL.
2219    pub fn persistent(
2220        definition: MobDefinition,
2221        storage: MobStorage,
2222        store_path: PathBuf,
2223        max_sessions: usize,
2224        session_store: Arc<dyn SessionStore>,
2225    ) -> Self {
2226        Self::persistent_inner(
2227            definition,
2228            storage,
2229            store_path,
2230            max_sessions,
2231            session_store,
2232            None,
2233            None,
2234            CapabilityFlags::default(),
2235            None,
2236            None,
2237        )
2238    }
2239
2240    /// Like [`persistent`](Self::persistent), but with a pre-build hook that
2241    /// is called before each agent is constructed. Use this to inject external
2242    /// tools, augment system prompts, or set per-agent labels.
2243    pub fn persistent_with_hook(
2244        definition: MobDefinition,
2245        storage: MobStorage,
2246        store_path: PathBuf,
2247        max_sessions: usize,
2248        session_store: Arc<dyn SessionStore>,
2249        hook: impl Fn(
2250            &mut CreateSessionRequest,
2251        ) -> std::pin::Pin<
2252            Box<dyn std::future::Future<Output = Result<(), SessionError>> + Send + '_>,
2253        > + Send
2254        + Sync
2255        + 'static,
2256    ) -> Self {
2257        Self::persistent_inner(
2258            definition,
2259            storage,
2260            store_path,
2261            max_sessions,
2262            session_store,
2263            None,
2264            Some(Arc::new(hook)),
2265            CapabilityFlags::default(),
2266            None,
2267            None,
2268        )
2269    }
2270
2271    #[allow(clippy::too_many_arguments)]
2272    pub(crate) fn persistent_inner(
2273        definition: MobDefinition,
2274        storage: MobStorage,
2275        store_path: PathBuf,
2276        max_sessions: usize,
2277        session_store: Arc<dyn SessionStore>,
2278        custom_blob_store: Option<Arc<dyn meerkat_core::BlobStore>>,
2279        hook: Option<PreBuildHook>,
2280        mut caps: CapabilityFlags,
2281        after_create_hook: Option<AfterCreateHook>,
2282        agent_config: Option<Config>,
2283    ) -> Self {
2284        caps.image_generation |= mob_definition_may_use_image_generation(&definition);
2285        let (binary_blob_store, blob_store): (
2286            Arc<dyn BinaryBlobStore>,
2287            Arc<dyn meerkat_core::BlobStore>,
2288        ) = if let Some(blob_store) = custom_blob_store {
2289            (
2290                Arc::new(BinaryBlobStoreAdapter::new(blob_store.clone())),
2291                blob_store,
2292            )
2293        } else {
2294            let binary_blob_store: Arc<dyn BinaryBlobStore> = match ObjectStoreBlobStore::local(
2295                store_path.join("blobs"),
2296            ) {
2297                Ok(store) => Arc::new(store),
2298                Err(err) => {
2299                    tracing::warn!(
2300                        error = %err,
2301                        "failed to initialize persistent binary blob store; falling back to in-memory blobs"
2302                    );
2303                    Arc::new(ObjectStoreBlobStore::memory())
2304                }
2305            };
2306            let blob_store: Arc<dyn meerkat_core::BlobStore> =
2307                Arc::new(Base64BlobStoreAdapter::new(binary_blob_store.clone()));
2308            (binary_blob_store, blob_store)
2309        };
2310        // Use a SQLite-backed runtime store so we get BOTH durability across
2311        // process restart AND control-op authority (archive/retire). The
2312        // earlier 0.6.1 wiring used `Some(InMemoryRuntimeStore)`, which was
2313        // a half-fix: it kept the session-service's runtime_store path on
2314        // (so `load_authoritative_session` resolved through runtime_store —
2315        // good for control ops), but the in-memory store died on restart so
2316        // resume failed. Switching the in-memory store for a persistent one
2317        // satisfies both. The store lives at `store_path/runtime.sqlite`,
2318        // sibling to whatever path the caller's `session_store` uses.
2319        let runtime_store: Arc<dyn meerkat_runtime::RuntimeStore> =
2320            build_persistent_runtime_store(&store_path);
2321        let runtime_adapter = Arc::new(meerkat_runtime::MeerkatMachine::persistent(
2322            Arc::clone(&runtime_store),
2323            Arc::clone(&blob_store),
2324        ));
2325        let mut factory = AgentFactory::new(&store_path)
2326            .builtins(caps.builtins)
2327            .shell(caps.shell)
2328            .mob(caps.mob)
2329            .comms(caps.comms)
2330            .memory(caps.memory);
2331        if caps.image_generation {
2332            factory = factory.with_image_generation_machine(runtime_adapter.clone());
2333        }
2334        let config = agent_config.unwrap_or_default();
2335        let mut builder = FactoryAgentBuilder::new(factory, config);
2336        builder.default_session_store = Some(Arc::new(StoreAdapter::new(session_store.clone())));
2337        builder.default_blob_store = Some(blob_store.clone());
2338        let mob_tools_slot = Arc::clone(&builder.default_mob_tools);
2339        let session_service: Arc<dyn MobSessionService> =
2340            Arc::new(meerkat_session::PersistentSessionService::new(
2341                builder,
2342                max_sessions,
2343                session_store,
2344                Some(runtime_store),
2345                blob_store,
2346            ));
2347        let hook = hook.unwrap_or_else(no_op_pre_build_hook);
2348        let session_service = Arc::new(PreBuildMobSessionService {
2349            inner: session_service,
2350            hook,
2351            after_create_hook,
2352            runtime_adapter_override: None,
2353        }) as Arc<dyn MobSessionService>;
2354        let (
2355            agent_mob_mcp_state,
2356            implicit_delegate_retirement_overrides,
2357            agent_mob_default_llm_client_slot,
2358        ) = install_agent_mob_tools(&definition, mob_tools_slot, Arc::clone(&session_service));
2359        let mut spec = Self::new(definition, storage, session_service);
2360        spec.agent_mob_mcp_state = Some(agent_mob_mcp_state);
2361        spec.implicit_delegate_retirement_overrides = Some(implicit_delegate_retirement_overrides);
2362        spec.agent_mob_default_llm_client_slot = Some(agent_mob_default_llm_client_slot);
2363        spec.runtime_adapter = Some(runtime_adapter);
2364        spec.binary_blob_store = Some(binary_blob_store);
2365        spec
2366    }
2367
2368    #[allow(clippy::too_many_arguments)]
2369    pub(crate) fn ephemeral_runtime_backed_inner(
2370        definition: MobDefinition,
2371        storage: MobStorage,
2372        store_path: PathBuf,
2373        max_sessions: usize,
2374        custom_session_store: Option<Arc<dyn SessionStore>>,
2375        custom_blob_store: Option<Arc<dyn meerkat_core::BlobStore>>,
2376        hook: Option<PreBuildHook>,
2377        mut caps: CapabilityFlags,
2378        after_create_hook: Option<AfterCreateHook>,
2379        agent_config: Option<Config>,
2380    ) -> Self {
2381        caps.image_generation |= mob_definition_may_use_image_generation(&definition);
2382        let config = agent_config.unwrap_or_default();
2383        let session_store: Arc<dyn SessionStore> = custom_session_store
2384            .clone()
2385            .unwrap_or_else(|| Arc::new(meerkat_store::MemoryStore::new()));
2386        let (binary_blob_store, blob_store): (
2387            Arc<dyn BinaryBlobStore>,
2388            Arc<dyn meerkat_core::BlobStore>,
2389        ) = if let Some(blob_store) = custom_blob_store {
2390            (
2391                Arc::new(BinaryBlobStoreAdapter::new(blob_store.clone())),
2392                blob_store,
2393            )
2394        } else {
2395            let binary_blob_store: Arc<dyn BinaryBlobStore> =
2396                Arc::new(ObjectStoreBlobStore::memory());
2397            let blob_store: Arc<dyn meerkat_core::BlobStore> =
2398                Arc::new(Base64BlobStoreAdapter::new(binary_blob_store.clone()));
2399            (binary_blob_store, blob_store)
2400        };
2401        // Runtime-backed ephemeral mode keeps the live EphemeralSessionService
2402        // as the comms authority, but registers each created session with the
2403        // same in-memory machine used by image generation. Meerkat 0.6.4's
2404        // persistent runtime-backed create path does not expose member comms
2405        // handles early enough for mob edge reconciliation; this bounded bridge
2406        // preserves live comms while avoiding the old "image tool sees the
2407        // session as destroyed" split-machine bug.
2408        let base_runtime_store: Arc<dyn meerkat_runtime::RuntimeStore> =
2409            Arc::new(meerkat_runtime::InMemoryRuntimeStore::new());
2410        let runtime_store: Arc<dyn meerkat_runtime::RuntimeStore> =
2411            if let Some(custom_session_store) = custom_session_store.clone() {
2412                Arc::new(SessionStoreBackedRuntimeStore::new(
2413                    Arc::clone(&base_runtime_store),
2414                    custom_session_store,
2415                ))
2416            } else {
2417                Arc::clone(&base_runtime_store)
2418            };
2419        let runtime_adapter = Arc::new(meerkat_runtime::MeerkatMachine::persistent(
2420            Arc::clone(&runtime_store),
2421            Arc::clone(&blob_store),
2422        ));
2423        let mut factory = AgentFactory::new(&store_path)
2424            .builtins(caps.builtins)
2425            .shell(caps.shell)
2426            .mob(caps.mob)
2427            .comms(caps.comms)
2428            .memory(caps.memory);
2429        if caps.image_generation {
2430            factory = factory.with_image_generation_machine(runtime_adapter.clone());
2431        }
2432        let mut builder = FactoryAgentBuilder::new(factory, config);
2433        builder.default_session_store = Some(Arc::new(StoreAdapter::new(session_store.clone())));
2434        builder.default_blob_store = Some(blob_store.clone());
2435        let mob_tools_slot = Arc::clone(&builder.default_mob_tools);
2436        let session_service: Arc<dyn MobSessionService> =
2437            if let Some(custom_session_store) = custom_session_store {
2438                Arc::new(meerkat_session::PersistentSessionService::new(
2439                    builder,
2440                    max_sessions,
2441                    custom_session_store,
2442                    Some(runtime_store.clone()),
2443                    blob_store,
2444                ))
2445            } else {
2446                Arc::new(meerkat_session::EphemeralSessionService::new(
2447                    builder,
2448                    max_sessions,
2449                ))
2450            };
2451        let hook = hook.unwrap_or_else(no_op_pre_build_hook);
2452        let runtime_adapter_for_after_create = runtime_adapter.clone();
2453        let combined_after_create_hook: AfterCreateHook = Arc::new(move |session_id, ctx| {
2454            let runtime_adapter = runtime_adapter_for_after_create.clone();
2455            let after_create_hook = after_create_hook.clone();
2456            Box::pin(async move {
2457                runtime_adapter.register_session(session_id.clone()).await;
2458                if let Some(after_create_hook) = after_create_hook {
2459                    after_create_hook(session_id, ctx).await;
2460                }
2461            })
2462        });
2463        let session_service = Arc::new(PreBuildMobSessionService {
2464            inner: session_service,
2465            hook,
2466            after_create_hook: Some(combined_after_create_hook),
2467            runtime_adapter_override: Some(runtime_adapter.clone()),
2468        }) as Arc<dyn MobSessionService>;
2469        let (
2470            agent_mob_mcp_state,
2471            implicit_delegate_retirement_overrides,
2472            agent_mob_default_llm_client_slot,
2473        ) = install_agent_mob_tools(&definition, mob_tools_slot, Arc::clone(&session_service));
2474        let mut spec = Self::new(definition, storage, session_service);
2475        spec.agent_mob_mcp_state = Some(agent_mob_mcp_state);
2476        spec.implicit_delegate_retirement_overrides = Some(implicit_delegate_retirement_overrides);
2477        spec.agent_mob_default_llm_client_slot = Some(agent_mob_default_llm_client_slot);
2478        spec.runtime_adapter = Some(runtime_adapter);
2479        spec.binary_blob_store = Some(binary_blob_store);
2480        spec
2481    }
2482}
2483
2484/// Error returned by mob runtime operations.
2485#[derive(Debug)]
2486pub enum MobRuntimeError {
2487    Mob(MobError),
2488    InvalidInput(&'static str),
2489    InvalidConfig(String),
2490}
2491
2492impl std::fmt::Display for MobRuntimeError {
2493    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
2494        match self {
2495            Self::Mob(err) => write!(f, "{err}"),
2496            Self::InvalidInput(message) => write!(f, "{message}"),
2497            Self::InvalidConfig(message) => write!(f, "{message}"),
2498        }
2499    }
2500}
2501
2502impl std::error::Error for MobRuntimeError {}
2503
2504impl From<MobError> for MobRuntimeError {
2505    fn from(value: MobError) -> Self {
2506        Self::Mob(value)
2507    }
2508}
2509
2510// Mobkit's `MobMemberSnapshot`, `MobReconcileReport`, `MobReconcileOptions`
2511// wrapper types were removed as part of the meerkat 0.6 thin-shell cleanup.
2512// Consumers now use `meerkat_mob::runtime::MobMemberListEntry` and
2513// `meerkat_mob::runtime::reconcile::{ReconcileReport, ReconcileOptions,
2514// MemberFilter}` directly.
2515
/// Context delivered to [`SessionHook::after_create`] after a session is
/// successfully created.
///
/// All fields are owned values, so the context can be cloned and moved
/// into spawned work.
#[derive(Clone, Debug)]
pub struct SessionCreatedContext {
    /// Model identifier of the created session (presumably the model the
    /// create request resolved to — confirm at the construction site).
    pub model: String,
    /// Labels attached to the session, keyed by label name.
    pub labels: std::collections::BTreeMap<String, String>,
    /// System prompt of the session, if one was set.
    pub system_prompt: Option<String>,
}
2524
/// Hook trait for customising session lifecycle.
///
/// Both methods have no-op default implementations, so implementors only
/// override the ones they need.
///
/// - `before_create` — runs before `create_session`. Returning `Err` aborts
///   session creation (both Rust-native and Python/TS boundary).
/// - `after_create` — runs after session creation succeeds. It returns `()`,
///   so it cannot fail or undo the creation; the session is already live.
///   Any failure inside the hook has to be handled (e.g. logged) by the hook
///   or its adapter.
#[async_trait]
pub trait SessionHook: Send + Sync {
    /// Called before session creation. Mutate the request to inject tools,
    /// augment prompts, set labels, override model, etc. Return `Err` to
    /// abort session creation.
    ///
    /// The default implementation leaves the request untouched and returns
    /// `Ok(())`.
    async fn before_create(&self, _req: &mut CreateSessionRequest) -> Result<(), SessionError> {
        Ok(())
    }

    /// Called after a session is successfully created. Best-effort — the
    /// method has no return value, so nothing is propagated to the caller.
    ///
    /// The default implementation does nothing.
    async fn after_create(
        &self,
        _session_id: &meerkat_core::types::SessionId,
        _ctx: &SessionCreatedContext,
    ) {
    }
}
2549
/// Switches selecting which agent capabilities are enabled.
///
/// [`Default`] turns on everything except image generation.
#[derive(Clone, Copy, Debug)]
pub struct CapabilityFlags {
    /// Built-in tools.
    pub builtins: bool,
    /// Shell access.
    pub shell: bool,
    /// Mob tooling.
    pub mob: bool,
    /// Inter-agent comms.
    pub comms: bool,
    /// Memory.
    pub memory: bool,
    /// Image generation; off unless requested.
    pub image_generation: bool,
}

impl Default for CapabilityFlags {
    /// All capabilities enabled, with image generation as the one opt-in.
    fn default() -> Self {
        const ENABLED: bool = true;
        const OPT_IN: bool = false;
        CapabilityFlags {
            builtins: ENABLED,
            shell: ENABLED,
            mob: ENABLED,
            comms: ENABLED,
            memory: ENABLED,
            image_generation: OPT_IN,
        }
    }
}
2573
/// Backward-compatible alias for [`MobRuntime`].
///
/// Both names refer to the same type; new code can use `MobRuntime`
/// directly.
pub type RealMobRuntime = MobRuntime;
2576
/// Live mob runtime backed by a `MobHandle`.
///
/// Cloning is cheap relative to the runtime itself: the optional services
/// are held behind `Arc`, and the baseline specs are shared between clones
/// through the same `Arc<RwLock<..>>`.
#[derive(Clone)]
pub struct MobRuntime {
    /// Handle to the underlying mob; returned (cloned) by `handle()`.
    handle: MobHandle,
    /// Session service the runtime was bootstrapped with. `None` for
    /// runtimes created through `from_handle`.
    session_service: Option<Arc<dyn MobSessionService>>,
    /// MCP state carried over from the bootstrap spec, if any. `bootstrap`
    /// registers the created handle on it.
    agent_mob_mcp_state: Option<Arc<meerkat_mob_mcp::MobMcpState>>,
    /// Implicit-delegate retirement overrides carried over from the
    /// bootstrap spec, if any.
    implicit_delegate_retirement_overrides: Option<ImplicitDelegateRetirementOverrides>,
    /// Binary blob store carried over from the bootstrap spec, if any.
    binary_blob_store: Option<Arc<dyn BinaryBlobStore>>,
    /// Member specs recorded via `set_baseline_member_specs`; starts empty.
    baseline_member_specs: Arc<tokio::sync::RwLock<Vec<SpawnMemberSpec>>>,
    /// Keeps the ephemeral temp directory alive for the lifetime of the runtime.
    /// Dropped when the runtime is dropped, cleaning up the temp dir.
    _ephemeral_dir: Option<Arc<tempfile::TempDir>>,
}
2590
2591impl MobRuntime {
2592    pub async fn bootstrap(spec: MobBootstrapSpec) -> Result<Self, MobRuntimeError> {
2593        let ephemeral_dir = spec._ephemeral_dir.clone();
2594        let session_service = spec.session_service.clone();
2595        let binary_blob_store = spec.binary_blob_store.clone();
2596        let mob_id = spec.definition.id.clone();
2597        let agent_mob_mcp_state = spec.agent_mob_mcp_state.clone();
2598        let implicit_delegate_retirement_overrides =
2599            spec.implicit_delegate_retirement_overrides.clone();
2600        let default_llm_client = spec
2601            .options
2602            .default_llm_client
2603            .clone()
2604            .map(ReplaySanitizingLlmClient::wrap);
2605        if let Some(slot) = spec.agent_mob_default_llm_client_slot.as_ref() {
2606            *slot
2607                .write()
2608                .unwrap_or_else(std::sync::PoisonError::into_inner) = default_llm_client.clone();
2609        }
2610        let effective_runtime_adapter = spec
2611            .runtime_adapter
2612            .clone()
2613            .or_else(|| session_service.runtime_adapter());
2614
2615        let mut builder = MobBuilder::new(spec.definition, spec.storage);
2616
2617        // MobActor's autonomous readiness/comms-drain path consults the
2618        // builder-published runtime adapter directly. For session services
2619        // that already embed a runtime adapter (definition-based ephemeral
2620        // and persistent-with-runtime-backed-service), forward that adapter
2621        // explicitly so autonomous members do not come up session-backed but
2622        // runtime-unattached.
2623        if let Some(adapter) = effective_runtime_adapter {
2624            builder = builder.with_runtime_adapter(adapter);
2625        }
2626
2627        builder = builder
2628            .with_session_service(session_service.clone())
2629            .allow_ephemeral_sessions(spec.options.allow_ephemeral_sessions)
2630            .notify_orchestrator_on_resume(spec.options.notify_orchestrator_on_resume);
2631
2632        if let Some(client) = default_llm_client {
2633            builder = builder.with_default_llm_client(client);
2634        }
2635
2636        let handle = builder.create().await?;
2637        if let Some(state) = agent_mob_mcp_state.as_ref() {
2638            state.mob_insert_handle(mob_id, handle.clone()).await;
2639        }
2640        Ok(Self {
2641            handle,
2642            session_service: Some(session_service),
2643            agent_mob_mcp_state,
2644            implicit_delegate_retirement_overrides,
2645            binary_blob_store,
2646            baseline_member_specs: Arc::new(tokio::sync::RwLock::new(Vec::new())),
2647            _ephemeral_dir: ephemeral_dir,
2648        })
2649    }
2650
2651    pub fn from_handle(handle: MobHandle) -> Self {
2652        Self {
2653            handle,
2654            session_service: None,
2655            agent_mob_mcp_state: None,
2656            implicit_delegate_retirement_overrides: None,
2657            binary_blob_store: None,
2658            baseline_member_specs: Arc::new(tokio::sync::RwLock::new(Vec::new())),
2659            _ephemeral_dir: None,
2660        }
2661    }
2662
2663    pub fn handle(&self) -> MobHandle {
2664        self.handle.clone()
2665    }
2666
2667    pub fn agent_mob_mcp_state(&self) -> Option<Arc<meerkat_mob_mcp::MobMcpState>> {
2668        self.agent_mob_mcp_state.clone()
2669    }
2670
2671    pub(crate) fn implicit_delegate_retirement_overrides(
2672        &self,
2673    ) -> Option<ImplicitDelegateRetirementOverrides> {
2674        self.implicit_delegate_retirement_overrides.clone()
2675    }
2676
2677    pub async fn set_baseline_member_specs(&self, specs: Vec<SpawnMemberSpec>) {
2678        *self.baseline_member_specs.write().await = specs;
2679    }
2680
2681    pub async fn baseline_member_specs(&self) -> Vec<SpawnMemberSpec> {
2682        self.baseline_member_specs.read().await.clone()
2683    }
2684
2685    pub async fn read_session_history(
2686        &self,
2687        session_id_str: &str,
2688        offset: usize,
2689        limit: Option<usize>,
2690    ) -> Result<SessionHistoryPage, MobRuntimeError> {
2691        if session_id_str.trim().is_empty() {
2692            return Err(MobRuntimeError::InvalidInput(
2693                "session_id must not be empty",
2694            ));
2695        }
2696        let Some(session_service) = self.session_service.as_ref() else {
2697            return Err(MobRuntimeError::InvalidInput(
2698                "session history unavailable for this runtime",
2699            ));
2700        };
2701        let session_id = meerkat_core::types::SessionId::parse(session_id_str)
2702            .map_err(|_| MobRuntimeError::InvalidInput("invalid session_id format"))?;
2703        SessionServiceHistoryExt::read_history(
2704            session_service.as_ref(),
2705            &session_id,
2706            SessionHistoryQuery { offset, limit },
2707        )
2708        .await
2709        .map_err(|err| MobRuntimeError::Mob(MobError::Internal(err.to_string())))
2710    }
2711
2712    #[allow(dead_code)]
2713    pub(crate) async fn runtime_state_for_session(
2714        &self,
2715        session_id_str: &str,
2716    ) -> Result<Option<meerkat_runtime::RuntimeState>, MobRuntimeError> {
2717        if session_id_str.trim().is_empty() {
2718            return Err(MobRuntimeError::InvalidInput(
2719                "session_id must not be empty",
2720            ));
2721        }
2722        let Some(session_service) = self.session_service.as_ref() else {
2723            return Ok(None);
2724        };
2725        let Some(runtime_adapter) = session_service.runtime_adapter() else {
2726            return Ok(None);
2727        };
2728        let session_id = meerkat_core::types::SessionId::parse(session_id_str)
2729            .map_err(|_| MobRuntimeError::InvalidInput("invalid session_id format"))?;
2730        let state = meerkat_runtime::service_ext::SessionServiceRuntimeExt::runtime_state(
2731            runtime_adapter.as_ref(),
2732            &session_id,
2733        )
2734        .await
2735        .map_err(|err| MobRuntimeError::Mob(MobError::Internal(err.to_string())))?;
2736        Ok(Some(state))
2737    }
2738
2739    #[allow(dead_code)]
2740    pub(crate) async fn comms_runtime_for_session(
2741        &self,
2742        session_id_str: &str,
2743    ) -> Result<Option<Arc<dyn CommsRuntime>>, MobRuntimeError> {
2744        if session_id_str.trim().is_empty() {
2745            return Err(MobRuntimeError::InvalidInput(
2746                "session_id must not be empty",
2747            ));
2748        }
2749        let Some(session_service) = self.session_service.as_ref() else {
2750            return Ok(None);
2751        };
2752        let session_id = meerkat_core::types::SessionId::parse(session_id_str)
2753            .map_err(|_| MobRuntimeError::InvalidInput("invalid session_id format"))?;
2754        Ok(
2755            meerkat_core::service::SessionServiceCommsExt::comms_runtime(
2756                session_service.as_ref(),
2757                &session_id,
2758            )
2759            .await,
2760        )
2761    }
2762
2763    #[allow(dead_code)]
2764    pub(crate) async fn active_input_ids_for_session(
2765        &self,
2766        session_id_str: &str,
2767    ) -> Result<Option<Vec<String>>, MobRuntimeError> {
2768        if session_id_str.trim().is_empty() {
2769            return Err(MobRuntimeError::InvalidInput(
2770                "session_id must not be empty",
2771            ));
2772        }
2773        let Some(session_service) = self.session_service.as_ref() else {
2774            return Ok(None);
2775        };
2776        let Some(runtime_adapter) = session_service.runtime_adapter() else {
2777            return Ok(None);
2778        };
2779        let session_id = meerkat_core::types::SessionId::parse(session_id_str)
2780            .map_err(|_| MobRuntimeError::InvalidInput("invalid session_id format"))?;
2781        let input_ids = meerkat_runtime::service_ext::SessionServiceRuntimeExt::list_active_inputs(
2782            runtime_adapter.as_ref(),
2783            &session_id,
2784        )
2785        .await
2786        .map_err(|err| MobRuntimeError::Mob(MobError::Internal(err.to_string())))?;
2787        Ok(Some(
2788            input_ids.into_iter().map(|id| id.to_string()).collect(),
2789        ))
2790    }
2791
2792    #[allow(dead_code)]
2793    pub(crate) async fn ensure_comms_drain_for_session(
2794        &self,
2795        session_id_str: &str,
2796    ) -> Result<Option<bool>, MobRuntimeError> {
2797        if session_id_str.trim().is_empty() {
2798            return Err(MobRuntimeError::InvalidInput(
2799                "session_id must not be empty",
2800            ));
2801        }
2802        let Some(session_service) = self.session_service.as_ref() else {
2803            return Ok(None);
2804        };
2805        let Some(runtime_adapter) = session_service.runtime_adapter() else {
2806            return Ok(None);
2807        };
2808        let session_id = meerkat_core::types::SessionId::parse(session_id_str)
2809            .map_err(|_| MobRuntimeError::InvalidInput("invalid session_id format"))?;
2810        let comms_runtime = meerkat_core::service::SessionServiceCommsExt::comms_runtime(
2811            session_service.as_ref(),
2812            &session_id,
2813        )
2814        .await;
2815        if let Some(comms) = comms_runtime {
2816            let _handle = meerkat_runtime::comms_drain::spawn_comms_drain(
2817                runtime_adapter.clone(),
2818                session_id,
2819                comms,
2820                None,
2821            );
2822            Ok(Some(true))
2823        } else {
2824            Ok(Some(false))
2825        }
2826    }
2827
2828    /// Access the session service this runtime was bootstrapped with, if any.
2829    ///
2830    /// Present for `MobRuntime::bootstrap(...)`-produced runtimes; `None` for
2831    /// `MobRuntime::from_handle(...)`. HTTP handlers that need to read session
2832    /// history reach through this accessor.
2833    pub fn session_service(&self) -> Option<&Arc<dyn MobSessionService>> {
2834        self.session_service.as_ref()
2835    }
2836
2837    pub fn binary_blob_store(&self) -> Option<Arc<dyn BinaryBlobStore>> {
2838        self.binary_blob_store.clone()
2839    }
2840}
2841
2842/// Project a meerkat `MobMemberListEntry` into mobkit's HTTP JSON shape.
2843///
2844/// Aligns with meerkat 0.6's lightweight-roster design: list entries do
2845/// not carry a bridge `session_id`. Callers needing the realtime session
2846/// for a member must use `mobkit/member_status`, which serializes
2847/// `MobMemberSnapshot.current_session_id` natively.
2848pub fn member_entry_to_json(entry: &meerkat_mob::runtime::MobMemberListEntry) -> serde_json::Value {
2849    serde_json::to_value(entry).unwrap_or(serde_json::Value::Null)
2850}
2851
2852pub fn content_input_has_images(content: &meerkat_core::ContentInput) -> bool {
2853    match content {
2854        meerkat_core::ContentInput::Text(_) => false,
2855        meerkat_core::ContentInput::Blocks(blocks) => blocks
2856            .iter()
2857            .any(|block| matches!(block, meerkat_core::ContentBlock::Image { .. })),
2858    }
2859}
2860
2861pub fn model_capabilities_for_model(
2862    provider: Provider,
2863    model: &str,
2864) -> crate::runtime::ConsoleModelCapabilities {
2865    let image_input = meerkat_core::model_profile::profile_for(provider, model)
2866        .map(|profile| profile.vision)
2867        .unwrap_or(false);
2868    crate::runtime::ConsoleModelCapabilities { image_input }
2869}
2870
2871pub fn model_capabilities_for_profile(
2872    profile: &Profile,
2873) -> crate::runtime::ConsoleModelCapabilities {
2874    let image_input = Provider::infer_from_model(&profile.model)
2875        .and_then(|provider| meerkat_core::model_profile::profile_for(provider, &profile.model))
2876        .map(|profile| profile.vision)
2877        .unwrap_or(false);
2878    crate::runtime::ConsoleModelCapabilities { image_input }
2879}
2880
2881pub fn model_capabilities_for_role(
2882    definition: &MobDefinition,
2883    role: &str,
2884) -> crate::runtime::ConsoleModelCapabilities {
2885    let profile_name = ProfileName::from(role);
2886    definition
2887        .resolve_inline_profile(&profile_name)
2888        .map(model_capabilities_for_profile)
2889        .unwrap_or(crate::runtime::ConsoleModelCapabilities { image_input: false })
2890}
2891
2892pub fn model_capabilities_for_member_entry(
2893    definition: &MobDefinition,
2894    entry: &meerkat_mob::runtime::MobMemberListEntry,
2895) -> crate::runtime::ConsoleModelCapabilities {
2896    model_capabilities_for_role(definition, entry.role.as_str())
2897}
2898
2899pub async fn model_capabilities_for_member(
2900    handle: &MobHandle,
2901    session_service: Option<&Arc<dyn MobSessionService>>,
2902    member_id: &meerkat_mob::ids::MeerkatId,
2903) -> crate::runtime::ConsoleModelCapabilities {
2904    if let Some(service) = session_service
2905        && let Some(session_id) = handle.resolve_bridge_session_id(member_id).await
2906        && let Ok(view) = service.read(&session_id).await
2907    {
2908        return model_capabilities_for_model(view.state.provider, &view.state.model);
2909    }
2910
2911    handle
2912        .get_member(member_id)
2913        .await
2914        .map(|member| model_capabilities_for_role(handle.definition(), member.role.as_str()))
2915        .unwrap_or(crate::runtime::ConsoleModelCapabilities { image_input: false })
2916}
2917
2918pub async fn assert_member_accepts_images(
2919    handle: &MobHandle,
2920    session_service: Option<&Arc<dyn MobSessionService>>,
2921    member_id: &str,
2922    content: &meerkat_core::ContentInput,
2923) -> Result<(), MobRuntimeError> {
2924    if !content_input_has_images(content) {
2925        return Ok(());
2926    }
2927    let mid = meerkat_mob::ids::MeerkatId::from(member_id);
2928    let Some(member) = handle.get_member(&mid).await else {
2929        return Err(MobRuntimeError::InvalidInput("member not found"));
2930    };
2931    let caps = model_capabilities_for_member(handle, session_service, &member.agent_identity).await;
2932    if caps.image_input {
2933        Ok(())
2934    } else {
2935        Err(MobRuntimeError::InvalidInput(
2936            "target member model cannot accept image input",
2937        ))
2938    }
2939}
2940
2941/// Send content to a mob member and return the bridge session id that
2942/// accepted the injection.
2943///
2944/// Validates that `member_id` and `content` are non-empty, calls
2945/// `handle.member(&id).send(...)`, then queries the mob handle for the
2946/// currently-bound bridge session id. Meerkat 0.6 removed `session_id` from
2947/// `MemberDeliveryReceipt`; this helper is mobkit's glue for the
2948/// send-and-learn-what-session-took-it pattern used by HTTP/RPC handlers and
2949/// the scheduled-dispatch injection path.
2950pub async fn send_message_on_mob(
2951    handle: &MobHandle,
2952    member_id: &str,
2953    content: impl Into<meerkat_core::ContentInput>,
2954) -> Result<String, MobRuntimeError> {
2955    send_message_on_mob_with_mode(
2956        handle,
2957        member_id,
2958        content,
2959        meerkat_core::types::HandlingMode::Queue,
2960    )
2961    .await
2962}
2963
2964/// Variant that accepts the console's `Queue`/`Steer` wire contract while
2965/// delivering through MobKit's direct member-send path.
2966pub async fn send_message_on_mob_with_mode(
2967    handle: &MobHandle,
2968    member_id: &str,
2969    content: impl Into<meerkat_core::ContentInput>,
2970    handling_mode: meerkat_core::types::HandlingMode,
2971) -> Result<String, MobRuntimeError> {
2972    if member_id.trim().is_empty() {
2973        return Err(MobRuntimeError::InvalidInput("member_id must not be empty"));
2974    }
2975    let content = content.into();
2976    let is_empty = match &content {
2977        meerkat_core::ContentInput::Text(s) => s.trim().is_empty(),
2978        meerkat_core::ContentInput::Blocks(blocks) => blocks.is_empty(),
2979    };
2980    if is_empty {
2981        return Err(MobRuntimeError::InvalidInput("content must not be empty"));
2982    }
2983    let mid = meerkat_mob::ids::MeerkatId::from(member_id);
2984    let _receipt = handle
2985        .member(&mid)
2986        .await?
2987        .send(content, handling_mode)
2988        .await?;
2989    let session_id = handle
2990        .resolve_bridge_session_id(&mid)
2991        .await
2992        .ok_or_else(|| {
2993            MobRuntimeError::Mob(MobError::Internal(
2994                "member has no bridge session after send".to_string(),
2995            ))
2996        })?;
2997    Ok(session_id.to_string())
2998}
2999
3000#[cfg(test)]
3001#[allow(clippy::unwrap_used, clippy::expect_used, clippy::panic)]
3002mod tests {
3003    use super::*;
3004
3005    struct EmptyDispatcher;
3006
3007    #[async_trait::async_trait]
3008    impl meerkat_core::AgentToolDispatcher for EmptyDispatcher {
3009        fn tools(&self) -> Arc<[Arc<meerkat_core::types::ToolDef>]> {
3010            Vec::<Arc<meerkat_core::types::ToolDef>>::new().into()
3011        }
3012
3013        async fn dispatch(
3014            &self,
3015            call: meerkat_core::types::ToolCallView<'_>,
3016        ) -> Result<meerkat_core::ToolDispatchOutcome, meerkat_core::ToolError> {
3017            Err(meerkat_core::ToolError::not_found(call.name))
3018        }
3019
3020        fn capabilities(&self) -> meerkat_core::agent::DispatcherCapabilities {
3021            meerkat_core::agent::DispatcherCapabilities::default()
3022        }
3023    }
3024
3025    fn wrapper_with_overrides(
3026        overrides: ImplicitDelegateRetirementOverrides,
3027    ) -> AutoWireParentMobToolDispatcher {
3028        AutoWireParentMobToolDispatcher {
3029            inner: Arc::new(EmptyDispatcher),
3030            implicit_delegate_retirement_overrides: overrides,
3031        }
3032    }
3033
3034    #[test]
3035    fn delegate_tool_schema_exposes_idle_retire_secs() {
3036        let tool = meerkat_core::types::ToolDef::new(
3037            "delegate",
3038            "Delegate work",
3039            serde_json::json!({
3040                "type": "object",
3041                "properties": {
3042                    "task": {"type": "string"}
3043                },
3044                "required": ["task"]
3045            }),
3046        );
3047
3048        let patched = delegate_tool_def_with_idle_retire_secs(&tool);
3049        let idle_retire_secs = &patched.input_schema["properties"]["idle_retire_secs"];
3050
3051        assert!(patched.description.contains("IDLE RETIREMENT:"));
3052        assert_eq!(idle_retire_secs["anyOf"][0]["type"], "integer");
3053        assert_eq!(idle_retire_secs["anyOf"][0]["minimum"], 0);
3054        assert_eq!(idle_retire_secs["anyOf"][1]["type"], "null");
3055    }
3056
3057    #[test]
3058    fn mob_spawn_tool_schema_exposes_opt_in_idle_retire_secs() {
3059        let tool = meerkat_core::types::ToolDef::new(
3060            "mob_spawn_member",
3061            "Spawn member",
3062            serde_json::json!({
3063                "type": "object",
3064                "properties": {
3065                    "profile": {"type": "string"},
3066                    "member_id": {"type": "string"}
3067                },
3068                "required": ["profile", "member_id"]
3069            }),
3070        );
3071
3072        let patched = mob_spawn_tool_def_with_idle_retire_secs(&tool);
3073        let idle_retire_secs = &patched.input_schema["properties"]["idle_retire_secs"];
3074
3075        assert!(
3076            patched
3077                .description
3078                .contains("Omit idle_retire_secs to leave this spawned member out")
3079        );
3080        assert_eq!(idle_retire_secs["anyOf"][0]["type"], "integer");
3081        assert_eq!(idle_retire_secs["anyOf"][0]["minimum"], 0);
3082        assert_eq!(idle_retire_secs["anyOf"][1]["type"], "null");
3083    }
3084
3085    #[tokio::test]
3086    async fn auto_wire_wrapper_preserves_ops_lifecycle_binding() {
3087        use meerkat_core::AgentToolDispatcher;
3088        use std::sync::atomic::{AtomicBool, Ordering};
3089
3090        struct BindAwareDispatcher {
3091            bound: Arc<AtomicBool>,
3092        }
3093
3094        #[async_trait::async_trait]
3095        impl meerkat_core::AgentToolDispatcher for BindAwareDispatcher {
3096            fn tools(&self) -> Arc<[Arc<meerkat_core::types::ToolDef>]> {
3097                Vec::<Arc<meerkat_core::types::ToolDef>>::new().into()
3098            }
3099
3100            async fn dispatch(
3101                &self,
3102                call: meerkat_core::types::ToolCallView<'_>,
3103            ) -> Result<meerkat_core::ToolDispatchOutcome, meerkat_core::ToolError> {
3104                Err(meerkat_core::ToolError::not_found(call.name))
3105            }
3106
3107            fn capabilities(&self) -> meerkat_core::agent::DispatcherCapabilities {
3108                meerkat_core::agent::DispatcherCapabilities {
3109                    ops_lifecycle: true,
3110                }
3111            }
3112
3113            fn bind_ops_lifecycle(
3114                self: Arc<Self>,
3115                _registry: Arc<dyn meerkat_core::ops_lifecycle::OpsLifecycleRegistry>,
3116                _owner_bridge_session_id: meerkat_core::types::SessionId,
3117            ) -> Result<meerkat_core::agent::BindOutcome, meerkat_core::agent::OpsLifecycleBindError>
3118            {
3119                self.bound.store(true, Ordering::SeqCst);
3120                Ok(meerkat_core::agent::BindOutcome::Bound(self))
3121            }
3122        }
3123
3124        let bound = Arc::new(AtomicBool::new(false));
3125        let dispatcher = Arc::new(AutoWireParentMobToolDispatcher {
3126            inner: Arc::new(BindAwareDispatcher {
3127                bound: Arc::clone(&bound),
3128            }),
3129            implicit_delegate_retirement_overrides: ImplicitDelegateRetirementOverrides::default(),
3130        });
3131
3132        assert!(dispatcher.capabilities().ops_lifecycle);
3133        let outcome = dispatcher
3134            .bind_ops_lifecycle(
3135                Arc::new(meerkat_runtime::ops_lifecycle::RuntimeOpsLifecycleRegistry::new()),
3136                meerkat_core::types::SessionId::new(),
3137            )
3138            .expect("wrapper should delegate ops lifecycle binding");
3139
3140        assert!(outcome.was_bound());
3141        assert!(bound.load(Ordering::SeqCst));
3142        assert!(outcome.into_dispatcher().capabilities().ops_lifecycle);
3143    }
3144
3145    #[test]
3146    fn delegate_idle_retire_secs_arg_is_stripped_and_parsed() {
3147        let mut args = serde_json::json!({
3148            "task": "inspect",
3149            "idle_retire_secs": 42
3150        });
3151
3152        let parsed = delegate_idle_retire_override_from_args("delegate", &mut args)
3153            .expect("valid idle retire arg");
3154
3155        assert_eq!(parsed, Some(DelegateIdleRetireOverride::Seconds(42)));
3156        assert!(args.get("idle_retire_secs").is_none());
3157    }
3158
3159    #[test]
3160    fn delegate_idle_retire_secs_null_disables_member_retirement() {
3161        let mut args = serde_json::json!({
3162            "task": "inspect",
3163            "idle_retire_secs": null
3164        });
3165
3166        let parsed = delegate_idle_retire_override_from_args("delegate", &mut args)
3167            .expect("valid idle retire arg");
3168
3169        assert_eq!(parsed, Some(DelegateIdleRetireOverride::Disabled));
3170        assert!(args.get("idle_retire_secs").is_none());
3171    }
3172
3173    #[test]
3174    fn delegate_idle_retire_secs_omitted_inherits_runtime_default() {
3175        let mut args = serde_json::json!({"task": "inspect"});
3176
3177        let parsed = delegate_idle_retire_override_from_args("delegate", &mut args)
3178            .expect("omitted idle retire arg");
3179
3180        assert_eq!(parsed, None);
3181        assert_eq!(args, serde_json::json!({"task": "inspect"}));
3182    }
3183
3184    #[test]
3185    fn delegate_idle_retire_secs_rejects_negative_or_fractional_values() {
3186        let mut negative = serde_json::json!({"task": "inspect", "idle_retire_secs": -1});
3187        let mut fractional = serde_json::json!({"task": "inspect", "idle_retire_secs": 1.5});
3188
3189        assert!(delegate_idle_retire_override_from_args("delegate", &mut negative).is_err());
3190        assert!(delegate_idle_retire_override_from_args("delegate", &mut fractional).is_err());
3191    }
3192
3193    #[test]
3194    fn mob_spawn_idle_retire_targets_use_args_when_result_omits_mob_id() {
3195        let args = serde_json::json!({
3196            "mob_id": "ob3",
3197            "profile": "review-worker",
3198            "member_id": "review-worker-vibe-forward",
3199        });
3200        let fallback_targets = idle_retire_targets_from_spawn_args(&args);
3201
3202        assert_eq!(
3203            fallback_targets,
3204            vec![IdleRetireTarget {
3205                mob_id: "ob3".to_string(),
3206                member_id: "review-worker-vibe-forward".to_string(),
3207            }]
3208        );
3209        assert_eq!(
3210            idle_retire_targets_from_outcome_text(
3211                r#"{"agent_identity":"review-worker-vibe-forward","member_ref":"opaque"}"#,
3212                &fallback_targets,
3213            ),
3214            fallback_targets
3215        );
3216    }
3217
3218    #[test]
3219    fn mob_spawn_idle_retire_targets_support_canonical_specs_shape() {
3220        let args = serde_json::json!({
3221            "mob_id": "ob3",
3222            "specs": [
3223                {"profile": "person-worker", "agent_identity": "person-worker-a"},
3224                {"profile": "person-worker", "member_id": "person-worker-b", "mob_id": "other"}
3225            ]
3226        });
3227        let fallback_targets = idle_retire_targets_from_spawn_args(&args);
3228
3229        assert_eq!(
3230            fallback_targets,
3231            vec![
3232                IdleRetireTarget {
3233                    mob_id: "ob3".to_string(),
3234                    member_id: "person-worker-a".to_string(),
3235                },
3236                IdleRetireTarget {
3237                    mob_id: "other".to_string(),
3238                    member_id: "person-worker-b".to_string(),
3239                },
3240            ]
3241        );
3242        assert_eq!(
3243            idle_retire_targets_from_outcome_text(
3244                r#"{"members":[{"agent_identity":"person-worker-a"},{"agent_identity":"person-worker-b","mob_id":"other"}]}"#,
3245                &fallback_targets,
3246            ),
3247            fallback_targets
3248        );
3249    }
3250
3251    #[tokio::test]
3252    async fn implicit_delegate_retirement_overrides_round_trip_per_member() {
3253        let overrides = ImplicitDelegateRetirementOverrides::default();
3254
3255        overrides
3256            .set("mob-a", "worker-1", DelegateIdleRetireOverride::Seconds(12))
3257            .await;
3258        overrides
3259            .set("mob-a", "worker-2", DelegateIdleRetireOverride::Disabled)
3260            .await;
3261
3262        assert_eq!(
3263            overrides.get("mob-a", "worker-1").await,
3264            Some(DelegateIdleRetireOverride::Seconds(12))
3265        );
3266        assert_eq!(
3267            overrides.get("mob-a", "worker-2").await,
3268            Some(DelegateIdleRetireOverride::Disabled)
3269        );
3270        assert_eq!(overrides.get("mob-a", "worker-3").await, None);
3271    }
3272
3273    #[tokio::test]
3274    async fn mob_spawn_idle_retire_registration_uses_spawn_args_when_result_omits_mob_id() {
3275        let overrides = ImplicitDelegateRetirementOverrides::default();
3276        let dispatcher = wrapper_with_overrides(overrides.clone());
3277        let fallback_targets = idle_retire_targets_from_spawn_args(&serde_json::json!({
3278            "mob_id": "ob3",
3279            "member_id": "review-worker-vibe-forward",
3280        }));
3281        let outcome =
3282            meerkat_core::ToolDispatchOutcome::sync_result(meerkat_core::types::ToolResult::new(
3283                "spawn-1".to_string(),
3284                r#"{"agent_identity":"review-worker-vibe-forward","member_ref":"opaque"}"#
3285                    .to_string(),
3286                false,
3287            ));
3288
3289        dispatcher
3290            .register_idle_retire_override_from_outcome(
3291                &outcome,
3292                Some(DelegateIdleRetireOverride::Seconds(900)),
3293                &fallback_targets,
3294            )
3295            .await;
3296
3297        assert_eq!(
3298            overrides.get("ob3", "review-worker-vibe-forward").await,
3299            Some(DelegateIdleRetireOverride::Seconds(900))
3300        );
3301    }
3302
3303    #[test]
3304    fn image_generation_substrate_defaults_off_for_inline_profiles() {
3305        let definition = meerkat_mob::MobDefinition::from_toml(
3306            r#"
3307[mob]
3308id = "test"
3309
3310[profiles.worker]
3311model = "gpt-5.5"
3312
3313[profiles.worker.tools]
3314builtins = true
3315"#,
3316        )
3317        .unwrap_or_else(|e| panic!("{e}"));
3318
3319        assert!(
3320            !mob_definition_may_use_image_generation(&definition),
3321            "inline profiles should not wire the image substrate unless a profile opts in"
3322        );
3323    }
3324
3325    #[test]
3326    fn image_generation_substrate_follows_profile_tool_config() {
3327        let definition = meerkat_mob::MobDefinition::from_toml(
3328            r#"
3329[mob]
3330id = "test"
3331
3332[profiles.commander]
3333model = "gpt-5.5"
3334
3335[profiles.commander.tools]
3336builtins = true
3337image_generation = true
3338
3339[profiles.investigator]
3340model = "gpt-5.5"
3341
3342[profiles.investigator.tools]
3343builtins = true
3344image_generation = false
3345"#,
3346        )
3347        .unwrap_or_else(|e| panic!("{e}"));
3348
3349        let commander = definition.profiles["commander"].as_inline().unwrap();
3350        let investigator = definition.profiles["investigator"].as_inline().unwrap();
3351        assert!(commander.tools.image_generation);
3352        assert!(!investigator.tools.image_generation);
3353        assert!(
3354            mob_definition_may_use_image_generation(&definition),
3355            "one opt-in profile is enough to wire substrate; Meerkat gates visibility per profile"
3356        );
3357    }
3358
3359    #[test]
3360    fn image_generation_profiles_can_disable_builtins_with_meerkat_062() {
3361        let definition = meerkat_mob::MobDefinition::from_toml(
3362            r#"
3363[mob]
3364id = "test"
3365
3366[profiles.commander]
3367model = "gpt-5.5"
3368
3369[profiles.commander.tools]
3370builtins = false
3371image_generation = true
3372"#,
3373        )
3374        .unwrap_or_else(|e| panic!("{e}"));
3375
3376        let commander = definition.profiles["commander"].as_inline().unwrap();
3377        assert!(!commander.tools.builtins);
3378        assert!(commander.tools.image_generation);
3379        assert!(
3380            mob_definition_may_use_image_generation(&definition),
3381            "image generation now has its own Meerkat tool gate"
3382        );
3383    }
3384
3385    #[test]
3386    fn image_generation_substrate_is_conservative_for_realm_profile_refs() {
3387        let definition = meerkat_mob::MobDefinition::from_toml(
3388            r#"
3389[mob]
3390id = "test"
3391
3392[profiles.worker]
3393realm_profile = "worker-v2"
3394"#,
3395        )
3396        .unwrap_or_else(|e| panic!("{e}"));
3397
3398        assert!(
3399            mob_definition_may_use_image_generation(&definition),
3400            "realm profiles resolve at spawn time, so MobKit wires substrate and lets Meerkat enforce profile policy"
3401        );
3402    }
3403
3404    #[test]
3405    fn sanitize_llm_request_drops_replay_unsafe_server_tool_blocks() {
3406        let request = meerkat_client::LlmRequest::new(
3407            "gpt-5.5",
3408            vec![meerkat_core::Message::BlockAssistant(
3409                meerkat_core::BlockAssistantMessage::new(
3410                    vec![
3411                        meerkat_core::AssistantBlock::Text {
3412                            text: "done".to_string(),
3413                            meta: None,
3414                        },
3415                        meerkat_core::AssistantBlock::ServerToolContent {
3416                            id: Some("ws-stream".to_string()),
3417                            name: "web_search".to_string(),
3418                            content: serde_json::json!({
3419                                "type": "response.web_search_call.searching",
3420                                "item_id": "ws_123"
3421                            }),
3422                            meta: None,
3423                        },
3424                        meerkat_core::AssistantBlock::ServerToolContent {
3425                            id: Some("ws_123".to_string()),
3426                            name: "web_search_call".to_string(),
3427                            content: serde_json::json!({
3428                                "type": "web_search_call",
3429                                "id": "ws_123",
3430                                "status": "completed"
3431                            }),
3432                            meta: None,
3433                        },
3434                        meerkat_core::AssistantBlock::ServerToolContent {
3435                            id: None,
3436                            name: "web_search_annotations".to_string(),
3437                            content: serde_json::json!({
3438                                "type": "message_annotations",
3439                                "annotations": []
3440                            }),
3441                            meta: None,
3442                        },
3443                    ],
3444                    meerkat_core::StopReason::EndTurn,
3445                ),
3446            )],
3447        );
3448
3449        let sanitized = sanitize_llm_request_for_stateless_replay(&request);
3450        let meerkat_core::Message::BlockAssistant(assistant) = &sanitized.messages[0] else {
3451            panic!("expected block assistant");
3452        };
3453
3454        assert_eq!(assistant.blocks.len(), 2);
3455        assert!(matches!(
3456            assistant.blocks[0],
3457            meerkat_core::AssistantBlock::Text { .. }
3458        ));
3459        assert!(matches!(
3460            assistant.blocks[1],
3461            meerkat_core::AssistantBlock::ServerToolContent { ref name, .. }
3462                if name == "web_search_call"
3463        ));
3464    }
3465
3466    #[test]
3467    fn sanitize_llm_request_preserves_generated_images_for_meerkat_062() {
3468        let request = meerkat_client::LlmRequest::new(
3469            "gpt-5.5",
3470            vec![meerkat_core::Message::BlockAssistant(
3471                meerkat_core::BlockAssistantMessage::new(
3472                    vec![
3473                        meerkat_core::AssistantBlock::Text {
3474                            text: "visible".to_string(),
3475                            meta: None,
3476                        },
3477                        generated_image_block_for_test(),
3478                    ],
3479                    meerkat_core::StopReason::EndTurn,
3480                ),
3481            )],
3482        );
3483
3484        let sanitized = sanitize_llm_request_for_stateless_replay(&request);
3485
3486        let meerkat_core::Message::BlockAssistant(original_assistant) = &request.messages[0] else {
3487            panic!("expected original block assistant");
3488        };
3489        assert!(
3490            original_assistant
3491                .blocks
3492                .iter()
3493                .any(|block| matches!(block, meerkat_core::AssistantBlock::Image { .. })),
3494            "request-view sanitization must not rewrite canonical caller-owned messages"
3495        );
3496
3497        let meerkat_core::Message::BlockAssistant(sanitized_assistant) = &sanitized.messages[0]
3498        else {
3499            panic!("expected sanitized block assistant");
3500        };
3501        assert!(
3502            sanitized_assistant
3503                .blocks
3504                .iter()
3505                .any(|block| matches!(block, meerkat_core::AssistantBlock::Image { .. })),
3506            "Meerkat 0.6.2 owns provider replay projection for generated images"
3507        );
3508    }
3509
3510    #[derive(Default)]
3511    struct CapturingLlmClient {
3512        projected_messages: std::sync::Mutex<Vec<meerkat_core::Message>>,
3513    }
3514
3515    #[async_trait]
3516    impl LlmClient for CapturingLlmClient {
3517        fn project_replay_messages(
3518            &self,
3519            messages: &[meerkat_core::Message],
3520        ) -> Result<Vec<meerkat_core::Message>, meerkat_client::LlmError> {
3521            *self
3522                .projected_messages
3523                .lock()
3524                .unwrap_or_else(std::sync::PoisonError::into_inner) = messages.to_vec();
3525            Ok(messages.to_vec())
3526        }
3527
3528        fn stream<'a>(&'a self, _request: &'a LlmRequest) -> LlmStream<'a> {
3529            Box::pin(futures::stream::iter([Ok(
3530                meerkat_client::LlmEvent::Done {
3531                    outcome: meerkat_client::LlmDoneOutcome::Success {
3532                        stop_reason: meerkat_core::StopReason::EndTurn,
3533                    },
3534                },
3535            )]))
3536        }
3537
3538        fn provider(&self) -> &'static str {
3539            "openai"
3540        }
3541
3542        async fn health_check(&self) -> Result<(), meerkat_client::LlmError> {
3543            Ok(())
3544        }
3545    }
3546
3547    #[test]
3548    fn replay_sanitizing_llm_client_delegates_provider_projection() {
3549        let capture = Arc::new(CapturingLlmClient::default());
3550        let inner: Arc<dyn LlmClient> = capture.clone();
3551        let wrapped = ReplaySanitizingLlmClient::new(inner);
3552        let messages = vec![meerkat_core::Message::BlockAssistant(
3553            meerkat_core::BlockAssistantMessage::new(
3554                vec![
3555                    meerkat_core::AssistantBlock::Text {
3556                        text: "visible".to_string(),
3557                        meta: None,
3558                    },
3559                    meerkat_core::AssistantBlock::ServerToolContent {
3560                        id: Some("ws-stream".to_string()),
3561                        name: "web_search".to_string(),
3562                        content: serde_json::json!({
3563                            "type": "response.web_search_call.searching",
3564                            "item_id": "ws_123"
3565                        }),
3566                        meta: None,
3567                    },
3568                ],
3569                meerkat_core::StopReason::EndTurn,
3570            ),
3571        )];
3572
3573        let projected = wrapped
3574            .project_replay_messages(&messages)
3575            .expect("wrapped client should delegate provider projection");
3576
3577        let seen = capture
3578            .projected_messages
3579            .lock()
3580            .unwrap_or_else(std::sync::PoisonError::into_inner)
3581            .clone();
3582        let meerkat_core::Message::BlockAssistant(assistant) = &seen[0] else {
3583            panic!("expected block assistant");
3584        };
3585        assert_eq!(
3586            assistant.blocks.len(),
3587            1,
3588            "MobKit sanitization must happen before Meerkat provider projection"
3589        );
3590        assert!(matches!(
3591            assistant.blocks[0],
3592            meerkat_core::AssistantBlock::Text { .. }
3593        ));
3594        assert_eq!(
3595            serde_json::to_value(&projected).expect("projected messages serialize"),
3596            serde_json::to_value(&seen).expect("seen messages serialize")
3597        );
3598    }
3599
3600    #[derive(Default)]
3601    struct CapturingAgentLlmClient {
3602        seen_messages: std::sync::Mutex<Vec<meerkat_core::Message>>,
3603    }
3604
3605    #[async_trait]
3606    impl meerkat_core::AgentLlmClient for CapturingAgentLlmClient {
3607        async fn stream_response(
3608            &self,
3609            messages: &[meerkat_core::Message],
3610            _tools: &[Arc<meerkat_core::ToolDef>],
3611            _max_tokens: u32,
3612            _temperature: Option<f32>,
3613            _provider_params: Option<
3614                &meerkat_core::lifecycle::run_primitive::ProviderParamsOverride,
3615            >,
3616        ) -> Result<meerkat_core::agent::LlmStreamResult, meerkat_core::AgentError> {
3617            *self
3618                .seen_messages
3619                .lock()
3620                .unwrap_or_else(std::sync::PoisonError::into_inner) = messages.to_vec();
3621            Ok(meerkat_core::agent::LlmStreamResult::new(
3622                Vec::new(),
3623                meerkat_core::StopReason::EndTurn,
3624                meerkat_core::Usage::default(),
3625            ))
3626        }
3627
3628        fn provider(&self) -> &'static str {
3629            "openai"
3630        }
3631
3632        fn model(&self) -> &'static str {
3633            "gpt-5.5"
3634        }
3635    }
3636
3637    #[tokio::test]
3638    async fn sanitize_agent_llm_client_drops_replay_unsafe_server_tool_blocks() {
3639        let capture = Arc::new(CapturingAgentLlmClient::default());
3640        let inner: Arc<dyn meerkat_core::AgentLlmClient> = capture.clone();
3641        let wrapped = ReplaySanitizingAgentLlmClient::wrap(inner);
3642        let messages = vec![meerkat_core::Message::BlockAssistant(
3643            meerkat_core::BlockAssistantMessage::new(
3644                vec![
3645                    meerkat_core::AssistantBlock::Text {
3646                        text: "visible".to_string(),
3647                        meta: None,
3648                    },
3649                    meerkat_core::AssistantBlock::ServerToolContent {
3650                        id: Some("ws-stream".to_string()),
3651                        name: "web_search".to_string(),
3652                        content: serde_json::json!({
3653                            "type": "response.web_search_call.searching",
3654                            "item_id": "ws_123"
3655                        }),
3656                        meta: None,
3657                    },
3658                    meerkat_core::AssistantBlock::ServerToolContent {
3659                        id: Some("ok".to_string()),
3660                        name: "web_search_call".to_string(),
3661                        content: serde_json::json!({
3662                            "type": "web_search_call",
3663                            "id": "ws_123",
3664                            "status": "completed"
3665                        }),
3666                        meta: None,
3667                    },
3668                ],
3669                meerkat_core::StopReason::EndTurn,
3670            ),
3671        )];
3672        let tools: Vec<Arc<meerkat_core::ToolDef>> = Vec::new();
3673
3674        wrapped
3675            .stream_response(&messages, &tools, 512, None, None)
3676            .await
3677            .expect("wrapped client should delegate");
3678
3679        let seen = capture
3680            .seen_messages
3681            .lock()
3682            .unwrap_or_else(std::sync::PoisonError::into_inner)
3683            .clone();
3684        let meerkat_core::Message::BlockAssistant(assistant) = &seen[0] else {
3685            panic!("expected block assistant");
3686        };
3687        assert_eq!(assistant.blocks.len(), 2);
3688        assert!(matches!(
3689            assistant.blocks[0],
3690            meerkat_core::AssistantBlock::Text { .. }
3691        ));
3692        assert!(matches!(
3693            assistant.blocks[1],
3694            meerkat_core::AssistantBlock::ServerToolContent { ref name, .. }
3695                if name == "web_search_call"
3696        ));
3697    }
3698
3699    fn generated_image_block_for_test() -> meerkat_core::AssistantBlock {
3700        serde_json::from_value(serde_json::json!({
3701            "block_type": "image",
3702            "data": {
3703                "image_id": "00000000-0000-0000-0000-000000000051",
3704                "blob_ref": {
3705                    "blob_id": "sha256:test-generated-image",
3706                    "media_type": "image/png"
3707                },
3708                "media_type": "image/png",
3709                "width": 1024,
3710                "height": 1024,
3711                "revised_prompt": { "disposition": "not_requested" },
3712                "meta": { "provider": "not_emitted" }
3713            }
3714        }))
3715        .expect("test image block should deserialize")
3716    }
3717
3718    #[test]
3719    fn sanitize_message_preserves_assistant_image_blocks() {
3720        let message =
3721            meerkat_core::Message::BlockAssistant(meerkat_core::BlockAssistantMessage::new(
3722                vec![
3723                    meerkat_core::AssistantBlock::Text {
3724                        text: "Here is the image.".to_string(),
3725                        meta: None,
3726                    },
3727                    generated_image_block_for_test(),
3728                ],
3729                meerkat_core::StopReason::EndTurn,
3730            ));
3731
3732        let sanitized = sanitize_message_for_stateless_replay(message);
3733        let meerkat_core::Message::BlockAssistant(assistant) = sanitized else {
3734            panic!("expected block assistant");
3735        };
3736
3737        assert_eq!(assistant.blocks.len(), 2);
3738        assert!(matches!(
3739            assistant.blocks[0],
3740            meerkat_core::AssistantBlock::Text { .. }
3741        ));
3742        assert!(
3743            matches!(
3744                assistant.blocks[1],
3745                meerkat_core::AssistantBlock::Image { .. }
3746            ),
3747            "generated image blocks should reach Meerkat's provider projection"
3748        );
3749    }
3750
3751    /// Verify that persistent_with_hook wraps the session service with
3752    /// PreBuildMobSessionService (hook is Some).
3753    #[test]
3754    fn persistent_with_hook_wraps_session_service() {
3755        let dir = tempfile::tempdir().unwrap_or_else(|e| panic!("{e}"));
3756        let store_path = dir.path().to_path_buf();
3757        let Ok(sqlite) = meerkat_store::SqliteSessionStore::open(store_path.join("sessions.db"))
3758        else {
3759            panic!("failed to open sqlite session store");
3760        };
3761        let session_store: Arc<dyn SessionStore> = Arc::new(sqlite);
3762        let Ok(definition) = meerkat_mob::MobDefinition::from_toml("[mob]\nid = \"test\"\n") else {
3763            panic!("failed to parse minimal mob definition");
3764        };
3765
3766        let hook_called = Arc::new(std::sync::atomic::AtomicBool::new(false));
3767        let hook_called_clone = hook_called.clone();
3768
3769        let spec = MobBootstrapSpec::persistent_with_hook(
3770            definition,
3771            meerkat_mob::MobStorage::in_memory(),
3772            store_path.clone(),
3773            4,
3774            session_store,
3775            move |_req: &mut CreateSessionRequest| {
3776                hook_called_clone.store(true, std::sync::atomic::Ordering::Relaxed);
3777                Box::pin(async { Ok(()) })
3778            },
3779        );
3780
3781        // The session service is wired with a SqliteRuntimeStore so that
3782        // both `load_persisted_session` (resume) and
3783        // `load_persisted_session_for_control` (archive/retire) succeed
3784        // across process restart. spec.runtime_adapter is also set
3785        // explicitly so the bootstrap path uses the same store. See
3786        // `persistent_bootstrap_uses_sqlite_runtime_store` for the full
3787        // regression coverage.
3788        assert!(
3789            spec.runtime_adapter.is_some(),
3790            "persistent_with_hook must provide a runtime adapter via spec.runtime_adapter"
3791        );
3792        assert!(
3793            spec.session_service.runtime_adapter().is_some(),
3794            "session service must own a runtime_store so archive/retire don't \
3795             hit the store-only-projection rejection in meerkat-session"
3796        );
3797        assert!(
3798            store_path.join("runtime.sqlite").exists(),
3799            "persistent_inner must open a SqliteRuntimeStore at <store_path>/runtime.sqlite"
3800        );
3801
3802        // The hook isn't called until create_session — verify the wrapper exists
3803        // by checking the service is not the raw PersistentSessionService (it
3804        // wraps it). We can't call create_session without a full LLM stack, but
3805        // we can verify the hook_called flag is false (not prematurely invoked).
3806        assert!(
3807            !hook_called.load(std::sync::atomic::Ordering::Relaxed),
3808            "hook must not be called before create_session"
3809        );
3810    }
3811
3812    /// Verify that ephemeral_with_hook accepts and stores a hook.
3813    #[test]
3814    fn ephemeral_with_hook_creates_spec() {
3815        let dir = tempfile::tempdir().unwrap_or_else(|e| panic!("{e}"));
3816        let store_path = dir.path().to_path_buf();
3817        let Ok(definition) = meerkat_mob::MobDefinition::from_toml("[mob]\nid = \"test\"\n") else {
3818            panic!("failed to parse minimal mob definition");
3819        };
3820
3821        let hook_called = Arc::new(std::sync::atomic::AtomicBool::new(false));
3822        let hook_called_clone = hook_called.clone();
3823
3824        let spec = MobBootstrapSpec::ephemeral_with_hook(
3825            definition,
3826            meerkat_mob::MobStorage::in_memory(),
3827            store_path,
3828            4,
3829            None,
3830            move |_req: &mut CreateSessionRequest| {
3831                hook_called_clone.store(true, std::sync::atomic::Ordering::Relaxed);
3832                Box::pin(async { Ok(()) })
3833            },
3834        );
3835
3836        // Ephemeral specs don't have a runtime adapter.
3837        assert!(spec.runtime_adapter.is_none());
3838
3839        // Hook not yet called.
3840        assert!(
3841            !hook_called.load(std::sync::atomic::Ordering::Relaxed),
3842            "hook must not be called before create_session"
3843        );
3844    }
3845
3846    /// Verify that PreBuildMobSessionService applies the hook to the request
3847    /// in create_session. The hook mutates the model and adds labels; we
3848    /// verify by capturing the state inside the hook itself.
3849    #[tokio::test]
3850    async fn pre_build_hook_mutates_create_session_request() {
3851        use std::sync::Mutex;
3852
3853        let captured = Arc::new(Mutex::new(None::<(String, Option<String>)>));
3854        let captured_clone = captured.clone();
3855
3856        // Build a minimal ephemeral service as the inner.
3857        let dir = tempfile::tempdir().unwrap_or_else(|e| panic!("{e}"));
3858        let factory = AgentFactory::new(dir.path()).builtins(true);
3859        let config = Config::default();
3860        let builder = FactoryAgentBuilder::new(factory, config);
3861        let inner: Arc<dyn MobSessionService> =
3862            Arc::new(meerkat_session::EphemeralSessionService::new(builder, 4));
3863
3864        // Hook that mutates and captures the post-mutation state.
3865        let hook: PreBuildHook = Arc::new(move |req: &mut CreateSessionRequest| {
3866            req.model = "hooked-model".to_string();
3867            req.system_prompt = Some("injected-prompt".to_string());
3868            let labels = req.labels.get_or_insert_with(Default::default);
3869            labels.insert("hook_label".to_string(), "hook_value".to_string());
3870            // Capture to prove the hook ran and mutated the request.
3871            let mut lock = captured_clone
3872                .lock()
3873                .unwrap_or_else(std::sync::PoisonError::into_inner);
3874            *lock = Some((req.model.clone(), req.system_prompt.clone()));
3875            Box::pin(async { Ok(()) })
3876        });
3877        let wrapped = PreBuildMobSessionService {
3878            inner,
3879            hook,
3880            after_create_hook: None,
3881            runtime_adapter_override: None,
3882        };
3883
3884        let req = CreateSessionRequest {
3885            model: "original-model".to_string(),
3886            prompt: meerkat_core::ContentInput::Text("test".to_string()),
3887            render_metadata: None,
3888            system_prompt: None,
3889            max_tokens: None,
3890            event_tx: None,
3891            skill_references: None,
3892            initial_turn: meerkat_core::service::InitialTurnPolicy::Defer,
3893            build: None,
3894            labels: None,
3895            deferred_prompt_policy: meerkat_core::service::DeferredPromptPolicy::default(),
3896        };
3897
3898        // create_session will fail (no LLM) but the hook runs first.
3899        let _ = meerkat_core::service::SessionService::create_session(&wrapped, req).await;
3900
3901        let (model, prompt) = captured
3902            .lock()
3903            .unwrap_or_else(std::sync::PoisonError::into_inner)
3904            .clone()
3905            .expect("hook must have been called");
3906        assert_eq!(model, "hooked-model", "hook must mutate the model");
3907        assert_eq!(
3908            prompt.as_deref(),
3909            Some("injected-prompt"),
3910            "hook must set the system prompt"
3911        );
3912    }
3913
    /// Test double that keeps an in-memory log of the calls made on it.
    #[derive(Default)]
    struct ForwardingProbe {
        // Labels of recorded calls, in the order they were pushed.
        calls: Mutex<Vec<&'static str>>,
    }
3918
3919    impl ForwardingProbe {
3920        fn record(&self, call: &'static str) {
3921            self.calls
3922                .lock()
3923                .unwrap_or_else(std::sync::PoisonError::into_inner)
3924                .push(call);
3925        }
3926
3927        fn calls(&self) -> Vec<&'static str> {
3928            self.calls
3929                .lock()
3930                .unwrap_or_else(std::sync::PoisonError::into_inner)
3931                .clone()
3932        }
3933    }
3934
    // Core session-service surface for the probe. Only `interrupt` and
    // `archive` log themselves; the remaining methods return fixed results
    // without touching the call log.
    #[async_trait]
    impl meerkat_core::service::SessionService for ForwardingProbe {
        /// Always fails with `SessionError::Unsupported("create_session")`.
        async fn create_session(
            &self,
            _req: CreateSessionRequest,
        ) -> Result<meerkat_core::types::RunResult, SessionError> {
            Err(SessionError::Unsupported("create_session".to_string()))
        }

        /// Always fails with `SessionError::Unsupported("start_turn")`.
        async fn start_turn(
            &self,
            _id: &meerkat_core::types::SessionId,
            _req: meerkat_core::service::StartTurnRequest,
        ) -> Result<meerkat_core::types::RunResult, SessionError> {
            Err(SessionError::Unsupported("start_turn".to_string()))
        }

        /// Logs `"interrupt"` and succeeds.
        async fn interrupt(
            &self,
            _id: &meerkat_core::types::SessionId,
        ) -> Result<(), SessionError> {
            self.record("interrupt");
            Ok(())
        }

        /// Always reports the requested session as not found.
        async fn read(
            &self,
            id: &meerkat_core::types::SessionId,
        ) -> Result<meerkat_core::service::SessionView, SessionError> {
            Err(SessionError::NotFound { id: id.clone() })
        }

        /// Always returns an empty listing, whatever the query.
        async fn list(
            &self,
            _query: meerkat_core::service::SessionQuery,
        ) -> Result<Vec<meerkat_core::service::SessionSummary>, SessionError> {
            Ok(Vec::new())
        }

        /// Logs `"archive"` and succeeds.
        async fn archive(&self, _id: &meerkat_core::types::SessionId) -> Result<(), SessionError> {
            self.record("archive");
            Ok(())
        }
    }
3979
    // Empty impl: the probe overrides nothing here and uses whatever provided
    // methods the trait supplies.
    #[async_trait]
    impl meerkat_core::service::SessionServiceCommsExt for ForwardingProbe {}
3982
    // Control-extension surface for the probe: each method logs its own name
    // and returns a fixed success value that a test can recognise.
    #[async_trait]
    impl meerkat_core::service::SessionServiceControlExt for ForwardingProbe {
        /// Logs `"append_system_context"` and reports the append as `Applied`.
        async fn append_system_context(
            &self,
            _id: &meerkat_core::types::SessionId,
            _req: meerkat_core::service::AppendSystemContextRequest,
        ) -> Result<
            meerkat_core::service::AppendSystemContextResult,
            meerkat_core::service::SessionControlError,
        > {
            self.record("append_system_context");
            Ok(meerkat_core::service::AppendSystemContextResult {
                status: meerkat_core::service::AppendSystemContextStatus::Applied,
            })
        }

        /// Logs `"stage_tool_results"` and returns a fixed count of 7,
        /// regardless of how many results the request carries.
        async fn stage_tool_results(
            &self,
            _id: &meerkat_core::types::SessionId,
            _req: meerkat_core::service::StageToolResultsRequest,
        ) -> Result<meerkat_core::service::StageToolResultsResult, SessionError> {
            self.record("stage_tool_results");
            Ok(meerkat_core::service::StageToolResultsResult {
                accepted_result_count: 7,
            })
        }
    }
4010
    // History surface for the probe: there is never any history to return.
    #[async_trait]
    impl meerkat_core::service::SessionServiceHistoryExt for ForwardingProbe {
        /// Always reports the requested session as not found; nothing is logged.
        async fn read_history(
            &self,
            id: &meerkat_core::types::SessionId,
            _query: meerkat_core::service::SessionHistoryQuery,
        ) -> Result<meerkat_core::service::SessionHistoryPage, SessionError> {
            Err(SessionError::NotFound { id: id.clone() })
        }
    }
4021
    // Mob-specific surface for the probe. The async methods log their own name
    // and return a fixed success value; the two sync accessors log nothing.
    #[async_trait]
    impl MobSessionService for ForwardingProbe {
        /// Always claims persistent-session support.
        fn supports_persistent_sessions(&self) -> bool {
            true
        }

        /// Returns a freshly constructed ephemeral machine on every call.
        fn runtime_adapter(&self) -> Option<Arc<meerkat_runtime::MeerkatMachine>> {
            Some(Arc::new(meerkat_runtime::MeerkatMachine::ephemeral()))
        }

        /// Logs `"archive_with_mob_lifecycle_authority"` and succeeds.
        async fn archive_with_mob_lifecycle_authority(
            &self,
            _session_id: &meerkat_core::types::SessionId,
        ) -> Result<(), SessionError> {
            self.record("archive_with_mob_lifecycle_authority");
            Ok(())
        }

        /// Logs `"stage_runtime_system_context_for_active_turn"` and returns
        /// the fixed bytes `b"snapshot"`; the arguments are ignored.
        async fn stage_runtime_system_context_for_active_turn(
            &self,
            _session_id: &meerkat_core::types::SessionId,
            _expected_run_id: &meerkat_core::lifecycle::RunId,
            _appends: Vec<meerkat_core::session::PendingSystemContextAppend>,
        ) -> Result<Option<Vec<u8>>, SessionError> {
            self.record("stage_runtime_system_context_for_active_turn");
            Ok(Some(b"snapshot".to_vec()))
        }

        /// Logs `"discard_runtime_system_context_for_active_turn"` and succeeds.
        async fn discard_runtime_system_context_for_active_turn(
            &self,
            _session_id: &meerkat_core::types::SessionId,
            _expected_run_id: &meerkat_core::lifecycle::RunId,
            _idempotency_keys: Vec<String>,
        ) -> Result<(), SessionError> {
            self.record("discard_runtime_system_context_for_active_turn");
            Ok(())
        }

        /// Logs `"active_turn_system_context_boundary_available"` and always
        /// answers `Some(true)`.
        async fn active_turn_system_context_boundary_available(
            &self,
            _session_id: &meerkat_core::types::SessionId,
        ) -> Result<Option<bool>, SessionError> {
            self.record("active_turn_system_context_boundary_available");
            Ok(Some(true))
        }
    }
4068
4069    #[tokio::test]
4070    async fn pre_build_wrapper_forwards_mob_authority_and_control_extensions() {
4071        let probe = Arc::new(ForwardingProbe::default());
4072        let inner: Arc<dyn MobSessionService> = probe.clone();
4073        let wrapped = PreBuildMobSessionService {
4074            inner,
4075            hook: no_op_pre_build_hook(),
4076            after_create_hook: None,
4077            runtime_adapter_override: Some(Arc::new(meerkat_runtime::MeerkatMachine::ephemeral())),
4078        };
4079        let session_id = meerkat_core::types::SessionId::new();
4080
4081        MobSessionService::archive_with_mob_lifecycle_authority(&wrapped, &session_id)
4082            .await
4083            .expect("archive_with_mob_lifecycle_authority should forward to inner service");
4084        let staged = meerkat_core::service::SessionServiceControlExt::stage_tool_results(
4085            &wrapped,
4086            &session_id,
4087            meerkat_core::service::StageToolResultsRequest {
4088                results: Vec::new(),
4089            },
4090        )
4091        .await
4092        .expect("stage_tool_results should forward to inner service");
4093
4094        assert_eq!(staged.accepted_result_count, 7);
4095        let boundary_available = wrapped
4096            .active_turn_system_context_boundary_available(&session_id)
4097            .await
4098            .expect("active-turn boundary probe should forward");
4099        assert_eq!(boundary_available, Some(true));
4100        let snapshot = wrapped
4101            .stage_runtime_system_context_for_active_turn(
4102                &session_id,
4103                &meerkat_core::lifecycle::RunId::new(),
4104                vec![meerkat_core::session::PendingSystemContextAppend {
4105                    text: "steer".to_string(),
4106                    source: Some("test".to_string()),
4107                    idempotency_key: Some("test".to_string()),
4108                    accepted_at: meerkat_core::time_compat::SystemTime::now(),
4109                }],
4110            )
4111            .await
4112            .expect("active-turn staging should forward");
4113        assert_eq!(snapshot.as_deref(), Some(&b"snapshot"[..]));
4114        wrapped
4115            .discard_runtime_system_context_for_active_turn(
4116                &session_id,
4117                &meerkat_core::lifecycle::RunId::new(),
4118                vec!["test".to_string()],
4119            )
4120            .await
4121            .expect("active-turn rollback should forward");
4122        assert_eq!(
4123            probe.calls(),
4124            vec![
4125                "archive_with_mob_lifecycle_authority",
4126                "stage_tool_results",
4127                "active_turn_system_context_boundary_available",
4128                "stage_runtime_system_context_for_active_turn",
4129                "discard_runtime_system_context_for_active_turn",
4130            ]
4131        );
4132    }
4133
4134    /// Regression for two compounding bugs in the persistent wiring:
4135    ///
4136    /// 1. **0.6.0**: `persistent_inner` handed the
4137    ///    `PersistentSessionService` an `InMemoryRuntimeStore`. With the
4138    ///    runtime_store path active the `StoreCheckpointer` was disabled
4139    ///    (it's gated on `runtime_store.is_none()`), and the in-memory
4140    ///    store didn't survive process restart. Resume raised "missing
4141    ///    durable session snapshot for '<sid>'".
4142    ///
4143    /// 2. **0.6.1**: switching the session service to `runtime_store=None`
4144    ///    re-enabled the checkpointer (fixing #1) but broke archive/retire,
4145    ///    because `load_persisted_session_for_control` rejects mutations
4146    ///    when runtime_store is None and the session exists in the store
4147    ///    (the "store-only compatibility projection" error from
4148    ///    meerkat-session/src/persistent.rs:786).
4149    ///
4150    /// The 0.6.3 fix uses a **persistent** SqliteRuntimeStore — durable
4151    /// across restart AND control-op authoritative — at
4152    /// `<store_path>/runtime.sqlite`.
4153    #[test]
4154    fn persistent_bootstrap_uses_sqlite_runtime_store() {
4155        let dir = tempfile::tempdir().unwrap_or_else(|e| panic!("{e}"));
4156        let store_path = dir.path().to_path_buf();
4157        let Ok(sqlite) = meerkat_store::SqliteSessionStore::open(store_path.join("sessions.db"))
4158        else {
4159            panic!("failed to open sqlite session store");
4160        };
4161        let session_store: Arc<dyn SessionStore> = Arc::new(sqlite);
4162        let Ok(definition) = meerkat_mob::MobDefinition::from_toml("[mob]\nid = \"test\"\n") else {
4163            panic!("failed to parse minimal mob definition");
4164        };
4165        let spec = MobBootstrapSpec::persistent(
4166            definition,
4167            meerkat_mob::MobStorage::in_memory(),
4168            store_path.clone(),
4169            4,
4170            session_store,
4171        );
4172        assert!(
4173            spec.runtime_adapter.is_some(),
4174            "persistent bootstrap must provide its own runtime adapter via spec.runtime_adapter"
4175        );
4176        assert!(
4177            spec.session_service.runtime_adapter().is_some(),
4178            "session service must own a runtime_store so archive/retire don't \
4179             hit the store-only-projection rejection"
4180        );
4181        assert!(
4182            store_path.join("runtime.sqlite").exists(),
4183            "persistent_inner must open a SqliteRuntimeStore at <store_path>/runtime.sqlite"
4184        );
4185    }
4186
4187    /// Ephemeral counterpart: runtime-backed ephemeral builds must use a
4188    /// single in-memory machine authority for session service, comms, and
4189    /// image-generation tooling.
4190    #[test]
4191    fn ephemeral_runtime_backed_uses_session_service_runtime_adapter() {
4192        let dir = tempfile::tempdir().unwrap_or_else(|e| panic!("{e}"));
4193        let store_path = dir.path().to_path_buf();
4194        let Ok(definition) = meerkat_mob::MobDefinition::from_toml("[mob]\nid = \"test\"\n") else {
4195            panic!("failed to parse minimal mob definition");
4196        };
4197        let spec = MobBootstrapSpec::ephemeral_runtime_backed_inner(
4198            definition,
4199            meerkat_mob::MobStorage::in_memory(),
4200            store_path,
4201            4,
4202            None,
4203            None,
4204            None,
4205            CapabilityFlags::default(),
4206            None,
4207            None,
4208        );
4209        assert!(
4210            spec.runtime_adapter.is_some(),
4211            "ephemeral_runtime_backed_inner must expose the shared runtime authority"
4212        );
4213        assert!(
4214            spec.session_service.runtime_adapter().is_some(),
4215            "session service must still expose a runtime adapter so autonomous-host comms can wire"
4216        );
4217    }
4218
4219    #[tokio::test]
4220    async fn agent_mob_tools_expose_definition_profiles_as_realm_profiles() {
4221        let dir = tempfile::tempdir().unwrap_or_else(|e| panic!("{e}"));
4222        let store_path = dir.path().to_path_buf();
4223        let Ok(definition) = meerkat_mob::MobDefinition::from_toml(
4224            "[mob]\nid = \"test\"\n\n[profiles.investigation-worker]\nmodel = \"gpt-5.5\"\n[profiles.investigation-worker.tools]\ncomms = true\nmob = true\n\n[profiles.person-worker]\nmodel = \"gpt-5.5\"\n[profiles.person-worker.tools]\ncomms = true\n",
4225        ) else {
4226            panic!("failed to parse mob definition with worker profiles");
4227        };
4228
4229        let spec = MobBootstrapSpec::ephemeral_runtime_backed_inner(
4230            definition,
4231            meerkat_mob::MobStorage::in_memory(),
4232            store_path,
4233            4,
4234            None,
4235            None,
4236            None,
4237            CapabilityFlags::default(),
4238            None,
4239            None,
4240        );
4241        let state = spec
4242            .agent_mob_mcp_state
4243            .expect("agent mob MCP state should be installed");
4244
4245        let profiles = state
4246            .realm_profile_list()
4247            .await
4248            .expect("definition profiles should list through agent mob tools");
4249        let names = profiles
4250            .iter()
4251            .map(|profile| profile.name.as_str())
4252            .collect::<Vec<_>>();
4253        assert!(
4254            names.contains(&"investigation-worker"),
4255            "definition profiles must be visible to mob_profile_list so agents can create mobs that reference them"
4256        );
4257        assert!(names.contains(&"person-worker"));
4258
4259        let worker = state
4260            .realm_profile_get("investigation-worker")
4261            .await
4262            .expect("definition profile lookup should succeed")
4263            .expect("definition profile should exist");
4264        assert_eq!(worker.profile.model, "gpt-5.5");
4265        assert_eq!(
4266            worker.revision, 0,
4267            "definition-backed profiles are immutable runtime seeds, not persisted realm revisions"
4268        );
4269    }
4270
4271    #[tokio::test]
4272    async fn agent_created_mobs_can_spawn_definition_seeded_realm_profiles() {
4273        let dir = tempfile::tempdir().unwrap_or_else(|e| panic!("{e}"));
4274        let store_path = dir.path().to_path_buf();
4275        let Ok(parent_definition) = meerkat_mob::MobDefinition::from_toml(
4276            "[mob]\nid = \"parent\"\n\n[profiles.investigation-worker]\nmodel = \"gpt-5.5\"\n[profiles.investigation-worker.tools]\ncomms = true\nmob = true\n",
4277        ) else {
4278            panic!("failed to parse parent mob definition");
4279        };
4280
4281        let mut spec = MobBootstrapSpec::ephemeral_runtime_backed_inner(
4282            parent_definition,
4283            meerkat_mob::MobStorage::in_memory(),
4284            store_path,
4285            4,
4286            None,
4287            None,
4288            None,
4289            CapabilityFlags::default(),
4290            None,
4291            None,
4292        );
4293        spec.options.default_llm_client = Some(Arc::new(meerkat_client::TestClient::default()));
4294        let runtime = MobRuntime::bootstrap(spec)
4295            .await
4296            .unwrap_or_else(|e| panic!("{e}"));
4297        let state = runtime
4298            .agent_mob_mcp_state
4299            .clone()
4300            .expect("agent mob MCP state should be installed");
4301        let Ok(child_definition) = meerkat_mob::MobDefinition::from_toml(
4302            "[mob]\nid = \"child\"\n\n[profiles.investigation-worker]\nrealm_profile = \"investigation-worker\"\n",
4303        ) else {
4304            panic!("failed to parse child mob definition");
4305        };
4306
4307        let mob_id = state
4308            .mob_create_definition(child_definition)
4309            .await
4310            .expect("child mob should be created");
4311        state
4312            .mob_spawn_spec(
4313                &mob_id,
4314                SpawnMemberSpec::new(
4315                    ProfileName::from("investigation-worker"),
4316                    meerkat_mob::AgentIdentity::from("investigation-worker:one"),
4317                ),
4318            )
4319            .await
4320            .expect("created mob should resolve definition-seeded realm profile at spawn time");
4321    }
4322
4323    #[tokio::test]
4324    async fn ephemeral_runtime_backed_custom_session_store_persists_created_session() {
4325        let dir = tempfile::tempdir().unwrap_or_else(|e| panic!("{e}"));
4326        let store_path = dir.path().to_path_buf();
4327        let Ok(definition) = meerkat_mob::MobDefinition::from_toml(
4328            "[mob]\nid = \"test\"\n\n[profiles.worker]\nmodel = \"gpt-5.5\"\n[profiles.worker.tools]\ncomms = true\n",
4329        ) else {
4330            panic!("failed to parse minimal mob definition");
4331        };
4332        let custom_store: Arc<dyn SessionStore> = Arc::new(meerkat_store::MemoryStore::new());
4333        let mut spec = MobBootstrapSpec::ephemeral_runtime_backed_inner(
4334            definition,
4335            meerkat_mob::MobStorage::in_memory(),
4336            store_path,
4337            4,
4338            Some(custom_store.clone()),
4339            None,
4340            None,
4341            CapabilityFlags::default(),
4342            None,
4343            None,
4344        );
4345        spec.options.default_llm_client = Some(Arc::new(meerkat_client::TestClient::default()));
4346
4347        let runtime = MobRuntime::bootstrap(spec)
4348            .await
4349            .unwrap_or_else(|e| panic!("{e}"));
4350        let mid = meerkat_mob::ids::MeerkatId::from("worker:one");
4351        runtime
4352            .handle
4353            .spawn_spec(SpawnMemberSpec::new(
4354                ProfileName::from("worker"),
4355                mid.clone(),
4356            ))
4357            .await
4358            .unwrap_or_else(|e| panic!("{e}"));
4359        let session_id = runtime
4360            .handle
4361            .resolve_bridge_session_id(&mid)
4362            .await
4363            .unwrap_or_else(|| panic!("spawned worker has no bridge session id"));
4364
4365        let stored = custom_store
4366            .load(&session_id)
4367            .await
4368            .unwrap_or_else(|e| panic!("{e}"));
4369        assert!(
4370            stored.is_some(),
4371            "ephemeral runtime-backed builds with a custom store must persist through that store"
4372        );
4373    }
4374
4375    #[tokio::test]
4376    async fn ephemeral_runtime_backed_custom_session_store_resumes_after_runtime_restart() {
4377        let dir = tempfile::tempdir().unwrap_or_else(|e| panic!("{e}"));
4378        let store_path = dir.path().to_path_buf();
4379        let definition_toml = "[mob]\nid = \"test\"\n\n[profiles.worker]\nmodel = \"gpt-5.5\"\n[profiles.worker.tools]\ncomms = true\n";
4380        let Ok(definition) = meerkat_mob::MobDefinition::from_toml(definition_toml) else {
4381            panic!("failed to parse minimal mob definition");
4382        };
4383        let custom_store: Arc<dyn SessionStore> = Arc::new(meerkat_store::MemoryStore::new());
4384        let mut spec = MobBootstrapSpec::ephemeral_runtime_backed_inner(
4385            definition,
4386            meerkat_mob::MobStorage::in_memory(),
4387            store_path.clone(),
4388            4,
4389            Some(custom_store.clone()),
4390            None,
4391            None,
4392            CapabilityFlags::default(),
4393            None,
4394            None,
4395        );
4396        spec.options.default_llm_client = Some(Arc::new(meerkat_client::TestClient::default()));
4397
4398        let runtime = MobRuntime::bootstrap(spec)
4399            .await
4400            .unwrap_or_else(|e| panic!("{e}"));
4401        let mid = meerkat_mob::ids::MeerkatId::from("worker:one");
4402        runtime
4403            .handle
4404            .spawn_spec(SpawnMemberSpec::new(
4405                ProfileName::from("worker"),
4406                mid.clone(),
4407            ))
4408            .await
4409            .unwrap_or_else(|e| panic!("{e}"));
4410        let session_id = runtime
4411            .handle
4412            .resolve_bridge_session_id(&mid)
4413            .await
4414            .unwrap_or_else(|| panic!("spawned worker has no bridge session id"));
4415        drop(runtime);
4416
4417        let Ok(definition) = meerkat_mob::MobDefinition::from_toml(definition_toml) else {
4418            panic!("failed to parse minimal mob definition");
4419        };
4420        let mut restarted_spec = MobBootstrapSpec::ephemeral_runtime_backed_inner(
4421            definition,
4422            meerkat_mob::MobStorage::in_memory(),
4423            store_path,
4424            4,
4425            Some(custom_store),
4426            None,
4427            None,
4428            CapabilityFlags::default(),
4429            None,
4430            None,
4431        );
4432        restarted_spec.options.default_llm_client =
4433            Some(Arc::new(meerkat_client::TestClient::default()));
4434
4435        let restarted = MobRuntime::bootstrap(restarted_spec)
4436            .await
4437            .unwrap_or_else(|e| panic!("{e}"));
4438        let mut resume_spec = SpawnMemberSpec::new(ProfileName::from("worker"), mid.clone());
4439        resume_spec.launch_mode = meerkat_mob::MemberLaunchMode::Resume {
4440            bridge_session_id: session_id.clone(),
4441        };
4442        restarted
4443            .handle
4444            .spawn_spec(resume_spec)
4445            .await
4446            .unwrap_or_else(|e| panic!("resume should load the external session snapshot: {e}"));
4447
4448        let resumed_session_id = restarted
4449            .handle
4450            .resolve_bridge_session_id(&mid)
4451            .await
4452            .unwrap_or_else(|| panic!("resumed worker has no bridge session id"));
4453        assert_eq!(resumed_session_id, session_id);
4454    }
4455
4456    /// Regression: public ephemeral builds without image generation stay on the
4457    /// lighter direct session-service path.
4458    #[test]
4459    fn ephemeral_bootstrap_without_image_generation_stays_direct() {
4460        let dir = tempfile::tempdir().unwrap_or_else(|e| panic!("{e}"));
4461        let store_path = dir.path().to_path_buf();
4462        let Ok(definition) = meerkat_mob::MobDefinition::from_toml(
4463            "[mob]\nid = \"test\"\n\n[profiles.worker]\nmodel = \"gpt-5.5\"\nruntime_mode = \"autonomous_host\"\n[profiles.worker.tools]\ncomms = true\n",
4464        ) else {
4465            panic!("failed to parse definition");
4466        };
4467        let spec = MobBootstrapSpec::ephemeral_inner(
4468            definition,
4469            meerkat_mob::MobStorage::in_memory(),
4470            store_path,
4471            4,
4472            None,
4473            None,
4474            CapabilityFlags::default(),
4475            None,
4476            None,
4477        );
4478        assert!(
4479            spec.runtime_adapter.is_none(),
4480            "public ephemeral builds only need a runtime adapter when the definition may use image generation"
4481        );
4482    }
4483
4484    /// Regression: public ephemeral image-generation builds must expose the same
4485    /// runtime adapter through the spec and the session service. The generated
4486    /// image tool consults runtime session/image-operation state by session id,
4487    /// so a fresh, tool-only MeerkatMachine cannot be used here.
4488    #[test]
4489    fn ephemeral_bootstrap_with_image_generation_shares_runtime_adapter() {
4490        let dir = tempfile::tempdir().unwrap_or_else(|e| panic!("{e}"));
4491        let store_path = dir.path().to_path_buf();
4492        let Ok(definition) = meerkat_mob::MobDefinition::from_toml(
4493            r#"
4494[mob]
4495id = "test"
4496
4497[profiles.commander]
4498model = "gpt-5.5"
4499
4500[profiles.commander.tools]
4501builtins = true
4502image_generation = true
4503"#,
4504        ) else {
4505            panic!("failed to parse image-generation definition");
4506        };
4507        let spec = MobBootstrapSpec::ephemeral(
4508            definition,
4509            meerkat_mob::MobStorage::in_memory(),
4510            store_path,
4511            4,
4512            None,
4513        );
4514        let spec_adapter = spec
4515            .runtime_adapter
4516            .as_ref()
4517            .expect("image-generation ephemeral builds must expose a runtime adapter");
4518        let service_adapter = spec
4519            .session_service
4520            .runtime_adapter()
4521            .expect("session service must expose the same runtime adapter");
4522        assert!(
4523            spec_adapter.shares_runtime_persistence_with(&service_adapter),
4524            "image-generation tool state and session state must share one runtime authority"
4525        );
4526    }
4527
4528    /// Runtime-owned handling/routing semantics must be stripped before a
4529    /// runtime-applied turn reaches the direct session-service path.
4530    #[test]
4531    fn normalize_runtime_turn_request_strips_runtime_owned_semantics() {
4532        let req = meerkat_core::service::StartTurnRequest {
4533            prompt: meerkat_core::ContentInput::Text("checkpoint".to_string()),
4534            system_prompt: Some("system".to_string()),
4535            event_tx: None,
4536            runtime: meerkat_core::service::StartTurnRuntimeSemantics {
4537                render_metadata: Some(meerkat_core::types::RenderMetadata {
4538                    class: meerkat_core::types::RenderClass::OpsProgress,
4539                    salience: meerkat_core::types::RenderSalience::Urgent,
4540                }),
4541                handling_mode: meerkat_core::types::HandlingMode::Steer,
4542                skill_references: None,
4543                flow_tool_overlay: None,
4544                pre_turn_context_appends: Vec::new(),
4545                typed_turn_appends: Vec::new(),
4546                turn_metadata: None,
4547            },
4548        };
4549
4550        let expected_prompt = req.prompt.clone();
4551        let expected_system_prompt = req.system_prompt.clone();
4552
4553        let normalized = normalize_runtime_turn_request(req);
4554
4555        assert_eq!(
4556            normalized.runtime.handling_mode,
4557            meerkat_core::types::HandlingMode::Queue,
4558            "runtime-applied turns must downgrade Steer before reaching direct session services"
4559        );
4560        assert!(
4561            normalized.runtime.render_metadata.is_none(),
4562            "runtime-owned render metadata must not be forwarded through the direct agent path"
4563        );
4564        assert_eq!(normalized.prompt, expected_prompt);
4565        assert_eq!(normalized.system_prompt, expected_system_prompt);
4566    }
4567
4568    /// SessionCreatedContext must carry model, labels, and optional system_prompt.
4569    #[test]
4570    fn session_created_context_fields() {
4571        let ctx = SessionCreatedContext {
4572            model: "claude-sonnet-4-5".to_string(),
4573            labels: std::collections::BTreeMap::from([(
4574                "agent_type".to_string(),
4575                "lead".to_string(),
4576            )]),
4577            system_prompt: Some("You are a lead agent.".to_string()),
4578        };
4579        assert_eq!(ctx.model, "claude-sonnet-4-5");
4580        assert_eq!(ctx.labels["agent_type"], "lead");
4581        assert_eq!(ctx.system_prompt.as_deref(), Some("You are a lead agent."));
4582    }
4583
4584    /// SessionHook default implementations are no-ops — calling them must not panic.
4585    #[tokio::test]
4586    async fn session_hook_default_impls_are_noop() {
4587        struct EmptyHook;
4588        #[async_trait]
4589        impl SessionHook for EmptyHook {}
4590
4591        let hook = EmptyHook;
4592        let mut req = CreateSessionRequest {
4593            model: "test".to_string(),
4594            prompt: meerkat_core::ContentInput::Text("test".to_string()),
4595            render_metadata: None,
4596            system_prompt: None,
4597            max_tokens: None,
4598            event_tx: None,
4599            skill_references: None,
4600            initial_turn: meerkat_core::service::InitialTurnPolicy::Defer,
4601            build: None,
4602            labels: None,
4603            deferred_prompt_policy: meerkat_core::service::DeferredPromptPolicy::default(),
4604        };
4605        // before_create must succeed with default impl.
4606        hook.before_create(&mut req).await.unwrap();
4607        // after_create must not panic.
4608        let ctx = SessionCreatedContext {
4609            model: "test".to_string(),
4610            labels: Default::default(),
4611            system_prompt: None,
4612        };
4613        hook.after_create(&meerkat_core::types::SessionId::new(), &ctx)
4614            .await;
4615    }
4616
4617    /// before_create returning Err must abort (the caller decides how).
4618    #[tokio::test]
4619    async fn session_hook_before_create_can_abort() {
4620        struct AbortHook;
4621        #[async_trait]
4622        impl SessionHook for AbortHook {
4623            async fn before_create(
4624                &self,
4625                _req: &mut CreateSessionRequest,
4626            ) -> Result<(), SessionError> {
4627                Err(SessionError::Unsupported("hook abort".into()))
4628            }
4629        }
4630
4631        let hook = AbortHook;
4632        let mut req = CreateSessionRequest {
4633            model: "test".to_string(),
4634            prompt: meerkat_core::ContentInput::Text("test".to_string()),
4635            render_metadata: None,
4636            system_prompt: None,
4637            max_tokens: None,
4638            event_tx: None,
4639            skill_references: None,
4640            initial_turn: meerkat_core::service::InitialTurnPolicy::Defer,
4641            build: None,
4642            labels: None,
4643            deferred_prompt_policy: meerkat_core::service::DeferredPromptPolicy::default(),
4644        };
4645        let result = hook.before_create(&mut req).await;
4646        assert!(result.is_err());
4647    }
4648
4649    /// before_create mutations must be visible in the request.
4650    #[tokio::test]
4651    async fn session_hook_before_create_mutates_request() {
4652        struct MutatingHook;
4653        #[async_trait]
4654        impl SessionHook for MutatingHook {
4655            async fn before_create(
4656                &self,
4657                req: &mut CreateSessionRequest,
4658            ) -> Result<(), SessionError> {
4659                req.model = "hook-overridden".to_string();
4660                req.system_prompt = Some("injected by hook".to_string());
4661                Ok(())
4662            }
4663        }
4664
4665        let hook = MutatingHook;
4666        let mut req = CreateSessionRequest {
4667            model: "original".to_string(),
4668            prompt: meerkat_core::ContentInput::Text("test".to_string()),
4669            render_metadata: None,
4670            system_prompt: None,
4671            max_tokens: None,
4672            event_tx: None,
4673            skill_references: None,
4674            initial_turn: meerkat_core::service::InitialTurnPolicy::Defer,
4675            build: None,
4676            labels: None,
4677            deferred_prompt_policy: meerkat_core::service::DeferredPromptPolicy::default(),
4678        };
4679        hook.before_create(&mut req).await.unwrap();
4680        assert_eq!(req.model, "hook-overridden");
4681        assert_eq!(req.system_prompt.as_deref(), Some("injected by hook"));
4682    }
4683
4684    #[test]
4685    fn recoverable_lifecycle_cleanup_accepts_ambiguous_member_cleanup() {
4686        let error = "previous member cleanup ambiguous for member rt:deep-investigator:singleton:0";
4687
4688        assert!(is_previous_member_cleanup_ambiguous_error(error));
4689        assert!(is_recoverable_lifecycle_cleanup_error(error));
4690    }
4691
4692    #[test]
4693    fn recoverable_lifecycle_cleanup_preserves_archive_cancel_race() {
4694        let error = "internal error: disposal completed but ArchiveSession failed: \
4695            session error: agent error: Internal error: runtime cancel-before-retire failed \
4696            for 019e3c52-0f1b-73d3-a5c7-4b21c2bbf131: Runtime not ready: running";
4697
4698        assert!(is_recoverable_lifecycle_cleanup_error(error));
4699    }
4700
4701    #[test]
4702    fn recoverable_lifecycle_cleanup_rejects_unrelated_errors() {
4703        assert!(!is_recoverable_lifecycle_cleanup_error(
4704            "actor task dropped"
4705        ));
4706        assert!(!is_recoverable_lifecycle_cleanup_error(
4707            "model provider returned rate limit"
4708        ));
4709    }
4710}