Skip to main content

meerkat_mobkit/
mob_handle_runtime.rs

1//! Mob member lifecycle management — bootstrap, spawn, reconcile, and roster queries.
2
3use std::collections::BTreeMap;
4use std::path::{Path, PathBuf};
5use std::sync::Arc;
6
7use async_trait::async_trait;
8use futures::StreamExt;
9use meerkat::{AgentFactory, Config, FactoryAgentBuilder, SessionStore};
10use meerkat_client::types::LlmStream;
11use meerkat_client::{LlmClient, LlmRequest};
12use meerkat_core::agent::CommsRuntime;
13use meerkat_core::service::{
14    CreateSessionRequest, SessionError, SessionHistoryPage, SessionHistoryQuery,
15    SessionServiceHistoryExt,
16};
17use meerkat_core::{AgentSessionStore, AssistantBlock, Message, Provider};
18use meerkat_mob::{
19    MobBuilder, MobDefinition, MobError, MobHandle, MobSessionService, MobStorage, Profile,
20    ProfileName, SpawnMemberSpec,
21};
22use meerkat_runtime::input_state::StoredInputState;
23use meerkat_runtime::store::MachineLifecycleCommit;
24use meerkat_store::StoreAdapter;
25use serde_json::Value;
26
27use crate::blob_store::{
28    Base64BlobStoreAdapter, BinaryBlobStore, BinaryBlobStoreAdapter, ObjectStoreBlobStore,
29};
30
/// Label key string `"implicit_delegate_idle_retire_secs"`.
///
/// NOTE(review): presumably the session-label key under which an implicit
/// delegate's idle-retire setting is recorded — confirm against the call sites.
pub(crate) const DELEGATE_IDLE_RETIRE_SECS_LABEL: &str = "implicit_delegate_idle_retire_secs";
/// Label value string `"disabled"`.
///
/// NOTE(review): presumably stored under [`DELEGATE_IDLE_RETIRE_SECS_LABEL`]
/// when idle auto-retirement is turned off — confirm against the call sites.
pub(crate) const DELEGATE_IDLE_RETIRE_DISABLED_LABEL: &str = "disabled";
33
34#[cfg(test)]
35use std::sync::{Mutex, OnceLock};
36
/// Member state string (`"active"`) for active members.
pub const MEMBER_STATE_ACTIVE: &str = "active";
/// Member state string (`"retiring"`) for members transitioning to retired.
pub const MEMBER_STATE_RETIRING: &str = "retiring";
41
/// Options for bootstrapping a mob runtime.
///
/// `Default` yields both flags `false` and no default LLM client.
#[derive(Clone, Default)]
pub struct MobBootstrapOptions {
    // NOTE(review): presumably permits sessions that are not backed by
    // persistent storage — confirm against the bootstrap code.
    pub allow_ephemeral_sessions: bool,
    // NOTE(review): presumably asks the runtime to notify the orchestrator
    // member when a mob is resumed — confirm against the bootstrap code.
    pub notify_orchestrator_on_resume: bool,
    // Optional shared LLM client; NOTE(review): presumably used when a session
    // does not supply its own client — confirm.
    pub default_llm_client: Option<Arc<dyn LlmClient>>,
}
49
50/// Wraps an LLM client and strips provider-emitted evidence blocks that are
51/// useful for UI/citation projection but unsafe to replay into the next
52/// stateless provider request.
53pub struct ReplaySanitizingLlmClient {
54    inner: Arc<dyn LlmClient>,
55}
56
57impl ReplaySanitizingLlmClient {
58    pub fn new(inner: Arc<dyn LlmClient>) -> Self {
59        Self { inner }
60    }
61
62    pub fn wrap(inner: Arc<dyn LlmClient>) -> Arc<dyn LlmClient> {
63        Arc::new(Self::new(inner))
64    }
65}
66
67/// Agent-layer companion to [`ReplaySanitizingLlmClient`].
68///
69/// Meerkat session services can also receive already-adapted
70/// `AgentLlmClient`s through live replacement and hot-swap APIs. Sanitize at
71/// that boundary too so provider-emitted server tool telemetry is never
72/// replayed into the next stateless model request just because the client
73/// entered below the raw `LlmClient` adapter seam.
74pub struct ReplaySanitizingAgentLlmClient {
75    inner: Arc<dyn meerkat_core::AgentLlmClient>,
76}
77
78impl ReplaySanitizingAgentLlmClient {
79    pub fn new(inner: Arc<dyn meerkat_core::AgentLlmClient>) -> Self {
80        Self { inner }
81    }
82
83    pub fn wrap(
84        inner: Arc<dyn meerkat_core::AgentLlmClient>,
85    ) -> Arc<dyn meerkat_core::AgentLlmClient> {
86        Arc::new(Self::new(inner))
87    }
88}
89
90#[async_trait]
91impl meerkat_core::AgentLlmClient for ReplaySanitizingAgentLlmClient {
92    async fn stream_response(
93        &self,
94        messages: &[Message],
95        tools: &[Arc<meerkat_core::ToolDef>],
96        max_tokens: u32,
97        temperature: Option<f32>,
98        provider_params: Option<&meerkat_core::lifecycle::run_primitive::ProviderParamsOverride>,
99    ) -> Result<meerkat_core::agent::LlmStreamResult, meerkat_core::AgentError> {
100        let sanitized: Vec<Message> = messages
101            .iter()
102            .cloned()
103            .map(sanitize_message_for_stateless_replay)
104            .collect();
105        self.inner
106            .stream_response(&sanitized, tools, max_tokens, temperature, provider_params)
107            .await
108    }
109
110    fn provider(&self) -> &'static str {
111        self.inner.provider()
112    }
113
114    fn model(&self) -> &str {
115        self.inner.model()
116    }
117
118    fn compile_schema(
119        &self,
120        output_schema: &meerkat_core::OutputSchema,
121    ) -> Result<meerkat_core::schema::CompiledSchema, meerkat_core::schema::SchemaError> {
122        self.inner.compile_schema(output_schema)
123    }
124}
125
126#[async_trait]
127impl LlmClient for ReplaySanitizingLlmClient {
128    fn project_replay_messages(
129        &self,
130        messages: &[Message],
131    ) -> Result<Vec<Message>, meerkat_client::LlmError> {
132        let sanitized: Vec<Message> = messages
133            .iter()
134            .cloned()
135            .map(sanitize_message_for_stateless_replay)
136            .collect();
137        self.inner.project_replay_messages(&sanitized)
138    }
139
140    fn stream<'a>(&'a self, request: &'a LlmRequest) -> LlmStream<'a> {
141        let inner = Arc::clone(&self.inner);
142        let sanitized = sanitize_llm_request_for_stateless_replay(request);
143        Box::pin(async_stream::stream! {
144            let mut stream = inner.stream(&sanitized);
145            while let Some(event) = stream.next().await {
146                if runtime_turn_diagnostics_enabled()
147                    && let Err(error) = &event
148                {
149                    tracing::error!(
150                        error = %error,
151                        error_debug = ?error,
152                        "mobkit llm client stream error"
153                    );
154                }
155                yield event;
156            }
157        })
158    }
159
160    fn provider(&self) -> &'static str {
161        self.inner.provider()
162    }
163
164    async fn health_check(&self) -> Result<(), meerkat_client::LlmError> {
165        self.inner.health_check().await
166    }
167
168    fn compile_schema(
169        &self,
170        output_schema: &meerkat_core::OutputSchema,
171    ) -> Result<meerkat_core::schema::CompiledSchema, meerkat_core::schema::SchemaError> {
172        self.inner.compile_schema(output_schema)
173    }
174}
175
/// Async hook called before each session is created. Receives the mutable
/// `CreateSessionRequest` so the app can inject external tools, augment the
/// system prompt, set labels, override the model, load session resume data
/// from external stores, etc.
///
/// The hook runs **before** `create_session` captures labels and LLM identity,
/// so all mutations are reflected in session metadata, not just the agent build.
///
/// The hook returns a boxed `Send` future that may borrow the request for its
/// whole lifetime (`+ '_`) and resolves to `Result<(), SessionError>`.
///
/// ```rust,ignore
/// let spec = MobBootstrapSpec::persistent_with_hook(
///     definition, storage, store_path, 64, session_store,
///     |req: &mut CreateSessionRequest| {
///         Box::pin(async move {
///             // Async: load session from external store
///             let session = my_store.load_by_owner(&owner_id).await;
///             if let Some(s) = session {
///                 let build = req.build.get_or_insert_with(SessionBuildOptions::default);
///                 build.resume_session = Some(s);
///             }
///             // Sync: inject tools, augment prompt
///             let build = req.build.get_or_insert_with(SessionBuildOptions::default);
///             build.external_tools = Some(my_tools());
///             Ok(())
///         })
///     },
/// );
/// ```
pub(crate) type PreBuildHook = Arc<
    dyn Fn(
            &mut CreateSessionRequest,
        ) -> std::pin::Pin<
            Box<dyn std::future::Future<Output = Result<(), SessionError>> + Send + '_>,
        > + Send
        + Sync,
>;
211
/// Optional post-creation hook invoked after `create_session` succeeds.
///
/// The hook receives the `SessionId` and a `SessionCreatedContext` by value
/// and returns a boxed `Send` future that resolves to `()`, so it has no way
/// to report a failure through its return value.
pub type AfterCreateHook = Arc<
    dyn Fn(
            meerkat_core::types::SessionId,
            SessionCreatedContext,
        ) -> std::pin::Pin<Box<dyn std::future::Future<Output = ()> + Send>>
        + Send
        + Sync,
>;
221
/// Wraps a `MobSessionService`, applying a `PreBuildHook` to the
/// `CreateSessionRequest` in `create_session()` before delegating.
///
/// The hook runs before labels and LLM identity are captured by the inner
/// session service, so mutations to `req.labels`, `req.model`, `req.build`,
/// and `req.system_prompt` are fully reflected in session metadata.
struct PreBuildMobSessionService {
    // The wrapped session service that calls are delegated to.
    inner: Arc<dyn MobSessionService>,
    // Hook applied to each `CreateSessionRequest` before delegation.
    hook: PreBuildHook,
    // Optional hook run after a session has been created.
    after_create_hook: Option<AfterCreateHook>,
    // NOTE(review): presumably replaces the runtime adapter the service would
    // otherwise expose — the usage is outside this view; confirm.
    runtime_adapter_override: Option<Arc<meerkat_runtime::MeerkatMachine>>,
}
234
235fn no_op_pre_build_hook() -> PreBuildHook {
236    Arc::new(|_req: &mut CreateSessionRequest| Box::pin(async { Ok(()) }))
237}
238
/// Per-delegate override of the idle auto-retirement policy, as parsed from
/// the `idle_retire_secs` argument of the `delegate` tool.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub(crate) enum DelegateIdleRetireOverride {
    /// Produced when `idle_retire_secs` is passed as `null`.
    Disabled,
    /// Produced when `idle_retire_secs` is a non-negative integer (seconds).
    Seconds(u64),
}
244
245#[derive(Clone, Default)]
246pub(crate) struct ImplicitDelegateRetirementOverrides {
247    inner: Arc<tokio::sync::RwLock<BTreeMap<(String, String), DelegateIdleRetireOverride>>>,
248}
249
250impl ImplicitDelegateRetirementOverrides {
251    pub(crate) async fn set(
252        &self,
253        mob_id: impl Into<String>,
254        member_id: impl Into<String>,
255        override_policy: DelegateIdleRetireOverride,
256    ) {
257        self.inner
258            .write()
259            .await
260            .insert((mob_id.into(), member_id.into()), override_policy);
261    }
262
263    pub(crate) async fn get(
264        &self,
265        mob_id: &str,
266        member_id: &str,
267    ) -> Option<DelegateIdleRetireOverride> {
268        self.inner
269            .read()
270            .await
271            .get(&(mob_id.to_string(), member_id.to_string()))
272            .copied()
273    }
274}
275
276struct AutoWireParentMobToolsFactory {
277    inner: Arc<dyn meerkat_core::service::MobToolsFactory>,
278    state: Arc<meerkat_mob_mcp::MobMcpState>,
279    implicit_delegate_retirement_overrides: ImplicitDelegateRetirementOverrides,
280}
281
282#[cfg_attr(target_arch = "wasm32", async_trait(?Send))]
283#[cfg_attr(not(target_arch = "wasm32"), async_trait)]
284impl meerkat_core::service::MobToolsFactory for AutoWireParentMobToolsFactory {
285    async fn build_mob_tools(
286        &self,
287        args: meerkat_core::service::MobToolsBuildArgs,
288    ) -> Result<Arc<dyn meerkat_core::AgentToolDispatcher>, Box<dyn std::error::Error + Send + Sync>>
289    {
290        let owner_identity = args
291            .comms_name
292            .as_deref()
293            .and_then(owner_identity_from_comms_name);
294        let inner = self.inner.build_mob_tools(args).await?;
295        Ok(Arc::new(AutoWireParentMobToolDispatcher {
296            inner,
297            state: Arc::clone(&self.state),
298            owner_identity,
299            implicit_delegate_retirement_overrides: self
300                .implicit_delegate_retirement_overrides
301                .clone(),
302        }))
303    }
304}
305
306struct AutoWireParentMobToolDispatcher {
307    inner: Arc<dyn meerkat_core::AgentToolDispatcher>,
308    state: Arc<meerkat_mob_mcp::MobMcpState>,
309    owner_identity: Option<String>,
310    implicit_delegate_retirement_overrides: ImplicitDelegateRetirementOverrides,
311}
312
313#[cfg_attr(target_arch = "wasm32", async_trait(?Send))]
314#[cfg_attr(not(target_arch = "wasm32"), async_trait)]
315impl meerkat_core::AgentToolDispatcher for AutoWireParentMobToolDispatcher {
316    fn tools(&self) -> Arc<[Arc<meerkat_core::types::ToolDef>]> {
317        self.inner
318            .tools()
319            .iter()
320            .map(|tool| {
321                if tool.name == "delegate" {
322                    Arc::new(delegate_tool_def_with_idle_retire_secs(tool))
323                } else {
324                    Arc::clone(tool)
325                }
326            })
327            .collect::<Vec<_>>()
328            .into()
329    }
330
331    async fn dispatch(
332        &self,
333        call: meerkat_core::types::ToolCallView<'_>,
334    ) -> Result<meerkat_core::ToolDispatchOutcome, meerkat_core::ToolError> {
335        if call.name == "delegate" {
336            return self.dispatch_delegate(call).await;
337        }
338        if call.name != "mob_spawn_member" {
339            return self.inner.dispatch(call).await;
340        }
341
342        let mut args = serde_json::from_str::<Value>(call.args.get()).map_err(|error| {
343            meerkat_core::ToolError::invalid_arguments(call.name, error.to_string())
344        })?;
345        let mut auto_wire_parent = true;
346        if let Some(object) = args.as_object_mut() {
347            auto_wire_parent = object
348                .get("auto_wire_parent")
349                .and_then(Value::as_bool)
350                .unwrap_or(true);
351            object
352                .entry("auto_wire_parent".to_string())
353                .or_insert(Value::Bool(true));
354        }
355        let mob_id = args
356            .get("mob_id")
357            .and_then(Value::as_str)
358            .map(str::to_string);
359        let member_id = args
360            .get("member_id")
361            .and_then(Value::as_str)
362            .map(str::to_string);
363        let args = serde_json::value::RawValue::from_string(args.to_string()).map_err(|error| {
364            meerkat_core::ToolError::invalid_arguments(call.name, error.to_string())
365        })?;
366        let call = meerkat_core::types::ToolCallView {
367            id: call.id,
368            name: call.name,
369            args: &args,
370        };
371        let outcome = self.inner.dispatch(call).await?;
372        if auto_wire_parent
373            && let (Some(mob_id), Some(member_id), Some(owner_identity)) =
374                (mob_id, member_id, self.owner_identity.as_deref())
375            && member_id != owner_identity
376        {
377            self.state
378                .mob_wire(
379                    &meerkat_mob::MobId::from(mob_id.as_str()),
380                    meerkat_mob::AgentIdentity::from(member_id.as_str()),
381                    meerkat_mob::PeerTarget::Local(meerkat_mob::AgentIdentity::from(
382                        owner_identity,
383                    )),
384                )
385                .await
386                .map_err(|error| {
387                    meerkat_core::ToolError::execution_failed(format!(
388                        "spawned member but failed to wire to spawning agent: {error}"
389                    ))
390                })?;
391        }
392        Ok(outcome)
393    }
394}
395
396impl AutoWireParentMobToolDispatcher {
397    async fn dispatch_delegate(
398        &self,
399        call: meerkat_core::types::ToolCallView<'_>,
400    ) -> Result<meerkat_core::ToolDispatchOutcome, meerkat_core::ToolError> {
401        let mut args = serde_json::from_str::<Value>(call.args.get()).map_err(|error| {
402            meerkat_core::ToolError::invalid_arguments(call.name, error.to_string())
403        })?;
404        let idle_retire_override = delegate_idle_retire_override_from_args(call.name, &mut args)?;
405        let args = serde_json::value::RawValue::from_string(args.to_string()).map_err(|error| {
406            meerkat_core::ToolError::invalid_arguments(call.name, error.to_string())
407        })?;
408        let call = meerkat_core::types::ToolCallView {
409            id: call.id,
410            name: call.name,
411            args: &args,
412        };
413        let outcome = self.inner.dispatch(call).await?;
414
415        if !outcome.result.is_error
416            && let Some(override_policy) = idle_retire_override
417            && let Ok(payload) = serde_json::from_str::<Value>(&outcome.result.text_content())
418            && let (Some(mob_id), Some(member_id)) = (
419                payload.get("mob_id").and_then(Value::as_str),
420                payload.get("agent_identity").and_then(Value::as_str),
421            )
422        {
423            self.implicit_delegate_retirement_overrides
424                .set(mob_id, member_id, override_policy)
425                .await;
426        }
427
428        Ok(outcome)
429    }
430}
431
432fn delegate_idle_retire_override_from_args(
433    tool_name: &str,
434    args: &mut Value,
435) -> Result<Option<DelegateIdleRetireOverride>, meerkat_core::ToolError> {
436    let Some(object) = args.as_object_mut() else {
437        return Ok(None);
438    };
439    let Some(value) = object.remove("idle_retire_secs") else {
440        return Ok(None);
441    };
442    if value.is_null() {
443        return Ok(Some(DelegateIdleRetireOverride::Disabled));
444    }
445    value
446        .as_u64()
447        .map(DelegateIdleRetireOverride::Seconds)
448        .map(Some)
449        .ok_or_else(|| {
450            meerkat_core::ToolError::invalid_arguments(
451                tool_name,
452                "idle_retire_secs must be a non-negative integer or null",
453            )
454        })
455}
456
457fn delegate_tool_def_with_idle_retire_secs(
458    tool: &meerkat_core::types::ToolDef,
459) -> meerkat_core::types::ToolDef {
460    let mut patched = tool.clone();
461    if !patched.description.contains("IDLE RETIREMENT:") {
462        patched.description.push_str(
463            "\n\nIDLE RETIREMENT:\n\
464             Omit idle_retire_secs to use the runtime default. Pass an integer \
465             number of seconds to override idle auto-retirement for this helper. \
466             Pass null to disable auto-retirement for this helper.",
467        );
468    }
469    if let Some(properties) = patched
470        .input_schema
471        .get_mut("properties")
472        .and_then(Value::as_object_mut)
473    {
474        properties
475            .entry("idle_retire_secs".to_string())
476            .or_insert_with(|| {
477                serde_json::json!({
478                    "description": "Override idle auto-retirement for this helper. Omit to use the runtime default, use an integer number of seconds to override, or null to disable auto-retirement for this helper.",
479                    "anyOf": [
480                        {"type": "integer", "minimum": 0},
481                        {"type": "null"}
482                    ]
483                })
484            });
485    }
486    patched
487}
488
/// Extracts the owner identity from a comms name: the text after the last
/// `/`, or the whole name when it contains no `/`.
///
/// Returns `None` when that final segment is empty (empty input or a name
/// ending in `/`).
fn owner_identity_from_comms_name(name: &str) -> Option<String> {
    let last_segment = match name.rfind('/') {
        // '/' is a single byte, so the segment starts right after it.
        Some(slash) => &name[slash + 1..],
        None => name,
    };
    if last_segment.is_empty() {
        None
    } else {
        Some(last_segment.to_owned())
    }
}
495
496fn install_agent_mob_tools(
497    slot: Arc<std::sync::RwLock<Option<Arc<dyn meerkat_core::service::MobToolsFactory>>>>,
498    session_service: Arc<dyn MobSessionService>,
499) -> (
500    Arc<meerkat_mob_mcp::MobMcpState>,
501    ImplicitDelegateRetirementOverrides,
502) {
503    let state = Arc::new(meerkat_mob_mcp::MobMcpState::new(session_service));
504    let implicit_delegate_retirement_overrides = ImplicitDelegateRetirementOverrides::default();
505    let inner = Arc::new(meerkat_mob_mcp::AgentMobToolSurfaceFactory::new(
506        Arc::clone(&state),
507    ));
508    let factory = Arc::new(AutoWireParentMobToolsFactory {
509        inner,
510        state: Arc::clone(&state),
511        implicit_delegate_retirement_overrides: implicit_delegate_retirement_overrides.clone(),
512    });
513    *slot
514        .write()
515        .unwrap_or_else(std::sync::PoisonError::into_inner) = Some(factory);
516    (state, implicit_delegate_retirement_overrides)
517}
518
/// Test-only record describing one runtime turn; instances are collected in
/// the process-wide `RUNTIME_TURN_TRACES` buffer.
#[cfg(test)]
#[allow(dead_code)]
#[derive(Debug, Clone)]
pub(crate) struct RuntimeTurnTrace {
    // Session the turn belongs to, as a string.
    pub(crate) session_id: String,
    // NOTE(review): presumably a textual name of the run boundary — confirm
    // against the code that records traces.
    pub(crate) boundary: String,
    // Number of inputs that contributed to the turn.
    pub(crate) contributing_input_count: usize,
    // NOTE(review): presumably a textual summary of how the turn ended — confirm.
    pub(crate) outcome: String,
}
528
529fn is_replay_unsafe_server_tool_content(name: &str, content: &Value) -> bool {
530    name == "web_search_annotations"
531        || content
532            .get("type")
533            .and_then(Value::as_str)
534            .is_some_and(|kind| kind.starts_with("response."))
535}
536
537fn sanitize_llm_request_for_stateless_replay(request: &LlmRequest) -> LlmRequest {
538    let mut sanitized = request.clone();
539    sanitized.messages = request
540        .messages
541        .iter()
542        .cloned()
543        .map(sanitize_message_for_stateless_replay)
544        .collect();
545    sanitized
546}
547
548fn sanitize_create_session_request_llm_override(req: &mut CreateSessionRequest) {
549    let Some(build) = req.build.as_mut() else {
550        return;
551    };
552    let Some(client) = build
553        .llm_client_override
554        .as_ref()
555        .and_then(meerkat::decode_llm_client_override_from_service)
556    else {
557        return;
558    };
559    build.llm_client_override = Some(meerkat::encode_llm_client_override_for_service(
560        ReplaySanitizingLlmClient::wrap(client),
561    ));
562}
563
564fn sanitize_message_for_stateless_replay(message: Message) -> Message {
565    match message {
566        Message::BlockAssistant(mut assistant) => {
567            assistant.blocks = assistant
568                .blocks
569                .into_iter()
570                .filter_map(|block| match block {
571                    AssistantBlock::ServerToolContent { name, content, .. }
572                        if is_replay_unsafe_server_tool_content(&name, &content) =>
573                    {
574                        None
575                    }
576                    other => Some(other),
577                })
578                .collect();
579            Message::BlockAssistant(assistant)
580        }
581        other => other,
582    }
583}
584
585/// Open the persistent runtime store that holds the authoritative
586/// session snapshot used by `load_persisted_session` (resume path) and
587/// `load_persisted_session_for_control` (archive/retire path). Lives at
588/// `<store_path>/runtime.sqlite` — separate file from the session
589/// store so we don't depend on the session_store's concrete type. If
590/// the SQLite open fails (rare: disk full, permissions), fall back to
591/// `InMemoryRuntimeStore` so the runtime can still bootstrap. In that
592/// degraded mode resume across restart and archive operations will
593/// fail; the warning makes the cause visible in operator logs.
594fn build_persistent_runtime_store(store_path: &Path) -> Arc<dyn meerkat_runtime::RuntimeStore> {
595    let runtime_db = store_path.join("runtime.sqlite");
596    match meerkat_runtime::store::SqliteRuntimeStore::new(&runtime_db) {
597        Ok(store) => Arc::new(store),
598        Err(err) => {
599            tracing::warn!(
600                path = %runtime_db.display(),
601                error = %err,
602                "failed to open SqliteRuntimeStore; falling back to InMemoryRuntimeStore. \
603                 Sessions will not survive process restart and archive operations may fail.",
604            );
605            Arc::new(meerkat_runtime::InMemoryRuntimeStore::new())
606        }
607    }
608}
609
610/// RuntimeStore facade for external-authoritative identity-first apps.
611///
612/// `PersistentSessionService` treats `RuntimeStore` as the authoritative
613/// session snapshot source whenever one is installed. External apps such as
614/// OB3 supply a durable `SessionStore` through `ContinuitySessionStoreAdapter`,
615/// so this bridge makes that store visible to the runtime snapshot path while
616/// delegating non-session runtime bookkeeping to the process-local store.
617struct SessionStoreBackedRuntimeStore {
618    inner: Arc<dyn meerkat_runtime::RuntimeStore>,
619    session_store: Arc<dyn SessionStore>,
620}
621
622impl SessionStoreBackedRuntimeStore {
623    fn new(
624        inner: Arc<dyn meerkat_runtime::RuntimeStore>,
625        session_store: Arc<dyn SessionStore>,
626    ) -> Self {
627        Self {
628            inner,
629            session_store,
630        }
631    }
632
633    fn session_id_for_runtime(
634        runtime_id: &meerkat_runtime::LogicalRuntimeId,
635    ) -> Result<meerkat_core::types::SessionId, meerkat_runtime::store::RuntimeStoreError> {
636        const PREFIX: &str = "rt:session:";
637        let Some(raw) = runtime_id.0.strip_prefix(PREFIX) else {
638            return Err(meerkat_runtime::store::RuntimeStoreError::Unsupported(
639                format!("cannot map non-session runtime id '{runtime_id}' to SessionStore"),
640            ));
641        };
642        meerkat_core::types::SessionId::parse(raw).map_err(|err| {
643            meerkat_runtime::store::RuntimeStoreError::Internal(format!(
644                "invalid session runtime id '{runtime_id}': {err}"
645            ))
646        })
647    }
648
649    fn decode_session(
650        snapshot: &[u8],
651        session_store_key: Option<&meerkat_core::types::SessionId>,
652    ) -> Result<meerkat_core::Session, meerkat_runtime::store::RuntimeStoreError> {
653        let session: meerkat_core::Session = serde_json::from_slice(snapshot).map_err(|err| {
654            meerkat_runtime::store::RuntimeStoreError::Internal(format!(
655                "failed to deserialize runtime session snapshot: {err}"
656            ))
657        })?;
658        if let Some(expected) = session_store_key
659            && session.id() != expected
660        {
661            return Err(
662                meerkat_runtime::store::RuntimeStoreError::SessionKeyMismatch {
663                    expected: expected.clone(),
664                    actual: session.id().clone(),
665                },
666            );
667        }
668        Ok(session)
669    }
670
671    async fn save_session_snapshot_to_store(
672        &self,
673        snapshot: &[u8],
674        session_store_key: Option<&meerkat_core::types::SessionId>,
675    ) -> Result<(), meerkat_runtime::store::RuntimeStoreError> {
676        let session = Self::decode_session(snapshot, session_store_key)?;
677        self.session_store.save(&session).await.map_err(|err| {
678            meerkat_runtime::store::RuntimeStoreError::WriteFailed(format!(
679                "external SessionStore save failed: {err}"
680            ))
681        })
682    }
683}
684
// `RuntimeStore` implementation for the bridge. Session snapshot writes are
// mirrored into the external `SessionStore` before being forwarded to the
// inner store, and snapshot reads fall back to the external store; every other
// method is a straight delegation to `inner`.
#[async_trait]
impl meerkat_runtime::RuntimeStore for SessionStoreBackedRuntimeStore {
    // Auth bookkeeping is owned entirely by the inner store.
    fn auth_authority_key(&self) -> Option<String> {
        self.inner.auth_authority_key()
    }

    fn persist_auth_oauth_flow_snapshot(
        &self,
        snapshot_json: &[u8],
    ) -> Result<(), meerkat_runtime::store::RuntimeStoreError> {
        self.inner.persist_auth_oauth_flow_snapshot(snapshot_json)
    }

    fn load_auth_oauth_flow_snapshot(
        &self,
    ) -> Result<Option<Vec<u8>>, meerkat_runtime::store::RuntimeStoreError> {
        self.inner.load_auth_oauth_flow_snapshot()
    }

    fn update_auth_oauth_flow_snapshot(
        &self,
        update: &mut meerkat_runtime::store::AuthOAuthFlowSnapshotUpdate<'_>,
    ) -> Result<(), meerkat_runtime::store::RuntimeStoreError> {
        self.inner.update_auth_oauth_flow_snapshot(update)
    }

    /// Saves the snapshot to the external session store, then commits it to
    /// the inner store.
    ///
    /// The runtime id must map to a session id (`rt:session:` prefix) and the
    /// snapshot must decode to a session with that id; otherwise the call
    /// fails before anything is written. The two writes are not atomic: if the
    /// inner commit fails, the external store already holds the new snapshot.
    async fn commit_session_snapshot(
        &self,
        runtime_id: &meerkat_runtime::LogicalRuntimeId,
        session_delta: meerkat_runtime::store::SessionDelta,
    ) -> Result<(), meerkat_runtime::store::RuntimeStoreError> {
        let session_id = Self::session_id_for_runtime(runtime_id)?;
        self.save_session_snapshot_to_store(&session_delta.session_snapshot, Some(&session_id))
            .await?;
        self.inner
            .commit_session_snapshot(runtime_id, session_delta)
            .await
    }

    /// Mirrors the session snapshot (when a delta is supplied) into the
    /// external session store, then forwards the whole apply to the inner store.
    ///
    /// The snapshot is validated against `session_store_key` only when that key
    /// is provided. As with `commit_session_snapshot`, the external save
    /// happens first and is not rolled back if the inner apply fails.
    async fn atomic_apply(
        &self,
        runtime_id: &meerkat_runtime::LogicalRuntimeId,
        session_delta: Option<meerkat_runtime::store::SessionDelta>,
        receipt: meerkat_core::lifecycle::RunBoundaryReceipt,
        input_updates: Vec<StoredInputState>,
        session_store_key: Option<meerkat_core::types::SessionId>,
    ) -> Result<(), meerkat_runtime::store::RuntimeStoreError> {
        if let Some(delta) = session_delta.as_ref() {
            self.save_session_snapshot_to_store(
                &delta.session_snapshot,
                session_store_key.as_ref(),
            )
            .await?;
        }
        self.inner
            .atomic_apply(
                runtime_id,
                session_delta,
                receipt,
                input_updates,
                session_store_key,
            )
            .await
    }

    async fn load_input_states(
        &self,
        runtime_id: &meerkat_runtime::LogicalRuntimeId,
    ) -> Result<Vec<StoredInputState>, meerkat_runtime::store::RuntimeStoreError> {
        self.inner.load_input_states(runtime_id).await
    }

    async fn load_boundary_receipt(
        &self,
        runtime_id: &meerkat_runtime::LogicalRuntimeId,
        run_id: &meerkat_core::lifecycle::RunId,
        sequence: u64,
    ) -> Result<
        Option<meerkat_core::lifecycle::RunBoundaryReceipt>,
        meerkat_runtime::store::RuntimeStoreError,
    > {
        self.inner
            .load_boundary_receipt(runtime_id, run_id, sequence)
            .await
    }

    /// Loads the session snapshot, preferring the inner store.
    ///
    /// When the inner store has no snapshot, the session is looked up in the
    /// external session store (which requires a session-shaped runtime id) and
    /// serialized to JSON. Returns `Ok(None)` when neither store has it.
    async fn load_session_snapshot(
        &self,
        runtime_id: &meerkat_runtime::LogicalRuntimeId,
    ) -> Result<Option<Vec<u8>>, meerkat_runtime::store::RuntimeStoreError> {
        if let Some(snapshot) = self.inner.load_session_snapshot(runtime_id).await? {
            return Ok(Some(snapshot));
        }
        // Fallback path: a non-session runtime id is an error here, not `None`.
        let session_id = Self::session_id_for_runtime(runtime_id)?;
        let Some(session) = self.session_store.load(&session_id).await.map_err(|err| {
            meerkat_runtime::store::RuntimeStoreError::ReadFailed(format!(
                "external SessionStore load failed: {err}"
            ))
        })?
        else {
            return Ok(None);
        };
        serde_json::to_vec(&session).map(Some).map_err(|err| {
            meerkat_runtime::store::RuntimeStoreError::Internal(format!(
                "failed to serialize external SessionStore snapshot: {err}"
            ))
        })
    }

    // Everything below is non-session runtime bookkeeping and is delegated to
    // the inner store unchanged.
    async fn persist_input_state(
        &self,
        runtime_id: &meerkat_runtime::LogicalRuntimeId,
        state: &StoredInputState,
    ) -> Result<(), meerkat_runtime::store::RuntimeStoreError> {
        self.inner.persist_input_state(runtime_id, state).await
    }

    async fn load_input_state(
        &self,
        runtime_id: &meerkat_runtime::LogicalRuntimeId,
        input_id: &meerkat_core::lifecycle::InputId,
    ) -> Result<Option<StoredInputState>, meerkat_runtime::store::RuntimeStoreError> {
        self.inner.load_input_state(runtime_id, input_id).await
    }

    async fn load_runtime_state(
        &self,
        runtime_id: &meerkat_runtime::LogicalRuntimeId,
    ) -> Result<Option<meerkat_runtime::RuntimeState>, meerkat_runtime::store::RuntimeStoreError>
    {
        self.inner.load_runtime_state(runtime_id).await
    }

    async fn commit_machine_lifecycle(
        &self,
        runtime_id: &meerkat_runtime::LogicalRuntimeId,
        commit: MachineLifecycleCommit,
        input_states: &[StoredInputState],
    ) -> Result<(), meerkat_runtime::store::RuntimeStoreError> {
        self.inner
            .commit_machine_lifecycle(runtime_id, commit, input_states)
            .await
    }

    async fn persist_ops_lifecycle(
        &self,
        runtime_id: &meerkat_runtime::LogicalRuntimeId,
        snapshot: &meerkat_runtime::ops_lifecycle::PersistedOpsSnapshot,
    ) -> Result<(), meerkat_runtime::store::RuntimeStoreError> {
        self.inner.persist_ops_lifecycle(runtime_id, snapshot).await
    }

    async fn load_ops_lifecycle(
        &self,
        runtime_id: &meerkat_runtime::LogicalRuntimeId,
    ) -> Result<
        Option<meerkat_runtime::ops_lifecycle::PersistedOpsSnapshot>,
        meerkat_runtime::store::RuntimeStoreError,
    > {
        self.inner.load_ops_lifecycle(runtime_id).await
    }
}
847
/// Process-wide, test-only buffer of `RuntimeTurnTrace` records. It starts
/// uninitialized and is created on first access via `runtime_turn_traces()`.
#[cfg(test)]
static RUNTIME_TURN_TRACES: OnceLock<Mutex<Vec<RuntimeTurnTrace>>> = OnceLock::new();
850
/// Returns the shared test-only trace buffer, creating an empty one the first
/// time it is requested.
#[cfg(test)]
fn runtime_turn_traces() -> &'static Mutex<Vec<RuntimeTurnTrace>> {
    RUNTIME_TURN_TRACES.get_or_init(Mutex::default)
}
855
/// Appends `trace` to the shared test-only trace buffer.
///
/// # Panics
///
/// Panics if the buffer's mutex has been poisoned.
#[cfg(test)]
#[allow(clippy::expect_used)]
fn record_runtime_turn_trace(trace: RuntimeTurnTrace) {
    let mut traces = runtime_turn_traces()
        .lock()
        .expect("runtime turn traces mutex");
    traces.push(trace);
}
864
/// Removes and returns every trace recorded so far, leaving the shared
/// test-only buffer empty.
///
/// # Panics
///
/// Panics if the buffer's mutex has been poisoned.
#[cfg(test)]
#[allow(dead_code)]
#[allow(clippy::expect_used)]
pub(crate) fn take_runtime_turn_traces() -> Vec<RuntimeTurnTrace> {
    let mut traces = runtime_turn_traces()
        .lock()
        .expect("runtime turn traces mutex");
    std::mem::take(&mut *traces)
}
875
/// Non-test stand-in for the test-only trace recorder: takes a unit value and
/// does nothing.
#[cfg(not(test))]
#[allow(dead_code)]
fn record_runtime_turn_trace(_trace: ()) {}
879
/// Reports whether runtime-turn diagnostics are switched on.
///
/// Diagnostics are enabled whenever the `MOBKIT_TRACE_RUNTIME_TURNS`
/// environment variable is present, regardless of its value (an empty value
/// still counts as set).
fn runtime_turn_diagnostics_enabled() -> bool {
    const SWITCH: &str = "MOBKIT_TRACE_RUNTIME_TURNS";
    let value = std::env::var_os(SWITCH);
    value.is_some()
}
883
884fn summarize_runtime_prompt(prompt: &meerkat_core::ContentInput) -> String {
885    match prompt {
886        meerkat_core::ContentInput::Text(text) => {
887            text.lines().take(6).collect::<Vec<_>>().join(" ")
888        }
889        meerkat_core::ContentInput::Blocks(blocks) => blocks
890            .iter()
891            .map(|block| block.text_projection().to_string())
892            .collect::<Vec<_>>()
893            .join(" ")
894            .lines()
895            .take(6)
896            .collect::<Vec<_>>()
897            .join(" "),
898    }
899}
900
901/// Whether the session factory should wire the image-generation substrate for
902/// this definition. Meerkat owns the per-profile visibility decision via
903/// `profile.tools.image_generation`; MobKit only needs to make the runtime
904/// machine available when a profile opts in, or when a realm profile may resolve
905/// to an opt-in profile at spawn time.
906pub fn mob_definition_may_use_image_generation(definition: &MobDefinition) -> bool {
907    definition.profiles.values().any(|binding| {
908        binding
909            .as_inline()
910            .is_none_or(|profile| profile.tools.image_generation)
911    })
912}
913
914fn normalize_runtime_turn_request(
915    mut req: meerkat_core::service::StartTurnRequest,
916) -> meerkat_core::service::StartTurnRequest {
917    // Queue/Steer and render metadata are runtime-owned semantics. By the
918    // time apply_runtime_turn() invokes the session service, the runtime
919    // has already chosen the boundary and recorded the metadata it needs.
920    // The direct agent/session path is queue-only, so forward a normalized
921    // turn request to avoid re-injecting runtime-only semantics.
922    req.runtime.handling_mode = meerkat_core::types::HandlingMode::Queue;
923    req.runtime.render_metadata = None;
924    req
925}
926
927/// Implement all `MobSessionService` super-traits by delegating to `self.inner`,
928/// overriding only `create_session` to apply the pre-build hook.
929macro_rules! delegate_mob_session_service {
930    ($wrapper:ty) => {
931        #[async_trait]
932        impl meerkat_core::service::SessionService for $wrapper {
933            async fn create_session(
934                &self,
935                mut req: CreateSessionRequest,
936            ) -> Result<meerkat_core::types::RunResult, SessionError> {
937                (self.hook)(&mut req).await?;
938                sanitize_create_session_request_llm_override(&mut req);
939
940                // Capture context before create_session consumes the request.
941                let ctx = SessionCreatedContext {
942                    model: req.model.clone(),
943                    labels: req.labels.clone().unwrap_or_default(),
944                    system_prompt: req.system_prompt.clone(),
945                };
946                let result = self.inner.create_session(req).await?;
947
948                // Best-effort after_create — errors logged, not propagated.
949                if let Some(ref after_hook) = self.after_create_hook {
950                    after_hook(result.session_id.clone(), ctx).await;
951                }
952
953                Ok(result)
954            }
955            async fn start_turn(
956                &self,
957                id: &meerkat_core::types::SessionId,
958                req: meerkat_core::service::StartTurnRequest,
959            ) -> Result<meerkat_core::types::RunResult, SessionError> {
960                self.inner.start_turn(id, req).await
961            }
962            async fn interrupt(
963                &self,
964                id: &meerkat_core::types::SessionId,
965            ) -> Result<(), SessionError> {
966                self.inner.interrupt(id).await
967            }
968            async fn cancel_after_boundary(
969                &self,
970                id: &meerkat_core::types::SessionId,
971            ) -> Result<(), SessionError> {
972                self.inner.cancel_after_boundary(id).await
973            }
974            async fn set_session_client(
975                &self,
976                id: &meerkat_core::types::SessionId,
977                client: Arc<dyn meerkat_core::AgentLlmClient>,
978            ) -> Result<(), SessionError> {
979                self.inner
980                    .set_session_client(id, ReplaySanitizingAgentLlmClient::wrap(client))
981                    .await
982            }
983            async fn hot_swap_session_llm_identity(
984                &self,
985                id: &meerkat_core::types::SessionId,
986                client: Arc<dyn meerkat_core::AgentLlmClient>,
987                identity: meerkat_core::session::SessionLlmIdentity,
988                request_policy: meerkat_core::SessionLlmRequestPolicy,
989            ) -> Result<(), SessionError> {
990                self.inner
991                    .hot_swap_session_llm_identity(
992                        id,
993                        ReplaySanitizingAgentLlmClient::wrap(client),
994                        identity,
995                        request_policy,
996                    )
997                    .await
998            }
999            async fn update_session_keep_alive(
1000                &self,
1001                id: &meerkat_core::types::SessionId,
1002                keep_alive: bool,
1003            ) -> Result<(), SessionError> {
1004                self.inner.update_session_keep_alive(id, keep_alive).await
1005            }
1006            async fn update_session_mob_authority_context(
1007                &self,
1008                id: &meerkat_core::types::SessionId,
1009                authority_context: Option<meerkat_core::service::MobToolAuthorityContext>,
1010            ) -> Result<(), SessionError> {
1011                self.inner
1012                    .update_session_mob_authority_context(id, authority_context)
1013                    .await
1014            }
1015            async fn has_live_session(
1016                &self,
1017                id: &meerkat_core::types::SessionId,
1018            ) -> Result<bool, SessionError> {
1019                self.inner.has_live_session(id).await
1020            }
1021            async fn set_session_tool_visibility_state(
1022                &self,
1023                id: &meerkat_core::types::SessionId,
1024                state: Option<meerkat_core::SessionToolVisibilityState>,
1025            ) -> Result<(), SessionError> {
1026                self.inner
1027                    .set_session_tool_visibility_state(id, state)
1028                    .await
1029            }
1030            async fn set_session_tool_filter(
1031                &self,
1032                id: &meerkat_core::types::SessionId,
1033                filter: meerkat_core::ToolFilter,
1034            ) -> Result<(), SessionError> {
1035                self.inner.set_session_tool_filter(id, filter).await
1036            }
1037            async fn read(
1038                &self,
1039                id: &meerkat_core::types::SessionId,
1040            ) -> Result<meerkat_core::service::SessionView, SessionError> {
1041                self.inner.read(id).await
1042            }
1043            async fn list(
1044                &self,
1045                query: meerkat_core::service::SessionQuery,
1046            ) -> Result<Vec<meerkat_core::service::SessionSummary>, SessionError> {
1047                self.inner.list(query).await
1048            }
1049            async fn archive(
1050                &self,
1051                id: &meerkat_core::types::SessionId,
1052            ) -> Result<(), SessionError> {
1053                self.inner.archive(id).await
1054            }
1055            async fn subscribe_session_events(
1056                &self,
1057                id: &meerkat_core::types::SessionId,
1058            ) -> Result<meerkat_core::comms::EventStream, meerkat_core::comms::StreamError> {
1059                meerkat_core::service::SessionService::subscribe_session_events(
1060                    self.inner.as_ref(),
1061                    id,
1062                )
1063                .await
1064            }
1065        }
1066
1067        #[async_trait]
1068        impl meerkat_core::service::SessionServiceCommsExt for $wrapper {
1069            async fn comms_runtime(
1070                &self,
1071                id: &meerkat_core::types::SessionId,
1072            ) -> Option<Arc<dyn meerkat_core::agent::CommsRuntime>> {
1073                self.inner.comms_runtime(id).await
1074            }
1075
1076            async fn event_injector(
1077                &self,
1078                id: &meerkat_core::types::SessionId,
1079            ) -> Option<Arc<dyn meerkat_core::EventInjector>> {
1080                self.inner.event_injector(id).await
1081            }
1082
1083            async fn interaction_event_injector(
1084                &self,
1085                id: &meerkat_core::types::SessionId,
1086            ) -> Option<Arc<dyn meerkat_core::event_injector::SubscribableInjector>> {
1087                self.inner.interaction_event_injector(id).await
1088            }
1089        }
1090
1091        #[async_trait]
1092        impl meerkat_core::service::SessionServiceControlExt for $wrapper {
1093            async fn append_system_context(
1094                &self,
1095                id: &meerkat_core::types::SessionId,
1096                req: meerkat_core::service::AppendSystemContextRequest,
1097            ) -> Result<
1098                meerkat_core::service::AppendSystemContextResult,
1099                meerkat_core::service::SessionControlError,
1100            > {
1101                self.inner.append_system_context(id, req).await
1102            }
1103            async fn stage_tool_results(
1104                &self,
1105                id: &meerkat_core::types::SessionId,
1106                req: meerkat_core::service::StageToolResultsRequest,
1107            ) -> Result<meerkat_core::service::StageToolResultsResult, SessionError> {
1108                self.inner.stage_tool_results(id, req).await
1109            }
1110        }
1111
1112        #[async_trait]
1113        impl meerkat_core::service::SessionServiceHistoryExt for $wrapper {
1114            async fn read_history(
1115                &self,
1116                id: &meerkat_core::types::SessionId,
1117                query: meerkat_core::service::SessionHistoryQuery,
1118            ) -> Result<meerkat_core::service::SessionHistoryPage, SessionError> {
1119                self.inner.read_history(id, query).await
1120            }
1121        }
1122
1123        #[async_trait]
1124        impl MobSessionService for $wrapper {
1125            fn supports_persistent_sessions(&self) -> bool {
1126                self.inner.supports_persistent_sessions()
1127            }
1128            fn runtime_adapter(&self) -> Option<Arc<meerkat_runtime::MeerkatMachine>> {
1129                self.runtime_adapter_override
1130                    .clone()
1131                    .or_else(|| self.inner.runtime_adapter())
1132            }
1133            async fn interrupt_with_machine_authority(
1134                &self,
1135                session_id: &meerkat_core::types::SessionId,
1136                authority: meerkat_runtime::MachineSessionControlAuthority,
1137            ) -> Result<(), SessionError> {
1138                self.inner
1139                    .interrupt_with_machine_authority(session_id, authority)
1140                    .await
1141            }
1142            async fn cancel_after_boundary_with_machine_authority(
1143                &self,
1144                session_id: &meerkat_core::types::SessionId,
1145                authority: meerkat_runtime::MachineSessionControlAuthority,
1146            ) -> Result<(), SessionError> {
1147                self.inner
1148                    .cancel_after_boundary_with_machine_authority(session_id, authority)
1149                    .await
1150            }
1151            async fn session_belongs_to_mob(
1152                &self,
1153                session_id: &meerkat_core::types::SessionId,
1154                mob_id: &meerkat_mob::MobId,
1155            ) -> bool {
1156                self.inner.session_belongs_to_mob(session_id, mob_id).await
1157            }
1158            async fn load_persisted_session(
1159                &self,
1160                session_id: &meerkat_core::types::SessionId,
1161            ) -> Result<Option<meerkat_core::session::Session>, SessionError> {
1162                self.inner.load_persisted_session(session_id).await
1163            }
1164            async fn subscribe_session_events(
1165                &self,
1166                session_id: &meerkat_core::types::SessionId,
1167            ) -> Result<meerkat_core::comms::EventStream, meerkat_core::comms::StreamError> {
1168                meerkat_mob::MobSessionService::subscribe_session_events(
1169                    self.inner.as_ref(),
1170                    session_id,
1171                )
1172                .await
1173            }
1174            async fn archive_with_mob_lifecycle_authority(
1175                &self,
1176                session_id: &meerkat_core::types::SessionId,
1177            ) -> Result<(), SessionError> {
1178                self.inner
1179                    .archive_with_mob_lifecycle_authority(session_id)
1180                    .await
1181            }
1182            async fn execution_snapshot(
1183                &self,
1184                session_id: &meerkat_core::types::SessionId,
1185            ) -> Result<Option<meerkat_core::agent::AgentExecutionSnapshot>, SessionError> {
1186                self.inner.execution_snapshot(session_id).await
1187            }
1188            async fn tool_scope_snapshot(
1189                &self,
1190                session_id: &meerkat_core::types::SessionId,
1191            ) -> Result<Option<meerkat_core::ToolScopeSnapshot>, SessionError> {
1192                self.inner.tool_scope_snapshot(session_id).await
1193            }
1194            async fn external_tool_surface_snapshot(
1195                &self,
1196                session_id: &meerkat_core::types::SessionId,
1197            ) -> Result<Option<meerkat_core::ExternalToolSurfaceSnapshot>, SessionError> {
1198                self.inner.external_tool_surface_snapshot(session_id).await
1199            }
1200            async fn peer_ingress_runtime_snapshot(
1201                &self,
1202                session_id: &meerkat_core::types::SessionId,
1203            ) -> Result<Option<meerkat_core::PeerIngressRuntimeSnapshot>, SessionError> {
1204                self.inner.peer_ingress_runtime_snapshot(session_id).await
1205            }
1206            async fn apply_runtime_turn(
1207                &self,
1208                session_id: &meerkat_core::types::SessionId,
1209                run_id: meerkat_core::lifecycle::RunId,
1210                req: meerkat_core::service::StartTurnRequest,
1211                boundary: meerkat_core::lifecycle::run_primitive::RunApplyBoundary,
1212                contributing_input_ids: Vec<meerkat_core::lifecycle::InputId>,
1213            ) -> Result<meerkat_core::lifecycle::core_executor::CoreApplyOutput, SessionError> {
1214                #[cfg(test)]
1215                let boundary_name = format!("{boundary:?}");
1216                #[cfg(test)]
1217                let contributing_count = contributing_input_ids.len();
1218                let run_id_for_log = run_id.to_string();
1219                let prompt_summary = if runtime_turn_diagnostics_enabled() {
1220                    Some(summarize_runtime_prompt(&req.prompt))
1221                } else {
1222                    None
1223                };
1224                if let Some(summary) = prompt_summary.as_ref() {
1225                    tracing::warn!(
1226                        session_id = %session_id,
1227                        run_id = %run_id_for_log,
1228                        boundary = ?boundary,
1229                        contributing_inputs = contributing_input_ids.len(),
1230                        prompt = %summary,
1231                        runtime = ?req.runtime,
1232                        "mobkit runtime turn start"
1233                    );
1234                }
1235                let result = self
1236                    .inner
1237                    .apply_runtime_turn(
1238                        session_id,
1239                        run_id,
1240                        normalize_runtime_turn_request(req),
1241                        boundary,
1242                        contributing_input_ids,
1243                    )
1244                    .await;
1245                #[cfg(test)]
1246                record_runtime_turn_trace(RuntimeTurnTrace {
1247                    session_id: session_id.to_string(),
1248                    boundary: boundary_name,
1249                    contributing_input_count: contributing_count,
1250                    outcome: match &result {
1251                        Ok(_) => "ok".to_string(),
1252                        Err(error) => format!("err:{error}"),
1253                    },
1254                });
1255                if runtime_turn_diagnostics_enabled() {
1256                    match &result {
1257                        Ok(_) => tracing::warn!(
1258                            session_id = %session_id,
1259                            run_id = %run_id_for_log,
1260                            "mobkit runtime turn ok"
1261                        ),
1262                        Err(error) => tracing::error!(
1263                            session_id = %session_id,
1264                            run_id = %run_id_for_log,
1265                            error = %error,
1266                            error_debug = ?error,
1267                            "mobkit runtime turn error"
1268                        ),
1269                    }
1270                }
1271                result
1272            }
1273            async fn apply_runtime_context_appends(
1274                &self,
1275                session_id: &meerkat_core::types::SessionId,
1276                run_id: meerkat_core::lifecycle::RunId,
1277                appends: Vec<meerkat_core::session::PendingSystemContextAppend>,
1278                contributing_input_ids: Vec<meerkat_core::lifecycle::InputId>,
1279            ) -> Result<meerkat_core::lifecycle::core_executor::CoreApplyOutput, SessionError> {
1280                self.inner
1281                    .apply_runtime_context_appends(
1282                        session_id,
1283                        run_id,
1284                        appends,
1285                        contributing_input_ids,
1286                    )
1287                    .await
1288            }
1289            async fn apply_runtime_context_appends_with_boundary(
1290                &self,
1291                session_id: &meerkat_core::types::SessionId,
1292                run_id: meerkat_core::lifecycle::RunId,
1293                appends: Vec<meerkat_core::session::PendingSystemContextAppend>,
1294                boundary: meerkat_core::lifecycle::run_primitive::RunApplyBoundary,
1295                contributing_input_ids: Vec<meerkat_core::lifecycle::InputId>,
1296            ) -> Result<meerkat_core::lifecycle::core_executor::CoreApplyOutput, SessionError> {
1297                self.inner
1298                    .apply_runtime_context_appends_with_boundary(
1299                        session_id,
1300                        run_id,
1301                        appends,
1302                        boundary,
1303                        contributing_input_ids,
1304                    )
1305                    .await
1306            }
1307            async fn apply_runtime_system_context_for_turn(
1308                &self,
1309                session_id: &meerkat_core::types::SessionId,
1310                appends: Vec<meerkat_core::session::PendingSystemContextAppend>,
1311            ) -> Result<(), SessionError> {
1312                self.inner
1313                    .apply_runtime_system_context_for_turn(session_id, appends)
1314                    .await
1315            }
1316            async fn discard_live_session(
1317                &self,
1318                session_id: &meerkat_core::types::SessionId,
1319            ) -> Result<(), SessionError> {
1320                self.inner.discard_live_session(session_id).await
1321            }
1322            async fn checkpoint_committed_runtime_session_snapshot(
1323                &self,
1324                session_id: &meerkat_core::types::SessionId,
1325                session_snapshot: &[u8],
1326            ) -> Result<(), SessionError> {
1327                self.inner
1328                    .checkpoint_committed_runtime_session_snapshot(session_id, session_snapshot)
1329                    .await
1330            }
1331            async fn cancel_all_checkpointers(&self) {
1332                self.inner.cancel_all_checkpointers().await;
1333            }
1334            async fn rearm_all_checkpointers(&self) {
1335                self.inner.rearm_all_checkpointers().await;
1336            }
1337        }
1338    };
1339}
1340
1341delegate_mob_session_service!(PreBuildMobSessionService);
1342
/// Wraps a `MobSessionService` to fire an `AfterCreateHook` after each
/// successful `create_session`.
///
/// The `SessionCreatedContext` handed to the hook is built from the request as
/// this layer sees it: after LLM-override sanitization, but before `inner`
/// consumes the request. Mutations applied by hooks that run inside `inner`
/// are therefore not reflected in the context.
struct AfterCreateMobSessionService {
    /// Service that every call is delegated to.
    inner: Arc<dyn MobSessionService>,
    /// Callback awaited with the new session id and the captured context once
    /// `inner.create_session` has succeeded.
    after_hook: AfterCreateHook,
}
1351
1352#[async_trait]
1353impl meerkat_core::service::SessionService for AfterCreateMobSessionService {
1354    async fn create_session(
1355        &self,
1356        mut req: CreateSessionRequest,
1357    ) -> Result<meerkat_core::types::RunResult, SessionError> {
1358        sanitize_create_session_request_llm_override(&mut req);
1359        // Capture pre-create context from the request (before inner consumes it).
1360        // The inner service's pre-build hooks may mutate the request further,
1361        // but we capture here because we can't read the request after inner
1362        // consumes it. The pre-build hook runs inside inner.create_session.
1363        //
1364        // For accurate post-mutation context, we re-read from the request
1365        // that was already mutated by any outer hooks, and accept that inner
1366        // hooks are not visible here. This is the correct trade-off: the
1367        // after_create context matches the request as seen by this layer.
1368        let ctx = SessionCreatedContext {
1369            model: req.model.clone(),
1370            labels: req.labels.clone().unwrap_or_default(),
1371            system_prompt: req.system_prompt.clone(),
1372        };
1373        let result = self.inner.create_session(req).await?;
1374        (self.after_hook)(result.session_id.clone(), ctx).await;
1375        Ok(result)
1376    }
1377    async fn start_turn(
1378        &self,
1379        id: &meerkat_core::types::SessionId,
1380        req: meerkat_core::service::StartTurnRequest,
1381    ) -> Result<meerkat_core::types::RunResult, SessionError> {
1382        self.inner.start_turn(id, req).await
1383    }
1384    async fn interrupt(&self, id: &meerkat_core::types::SessionId) -> Result<(), SessionError> {
1385        self.inner.interrupt(id).await
1386    }
1387    async fn cancel_after_boundary(
1388        &self,
1389        id: &meerkat_core::types::SessionId,
1390    ) -> Result<(), SessionError> {
1391        self.inner.cancel_after_boundary(id).await
1392    }
1393    async fn set_session_client(
1394        &self,
1395        id: &meerkat_core::types::SessionId,
1396        client: Arc<dyn meerkat_core::AgentLlmClient>,
1397    ) -> Result<(), SessionError> {
1398        self.inner
1399            .set_session_client(id, ReplaySanitizingAgentLlmClient::wrap(client))
1400            .await
1401    }
1402    async fn hot_swap_session_llm_identity(
1403        &self,
1404        id: &meerkat_core::types::SessionId,
1405        client: Arc<dyn meerkat_core::AgentLlmClient>,
1406        identity: meerkat_core::session::SessionLlmIdentity,
1407        request_policy: meerkat_core::SessionLlmRequestPolicy,
1408    ) -> Result<(), SessionError> {
1409        self.inner
1410            .hot_swap_session_llm_identity(
1411                id,
1412                ReplaySanitizingAgentLlmClient::wrap(client),
1413                identity,
1414                request_policy,
1415            )
1416            .await
1417    }
1418    async fn update_session_keep_alive(
1419        &self,
1420        id: &meerkat_core::types::SessionId,
1421        keep_alive: bool,
1422    ) -> Result<(), SessionError> {
1423        self.inner.update_session_keep_alive(id, keep_alive).await
1424    }
1425    async fn update_session_mob_authority_context(
1426        &self,
1427        id: &meerkat_core::types::SessionId,
1428        authority_context: Option<meerkat_core::service::MobToolAuthorityContext>,
1429    ) -> Result<(), SessionError> {
1430        self.inner
1431            .update_session_mob_authority_context(id, authority_context)
1432            .await
1433    }
1434    async fn has_live_session(
1435        &self,
1436        id: &meerkat_core::types::SessionId,
1437    ) -> Result<bool, SessionError> {
1438        self.inner.has_live_session(id).await
1439    }
1440    async fn set_session_tool_visibility_state(
1441        &self,
1442        id: &meerkat_core::types::SessionId,
1443        state: Option<meerkat_core::SessionToolVisibilityState>,
1444    ) -> Result<(), SessionError> {
1445        self.inner
1446            .set_session_tool_visibility_state(id, state)
1447            .await
1448    }
1449    async fn set_session_tool_filter(
1450        &self,
1451        id: &meerkat_core::types::SessionId,
1452        filter: meerkat_core::ToolFilter,
1453    ) -> Result<(), SessionError> {
1454        self.inner.set_session_tool_filter(id, filter).await
1455    }
1456    async fn read(
1457        &self,
1458        id: &meerkat_core::types::SessionId,
1459    ) -> Result<meerkat_core::service::SessionView, SessionError> {
1460        self.inner.read(id).await
1461    }
1462    async fn list(
1463        &self,
1464        query: meerkat_core::service::SessionQuery,
1465    ) -> Result<Vec<meerkat_core::service::SessionSummary>, SessionError> {
1466        self.inner.list(query).await
1467    }
1468    async fn archive(&self, id: &meerkat_core::types::SessionId) -> Result<(), SessionError> {
1469        self.inner.archive(id).await
1470    }
1471    async fn subscribe_session_events(
1472        &self,
1473        id: &meerkat_core::types::SessionId,
1474    ) -> Result<meerkat_core::comms::EventStream, meerkat_core::comms::StreamError> {
1475        meerkat_core::service::SessionService::subscribe_session_events(self.inner.as_ref(), id)
1476            .await
1477    }
1478}
1479
1480#[async_trait]
1481impl meerkat_core::service::SessionServiceCommsExt for AfterCreateMobSessionService {
1482    async fn comms_runtime(
1483        &self,
1484        id: &meerkat_core::types::SessionId,
1485    ) -> Option<Arc<dyn meerkat_core::agent::CommsRuntime>> {
1486        self.inner.comms_runtime(id).await
1487    }
1488
1489    async fn event_injector(
1490        &self,
1491        id: &meerkat_core::types::SessionId,
1492    ) -> Option<Arc<dyn meerkat_core::EventInjector>> {
1493        self.inner.event_injector(id).await
1494    }
1495
1496    async fn interaction_event_injector(
1497        &self,
1498        id: &meerkat_core::types::SessionId,
1499    ) -> Option<Arc<dyn meerkat_core::event_injector::SubscribableInjector>> {
1500        self.inner.interaction_event_injector(id).await
1501    }
1502}
1503
1504#[async_trait]
1505impl meerkat_core::service::SessionServiceControlExt for AfterCreateMobSessionService {
1506    async fn append_system_context(
1507        &self,
1508        id: &meerkat_core::types::SessionId,
1509        req: meerkat_core::service::AppendSystemContextRequest,
1510    ) -> Result<
1511        meerkat_core::service::AppendSystemContextResult,
1512        meerkat_core::service::SessionControlError,
1513    > {
1514        self.inner.append_system_context(id, req).await
1515    }
1516    async fn stage_tool_results(
1517        &self,
1518        id: &meerkat_core::types::SessionId,
1519        req: meerkat_core::service::StageToolResultsRequest,
1520    ) -> Result<meerkat_core::service::StageToolResultsResult, SessionError> {
1521        self.inner.stage_tool_results(id, req).await
1522    }
1523}
1524
1525#[async_trait]
1526impl meerkat_core::service::SessionServiceHistoryExt for AfterCreateMobSessionService {
1527    async fn read_history(
1528        &self,
1529        id: &meerkat_core::types::SessionId,
1530        query: meerkat_core::service::SessionHistoryQuery,
1531    ) -> Result<meerkat_core::service::SessionHistoryPage, SessionError> {
1532        self.inner.read_history(id, query).await
1533    }
1534}
1535
1536#[async_trait]
1537impl MobSessionService for AfterCreateMobSessionService {
1538    fn supports_persistent_sessions(&self) -> bool {
1539        self.inner.supports_persistent_sessions()
1540    }
1541    fn runtime_adapter(&self) -> Option<Arc<meerkat_runtime::MeerkatMachine>> {
1542        self.inner.runtime_adapter()
1543    }
1544    async fn interrupt_with_machine_authority(
1545        &self,
1546        session_id: &meerkat_core::types::SessionId,
1547        authority: meerkat_runtime::MachineSessionControlAuthority,
1548    ) -> Result<(), SessionError> {
1549        self.inner
1550            .interrupt_with_machine_authority(session_id, authority)
1551            .await
1552    }
1553    async fn cancel_after_boundary_with_machine_authority(
1554        &self,
1555        session_id: &meerkat_core::types::SessionId,
1556        authority: meerkat_runtime::MachineSessionControlAuthority,
1557    ) -> Result<(), SessionError> {
1558        self.inner
1559            .cancel_after_boundary_with_machine_authority(session_id, authority)
1560            .await
1561    }
1562    async fn session_belongs_to_mob(
1563        &self,
1564        session_id: &meerkat_core::types::SessionId,
1565        mob_id: &meerkat_mob::MobId,
1566    ) -> bool {
1567        self.inner.session_belongs_to_mob(session_id, mob_id).await
1568    }
1569    async fn load_persisted_session(
1570        &self,
1571        session_id: &meerkat_core::types::SessionId,
1572    ) -> Result<Option<meerkat_core::session::Session>, SessionError> {
1573        self.inner.load_persisted_session(session_id).await
1574    }
1575    async fn subscribe_session_events(
1576        &self,
1577        session_id: &meerkat_core::types::SessionId,
1578    ) -> Result<meerkat_core::comms::EventStream, meerkat_core::comms::StreamError> {
1579        meerkat_mob::MobSessionService::subscribe_session_events(self.inner.as_ref(), session_id)
1580            .await
1581    }
1582    async fn archive_with_mob_lifecycle_authority(
1583        &self,
1584        session_id: &meerkat_core::types::SessionId,
1585    ) -> Result<(), SessionError> {
1586        self.inner
1587            .archive_with_mob_lifecycle_authority(session_id)
1588            .await
1589    }
1590    async fn execution_snapshot(
1591        &self,
1592        session_id: &meerkat_core::types::SessionId,
1593    ) -> Result<Option<meerkat_core::agent::AgentExecutionSnapshot>, SessionError> {
1594        self.inner.execution_snapshot(session_id).await
1595    }
1596    async fn tool_scope_snapshot(
1597        &self,
1598        session_id: &meerkat_core::types::SessionId,
1599    ) -> Result<Option<meerkat_core::ToolScopeSnapshot>, SessionError> {
1600        self.inner.tool_scope_snapshot(session_id).await
1601    }
1602    async fn external_tool_surface_snapshot(
1603        &self,
1604        session_id: &meerkat_core::types::SessionId,
1605    ) -> Result<Option<meerkat_core::ExternalToolSurfaceSnapshot>, SessionError> {
1606        self.inner.external_tool_surface_snapshot(session_id).await
1607    }
1608    async fn peer_ingress_runtime_snapshot(
1609        &self,
1610        session_id: &meerkat_core::types::SessionId,
1611    ) -> Result<Option<meerkat_core::PeerIngressRuntimeSnapshot>, SessionError> {
1612        self.inner.peer_ingress_runtime_snapshot(session_id).await
1613    }
1614    async fn apply_runtime_turn(
1615        &self,
1616        session_id: &meerkat_core::types::SessionId,
1617        run_id: meerkat_core::lifecycle::RunId,
1618        req: meerkat_core::service::StartTurnRequest,
1619        boundary: meerkat_core::lifecycle::run_primitive::RunApplyBoundary,
1620        contributing_input_ids: Vec<meerkat_core::lifecycle::InputId>,
1621    ) -> Result<meerkat_core::lifecycle::core_executor::CoreApplyOutput, SessionError> {
1622        self.inner
1623            .apply_runtime_turn(session_id, run_id, req, boundary, contributing_input_ids)
1624            .await
1625    }
1626    async fn apply_runtime_context_appends(
1627        &self,
1628        session_id: &meerkat_core::types::SessionId,
1629        run_id: meerkat_core::lifecycle::RunId,
1630        appends: Vec<meerkat_core::session::PendingSystemContextAppend>,
1631        contributing_input_ids: Vec<meerkat_core::lifecycle::InputId>,
1632    ) -> Result<meerkat_core::lifecycle::core_executor::CoreApplyOutput, SessionError> {
1633        self.inner
1634            .apply_runtime_context_appends(session_id, run_id, appends, contributing_input_ids)
1635            .await
1636    }
1637    async fn apply_runtime_context_appends_with_boundary(
1638        &self,
1639        session_id: &meerkat_core::types::SessionId,
1640        run_id: meerkat_core::lifecycle::RunId,
1641        appends: Vec<meerkat_core::session::PendingSystemContextAppend>,
1642        boundary: meerkat_core::lifecycle::run_primitive::RunApplyBoundary,
1643        contributing_input_ids: Vec<meerkat_core::lifecycle::InputId>,
1644    ) -> Result<meerkat_core::lifecycle::core_executor::CoreApplyOutput, SessionError> {
1645        self.inner
1646            .apply_runtime_context_appends_with_boundary(
1647                session_id,
1648                run_id,
1649                appends,
1650                boundary,
1651                contributing_input_ids,
1652            )
1653            .await
1654    }
1655    async fn apply_runtime_system_context_for_turn(
1656        &self,
1657        session_id: &meerkat_core::types::SessionId,
1658        appends: Vec<meerkat_core::session::PendingSystemContextAppend>,
1659    ) -> Result<(), SessionError> {
1660        self.inner
1661            .apply_runtime_system_context_for_turn(session_id, appends)
1662            .await
1663    }
1664    async fn discard_live_session(
1665        &self,
1666        session_id: &meerkat_core::types::SessionId,
1667    ) -> Result<(), SessionError> {
1668        self.inner.discard_live_session(session_id).await
1669    }
1670    async fn checkpoint_committed_runtime_session_snapshot(
1671        &self,
1672        session_id: &meerkat_core::types::SessionId,
1673        session_snapshot: &[u8],
1674    ) -> Result<(), SessionError> {
1675        self.inner
1676            .checkpoint_committed_runtime_session_snapshot(session_id, session_snapshot)
1677            .await
1678    }
1679    async fn cancel_all_checkpointers(&self) {
1680        self.inner.cancel_all_checkpointers().await;
1681    }
1682    async fn rearm_all_checkpointers(&self) {
1683        self.inner.rearm_all_checkpointers().await;
1684    }
1685}
1686
/// Specification for bootstrapping a mob runtime from a definition, storage, and session service.
///
/// Build one with [`MobBootstrapSpec::new`] or one of the `ephemeral*` /
/// `persistent*` constructors, then pass it to `MobRuntime::bootstrap`.
pub struct MobBootstrapSpec {
    /// Mob definition; handed to `MobBuilder::new` during bootstrap.
    pub definition: MobDefinition,
    /// Mob storage; handed to `MobBuilder::new` alongside the definition.
    pub storage: MobStorage,
    /// Session service the mob runs on. Every constructor stores it wrapped in
    /// a `PreBuildMobSessionService`.
    pub session_service: Arc<dyn MobSessionService>,
    /// Binary blob store created by the stock constructors; `None` when the
    /// spec was assembled with [`MobBootstrapSpec::new`] alone.
    pub binary_blob_store: Option<Arc<dyn BinaryBlobStore>>,
    /// MCP state returned by `install_agent_mob_tools`; only the stock
    /// constructors populate it.
    pub(crate) agent_mob_mcp_state: Option<Arc<meerkat_mob_mcp::MobMcpState>>,
    /// Retirement overrides returned by `install_agent_mob_tools`; only the
    /// stock constructors populate it.
    pub(crate) implicit_delegate_retirement_overrides: Option<ImplicitDelegateRetirementOverrides>,
    /// Bootstrap options (ephemeral sessions, resume notification, default
    /// LLM client).
    pub options: MobBootstrapOptions,
    /// Explicit runtime adapter — bypasses `session_service.runtime_adapter()`.
    ///
    /// When set, `MobRuntime::bootstrap` publishes this adapter to the mob
    /// builder and only falls back to the session service's adapter when this
    /// field is `None`.
    ///
    /// NOTE(review): `persistent_inner` fills this field while also passing
    /// `Some(runtime_store)` to `PersistentSessionService`; re-confirm the
    /// checkpointer interaction described in
    /// meerkat-session#checkpointer-enabled-flag.
    pub runtime_adapter: Option<Arc<meerkat_runtime::MeerkatMachine>>,
    /// Holds the ephemeral temp directory alive for the lifetime of the spec.
    /// Only populated when the builder creates an ephemeral runtime.
    pub(crate) _ephemeral_dir: Option<Arc<tempfile::TempDir>>,
}
1706
1707impl MobBootstrapSpec {
1708    pub fn new(
1709        definition: MobDefinition,
1710        storage: MobStorage,
1711        session_service: Arc<dyn MobSessionService>,
1712    ) -> Self {
1713        let session_service = Arc::new(PreBuildMobSessionService {
1714            inner: session_service,
1715            hook: no_op_pre_build_hook(),
1716            after_create_hook: None,
1717            runtime_adapter_override: None,
1718        }) as Arc<dyn MobSessionService>;
1719        Self {
1720            definition,
1721            storage,
1722            session_service,
1723            binary_blob_store: None,
1724            agent_mob_mcp_state: None,
1725            implicit_delegate_retirement_overrides: None,
1726            options: MobBootstrapOptions {
1727                allow_ephemeral_sessions: true,
1728                notify_orchestrator_on_resume: true,
1729                default_llm_client: None,
1730            },
1731            runtime_adapter: None,
1732            _ephemeral_dir: None,
1733        }
1734    }
1735
1736    pub fn with_options(mut self, options: MobBootstrapOptions) -> Self {
1737        self.options = options;
1738        self
1739    }
1740
1741    /// Expose a runtime adapter through the session-service facade.
1742    ///
1743    /// Custom embedders that construct their own `MobSessionService` still need
1744    /// MobKit's session-service surface to report the same runtime authority
1745    /// that `MobBuilder::with_runtime_adapter(...)` receives. This keeps
1746    /// autonomous-host comms, runtime inspection, and control paths pointed at
1747    /// one machine without forcing embedders through the stock factory helpers.
1748    pub fn with_session_runtime_adapter(
1749        mut self,
1750        adapter: Arc<meerkat_runtime::MeerkatMachine>,
1751    ) -> Self {
1752        self.session_service = Arc::new(PreBuildMobSessionService {
1753            inner: self.session_service,
1754            hook: no_op_pre_build_hook(),
1755            after_create_hook: None,
1756            runtime_adapter_override: Some(adapter),
1757        });
1758        self
1759    }
1760
1761    /// Wrap the session service with an after-create hook that fires after
1762    /// each successful `create_session`. The hook is best-effort: errors are
1763    /// not propagated. Uses `AfterCreateMobSessionService` which wraps the
1764    /// inner service without a pre-build hook, so any pre-build mutations
1765    /// from inner wrappers are fully reflected in the context.
1766    pub fn with_after_create_hook(mut self, hook: AfterCreateHook) -> Self {
1767        self.session_service = Arc::new(AfterCreateMobSessionService {
1768            inner: self.session_service,
1769            after_hook: hook,
1770        });
1771        self
1772    }
1773
1774    /// Build an ephemeral session service with a correctly wired `AgentFactory`.
1775    ///
1776    /// If `session_store` is provided, it is set on the `FactoryAgentBuilder` so
1777    /// that agents use the given store instead of falling back to JSONL.
1778    pub fn ephemeral(
1779        definition: MobDefinition,
1780        storage: MobStorage,
1781        store_path: PathBuf,
1782        max_sessions: usize,
1783        session_store: Option<Arc<dyn AgentSessionStore>>,
1784    ) -> Self {
1785        Self::ephemeral_inner(
1786            definition,
1787            storage,
1788            store_path,
1789            max_sessions,
1790            session_store,
1791            None,
1792            CapabilityFlags::default(),
1793            None,
1794        )
1795    }
1796
1797    /// Like [`ephemeral`](Self::ephemeral), but with a pre-build hook that is
1798    /// called before each agent is constructed. Use this to inject external
1799    /// tools, augment system prompts, or set per-agent labels.
1800    pub fn ephemeral_with_hook(
1801        definition: MobDefinition,
1802        storage: MobStorage,
1803        store_path: PathBuf,
1804        max_sessions: usize,
1805        session_store: Option<Arc<dyn AgentSessionStore>>,
1806        hook: impl Fn(
1807            &mut CreateSessionRequest,
1808        ) -> std::pin::Pin<
1809            Box<dyn std::future::Future<Output = Result<(), SessionError>> + Send + '_>,
1810        > + Send
1811        + Sync
1812        + 'static,
1813    ) -> Self {
1814        Self::ephemeral_inner(
1815            definition,
1816            storage,
1817            store_path,
1818            max_sessions,
1819            session_store,
1820            Some(Arc::new(hook)),
1821            CapabilityFlags::default(),
1822            None,
1823        )
1824    }
1825
1826    #[allow(clippy::too_many_arguments)]
1827    pub(crate) fn ephemeral_inner(
1828        definition: MobDefinition,
1829        storage: MobStorage,
1830        store_path: PathBuf,
1831        max_sessions: usize,
1832        session_store: Option<Arc<dyn AgentSessionStore>>,
1833        hook: Option<PreBuildHook>,
1834        mut caps: CapabilityFlags,
1835        after_create_hook: Option<AfterCreateHook>,
1836    ) -> Self {
1837        caps.image_generation |= mob_definition_may_use_image_generation(&definition);
1838        let binary_blob_store: Arc<dyn BinaryBlobStore> = Arc::new(ObjectStoreBlobStore::memory());
1839        let blob_store: Arc<dyn meerkat_core::BlobStore> =
1840            Arc::new(Base64BlobStoreAdapter::new(binary_blob_store.clone()));
1841        let runtime_adapter = if caps.image_generation {
1842            let runtime_store: Arc<dyn meerkat_runtime::RuntimeStore> =
1843                Arc::new(meerkat_runtime::InMemoryRuntimeStore::new());
1844            Some(Arc::new(meerkat_runtime::MeerkatMachine::persistent(
1845                runtime_store,
1846                Arc::clone(&blob_store),
1847            )))
1848        } else {
1849            None
1850        };
1851        let mut factory = AgentFactory::new(&store_path)
1852            .builtins(caps.builtins)
1853            .shell(caps.shell)
1854            .mob(caps.mob)
1855            .comms(caps.comms)
1856            .memory(caps.memory);
1857        if let Some(machine) = runtime_adapter.clone() {
1858            factory = factory.with_image_generation_machine(machine);
1859        }
1860        let config = Config::default();
1861        let mut builder = FactoryAgentBuilder::new(factory, config);
1862        builder.default_blob_store = Some(blob_store);
1863        if let Some(store) = session_store {
1864            builder.default_session_store = Some(store);
1865        }
1866        let mob_tools_slot = Arc::clone(&builder.default_mob_tools);
1867        let session_service: Arc<dyn MobSessionService> = Arc::new(
1868            meerkat_session::EphemeralSessionService::new(builder, max_sessions),
1869        );
1870        let hook = hook.unwrap_or_else(no_op_pre_build_hook);
1871        let after_create_hook = if let Some(runtime_adapter) = runtime_adapter.clone() {
1872            let user_after_create_hook = after_create_hook.clone();
1873            Some(Arc::new(
1874                move |session_id: meerkat_core::types::SessionId, ctx: SessionCreatedContext| {
1875                    let runtime_adapter = runtime_adapter.clone();
1876                    let user_after_create_hook = user_after_create_hook.clone();
1877                    Box::pin(async move {
1878                        runtime_adapter.register_session(session_id.clone()).await;
1879                        if let Some(user_after_create_hook) = user_after_create_hook {
1880                            user_after_create_hook(session_id, ctx).await;
1881                        }
1882                    })
1883                        as std::pin::Pin<Box<dyn std::future::Future<Output = ()> + Send>>
1884                },
1885            ) as AfterCreateHook)
1886        } else {
1887            after_create_hook
1888        };
1889        let session_service = Arc::new(PreBuildMobSessionService {
1890            inner: session_service,
1891            hook,
1892            after_create_hook,
1893            runtime_adapter_override: runtime_adapter.clone(),
1894        }) as Arc<dyn MobSessionService>;
1895        let (agent_mob_mcp_state, implicit_delegate_retirement_overrides) =
1896            install_agent_mob_tools(mob_tools_slot, Arc::clone(&session_service));
1897        let mut spec = Self::new(definition, storage, session_service);
1898        spec.agent_mob_mcp_state = Some(agent_mob_mcp_state);
1899        spec.implicit_delegate_retirement_overrides = Some(implicit_delegate_retirement_overrides);
1900        spec.runtime_adapter = runtime_adapter;
1901        spec.binary_blob_store = Some(binary_blob_store);
1902        spec
1903    }
1904
1905    /// Build a persistent session service with a correctly wired `AgentFactory`.
1906    ///
1907    /// The `session_store` is used in two places:
1908    /// 1. As the persistence backend for `PersistentSessionService` (checkpoint/restore).
1909    /// 2. Adapted via `StoreAdapter` and set on `FactoryAgentBuilder.default_session_store`
1910    ///    so that agents use it directly instead of falling back to JSONL.
1911    pub fn persistent(
1912        definition: MobDefinition,
1913        storage: MobStorage,
1914        store_path: PathBuf,
1915        max_sessions: usize,
1916        session_store: Arc<dyn SessionStore>,
1917    ) -> Self {
1918        Self::persistent_inner(
1919            definition,
1920            storage,
1921            store_path,
1922            max_sessions,
1923            session_store,
1924            None,
1925            None,
1926            CapabilityFlags::default(),
1927            None,
1928        )
1929    }
1930
1931    /// Like [`persistent`](Self::persistent), but with a pre-build hook that
1932    /// is called before each agent is constructed. Use this to inject external
1933    /// tools, augment system prompts, or set per-agent labels.
1934    pub fn persistent_with_hook(
1935        definition: MobDefinition,
1936        storage: MobStorage,
1937        store_path: PathBuf,
1938        max_sessions: usize,
1939        session_store: Arc<dyn SessionStore>,
1940        hook: impl Fn(
1941            &mut CreateSessionRequest,
1942        ) -> std::pin::Pin<
1943            Box<dyn std::future::Future<Output = Result<(), SessionError>> + Send + '_>,
1944        > + Send
1945        + Sync
1946        + 'static,
1947    ) -> Self {
1948        Self::persistent_inner(
1949            definition,
1950            storage,
1951            store_path,
1952            max_sessions,
1953            session_store,
1954            None,
1955            Some(Arc::new(hook)),
1956            CapabilityFlags::default(),
1957            None,
1958        )
1959    }
1960
1961    #[allow(clippy::too_many_arguments)]
1962    pub(crate) fn persistent_inner(
1963        definition: MobDefinition,
1964        storage: MobStorage,
1965        store_path: PathBuf,
1966        max_sessions: usize,
1967        session_store: Arc<dyn SessionStore>,
1968        custom_blob_store: Option<Arc<dyn meerkat_core::BlobStore>>,
1969        hook: Option<PreBuildHook>,
1970        mut caps: CapabilityFlags,
1971        after_create_hook: Option<AfterCreateHook>,
1972    ) -> Self {
1973        caps.image_generation |= mob_definition_may_use_image_generation(&definition);
1974        let (binary_blob_store, blob_store): (
1975            Arc<dyn BinaryBlobStore>,
1976            Arc<dyn meerkat_core::BlobStore>,
1977        ) = if let Some(blob_store) = custom_blob_store {
1978            (
1979                Arc::new(BinaryBlobStoreAdapter::new(blob_store.clone())),
1980                blob_store,
1981            )
1982        } else {
1983            let binary_blob_store: Arc<dyn BinaryBlobStore> = match ObjectStoreBlobStore::local(
1984                store_path.join("blobs"),
1985            ) {
1986                Ok(store) => Arc::new(store),
1987                Err(err) => {
1988                    tracing::warn!(
1989                        error = %err,
1990                        "failed to initialize persistent binary blob store; falling back to in-memory blobs"
1991                    );
1992                    Arc::new(ObjectStoreBlobStore::memory())
1993                }
1994            };
1995            let blob_store: Arc<dyn meerkat_core::BlobStore> =
1996                Arc::new(Base64BlobStoreAdapter::new(binary_blob_store.clone()));
1997            (binary_blob_store, blob_store)
1998        };
1999        // Use a SQLite-backed runtime store so we get BOTH durability across
2000        // process restart AND control-op authority (archive/retire). The
2001        // earlier 0.6.1 wiring used `Some(InMemoryRuntimeStore)`, which was
2002        // a half-fix: it kept the session-service's runtime_store path on
2003        // (so `load_authoritative_session` resolved through runtime_store —
2004        // good for control ops), but the in-memory store died on restart so
2005        // resume failed. Switching the in-memory store for a persistent one
2006        // satisfies both. The store lives at `store_path/runtime.sqlite`,
2007        // sibling to whatever path the caller's `session_store` uses.
2008        let runtime_store: Arc<dyn meerkat_runtime::RuntimeStore> =
2009            build_persistent_runtime_store(&store_path);
2010        let runtime_adapter = Arc::new(meerkat_runtime::MeerkatMachine::persistent(
2011            Arc::clone(&runtime_store),
2012            Arc::clone(&blob_store),
2013        ));
2014        let mut factory = AgentFactory::new(&store_path)
2015            .builtins(caps.builtins)
2016            .shell(caps.shell)
2017            .mob(caps.mob)
2018            .comms(caps.comms)
2019            .memory(caps.memory);
2020        if caps.image_generation {
2021            factory = factory.with_image_generation_machine(runtime_adapter.clone());
2022        }
2023        let config = Config::default();
2024        let mut builder = FactoryAgentBuilder::new(factory, config);
2025        builder.default_session_store = Some(Arc::new(StoreAdapter::new(session_store.clone())));
2026        builder.default_blob_store = Some(blob_store.clone());
2027        let mob_tools_slot = Arc::clone(&builder.default_mob_tools);
2028        let session_service: Arc<dyn MobSessionService> =
2029            Arc::new(meerkat_session::PersistentSessionService::new(
2030                builder,
2031                max_sessions,
2032                session_store,
2033                Some(runtime_store),
2034                blob_store,
2035            ));
2036        let hook = hook.unwrap_or_else(no_op_pre_build_hook);
2037        let session_service = Arc::new(PreBuildMobSessionService {
2038            inner: session_service,
2039            hook,
2040            after_create_hook,
2041            runtime_adapter_override: None,
2042        }) as Arc<dyn MobSessionService>;
2043        let (agent_mob_mcp_state, implicit_delegate_retirement_overrides) =
2044            install_agent_mob_tools(mob_tools_slot, Arc::clone(&session_service));
2045        let mut spec = Self::new(definition, storage, session_service);
2046        spec.agent_mob_mcp_state = Some(agent_mob_mcp_state);
2047        spec.implicit_delegate_retirement_overrides = Some(implicit_delegate_retirement_overrides);
2048        spec.runtime_adapter = Some(runtime_adapter);
2049        spec.binary_blob_store = Some(binary_blob_store);
2050        spec
2051    }
2052
2053    #[allow(clippy::too_many_arguments)]
2054    pub(crate) fn ephemeral_runtime_backed_inner(
2055        definition: MobDefinition,
2056        storage: MobStorage,
2057        store_path: PathBuf,
2058        max_sessions: usize,
2059        custom_session_store: Option<Arc<dyn SessionStore>>,
2060        custom_blob_store: Option<Arc<dyn meerkat_core::BlobStore>>,
2061        hook: Option<PreBuildHook>,
2062        mut caps: CapabilityFlags,
2063        after_create_hook: Option<AfterCreateHook>,
2064    ) -> Self {
2065        caps.image_generation |= mob_definition_may_use_image_generation(&definition);
2066        let config = Config::default();
2067        let session_store: Arc<dyn SessionStore> = custom_session_store
2068            .clone()
2069            .unwrap_or_else(|| Arc::new(meerkat_store::MemoryStore::new()));
2070        let (binary_blob_store, blob_store): (
2071            Arc<dyn BinaryBlobStore>,
2072            Arc<dyn meerkat_core::BlobStore>,
2073        ) = if let Some(blob_store) = custom_blob_store {
2074            (
2075                Arc::new(BinaryBlobStoreAdapter::new(blob_store.clone())),
2076                blob_store,
2077            )
2078        } else {
2079            let binary_blob_store: Arc<dyn BinaryBlobStore> =
2080                Arc::new(ObjectStoreBlobStore::memory());
2081            let blob_store: Arc<dyn meerkat_core::BlobStore> =
2082                Arc::new(Base64BlobStoreAdapter::new(binary_blob_store.clone()));
2083            (binary_blob_store, blob_store)
2084        };
2085        // Runtime-backed ephemeral mode keeps the live EphemeralSessionService
2086        // as the comms authority, but registers each created session with the
2087        // same in-memory machine used by image generation. Meerkat 0.6.4's
2088        // persistent runtime-backed create path does not expose member comms
2089        // handles early enough for mob edge reconciliation; this bounded bridge
2090        // preserves live comms while avoiding the old "image tool sees the
2091        // session as destroyed" split-machine bug.
2092        let base_runtime_store: Arc<dyn meerkat_runtime::RuntimeStore> =
2093            Arc::new(meerkat_runtime::InMemoryRuntimeStore::new());
2094        let runtime_store: Arc<dyn meerkat_runtime::RuntimeStore> =
2095            if let Some(custom_session_store) = custom_session_store.clone() {
2096                Arc::new(SessionStoreBackedRuntimeStore::new(
2097                    Arc::clone(&base_runtime_store),
2098                    custom_session_store,
2099                ))
2100            } else {
2101                Arc::clone(&base_runtime_store)
2102            };
2103        let runtime_adapter = Arc::new(meerkat_runtime::MeerkatMachine::persistent(
2104            Arc::clone(&runtime_store),
2105            Arc::clone(&blob_store),
2106        ));
2107        let mut factory = AgentFactory::new(&store_path)
2108            .builtins(caps.builtins)
2109            .shell(caps.shell)
2110            .mob(caps.mob)
2111            .comms(caps.comms)
2112            .memory(caps.memory);
2113        if caps.image_generation {
2114            factory = factory.with_image_generation_machine(runtime_adapter.clone());
2115        }
2116        let mut builder = FactoryAgentBuilder::new(factory, config);
2117        builder.default_session_store = Some(Arc::new(StoreAdapter::new(session_store.clone())));
2118        builder.default_blob_store = Some(blob_store.clone());
2119        let mob_tools_slot = Arc::clone(&builder.default_mob_tools);
2120        let session_service: Arc<dyn MobSessionService> =
2121            if let Some(custom_session_store) = custom_session_store {
2122                Arc::new(meerkat_session::PersistentSessionService::new(
2123                    builder,
2124                    max_sessions,
2125                    custom_session_store,
2126                    Some(runtime_store.clone()),
2127                    blob_store,
2128                ))
2129            } else {
2130                Arc::new(meerkat_session::EphemeralSessionService::new(
2131                    builder,
2132                    max_sessions,
2133                ))
2134            };
2135        let hook = hook.unwrap_or_else(no_op_pre_build_hook);
2136        let runtime_adapter_for_after_create = runtime_adapter.clone();
2137        let combined_after_create_hook: AfterCreateHook = Arc::new(move |session_id, ctx| {
2138            let runtime_adapter = runtime_adapter_for_after_create.clone();
2139            let after_create_hook = after_create_hook.clone();
2140            Box::pin(async move {
2141                runtime_adapter.register_session(session_id.clone()).await;
2142                if let Some(after_create_hook) = after_create_hook {
2143                    after_create_hook(session_id, ctx).await;
2144                }
2145            })
2146        });
2147        let session_service = Arc::new(PreBuildMobSessionService {
2148            inner: session_service,
2149            hook,
2150            after_create_hook: Some(combined_after_create_hook),
2151            runtime_adapter_override: Some(runtime_adapter.clone()),
2152        }) as Arc<dyn MobSessionService>;
2153        let (agent_mob_mcp_state, implicit_delegate_retirement_overrides) =
2154            install_agent_mob_tools(mob_tools_slot, Arc::clone(&session_service));
2155        let mut spec = Self::new(definition, storage, session_service);
2156        spec.agent_mob_mcp_state = Some(agent_mob_mcp_state);
2157        spec.implicit_delegate_retirement_overrides = Some(implicit_delegate_retirement_overrides);
2158        spec.runtime_adapter = Some(runtime_adapter);
2159        spec.binary_blob_store = Some(binary_blob_store);
2160        spec
2161    }
2162}
2163
2164/// Error returned by mob runtime operations.
2165#[derive(Debug)]
2166pub enum MobRuntimeError {
2167    Mob(MobError),
2168    InvalidInput(&'static str),
2169    InvalidConfig(String),
2170}
2171
2172impl std::fmt::Display for MobRuntimeError {
2173    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
2174        match self {
2175            Self::Mob(err) => write!(f, "{err}"),
2176            Self::InvalidInput(message) => write!(f, "{message}"),
2177            Self::InvalidConfig(message) => write!(f, "{message}"),
2178        }
2179    }
2180}
2181
2182impl std::error::Error for MobRuntimeError {}
2183
2184impl From<MobError> for MobRuntimeError {
2185    fn from(value: MobError) -> Self {
2186        Self::Mob(value)
2187    }
2188}
2189
2190// Mobkit's `MobMemberSnapshot`, `MobReconcileReport`, `MobReconcileOptions`
2191// wrapper types were removed as part of the meerkat 0.6 thin-shell cleanup.
2192// Consumers now use `meerkat_mob::runtime::MobMemberListEntry` and
2193// `meerkat_mob::runtime::reconcile::{ReconcileReport, ReconcileOptions,
2194// MemberFilter}` directly.
2195
/// Context delivered to [`SessionHook::after_create`] after a session is
/// successfully created.
#[derive(Debug, Clone)]
pub struct SessionCreatedContext {
    /// Model identifier of the created session.
    pub model: String,
    /// Labels attached to the session, ordered by key.
    pub labels: BTreeMap<String, String>,
    /// System prompt of the session, when one is set.
    pub system_prompt: Option<String>,
}
2204
/// Hook trait for customising session lifecycle.
///
/// Both methods have default bodies that do nothing, so an implementor only
/// overrides the stage it cares about.
///
/// - `before_create` — runs before `create_session`. Returning `Err` aborts
///   session creation (both Rust-native and Python/TS boundary).
/// - `after_create` — runs after session creation succeeds. Best-effort: errors
///   are logged at `warn`, not propagated. The session is already live.
#[async_trait]
pub trait SessionHook: Send + Sync {
    /// Called before session creation. Mutate the request to inject tools,
    /// augment prompts, set labels, override model, etc. Return `Err` to
    /// abort session creation.
    ///
    /// The default implementation leaves the request untouched and returns
    /// `Ok(())`.
    async fn before_create(&self, _req: &mut CreateSessionRequest) -> Result<(), SessionError> {
        Ok(())
    }

    /// Called after a session is successfully created. Best-effort — errors
    /// logged, not propagated.
    ///
    /// The method returns `()`, so an implementation has no channel for
    /// reporting failure to the caller. The default implementation does
    /// nothing.
    async fn after_create(
        &self,
        _session_id: &meerkat_core::types::SessionId,
        _ctx: &SessionCreatedContext,
    ) {
    }
}
2229
/// Capability flags controlling which agent capabilities are enabled.
///
/// The bootstrap constructors pass the first five flags to the matching
/// `AgentFactory` builder calls; `image_generation` decides whether the
/// factory is given an image-generation machine.
#[derive(Debug, Clone, Copy)]
pub struct CapabilityFlags {
    /// Passed to `AgentFactory::builtins`.
    pub builtins: bool,
    /// Passed to `AgentFactory::shell`.
    pub shell: bool,
    /// Passed to `AgentFactory::mob`.
    pub mob: bool,
    /// Passed to `AgentFactory::comms`.
    pub comms: bool,
    /// Passed to `AgentFactory::memory`.
    pub memory: bool,
    /// Enables image-generation wiring; also forced on when the mob
    /// definition may use image generation.
    pub image_generation: bool,
}
2240
2241impl Default for CapabilityFlags {
2242    fn default() -> Self {
2243        Self {
2244            builtins: true,
2245            shell: true,
2246            mob: true,
2247            comms: true,
2248            memory: true,
2249            image_generation: false,
2250        }
2251    }
2252}
2253
/// Backward-compatible alias for [`MobRuntime`].
///
/// The two names refer to the same type, so they are interchangeable.
pub type RealMobRuntime = MobRuntime;
2256
/// Live mob runtime backed by a `MobHandle`.
///
/// Created by `MobRuntime::bootstrap` from a `MobBootstrapSpec`. The type is
/// `Clone`.
#[derive(Clone)]
pub struct MobRuntime {
    /// Handle to the running mob.
    handle: MobHandle,
    /// Session service the mob runs on; optional.
    session_service: Option<Arc<dyn MobSessionService>>,
    /// MCP state for agent mob tools, when the spec carried one.
    agent_mob_mcp_state: Option<Arc<meerkat_mob_mcp::MobMcpState>>,
    /// Implicit-delegate retirement overrides, when the spec carried them.
    implicit_delegate_retirement_overrides: Option<ImplicitDelegateRetirementOverrides>,
    /// Binary blob store, when the spec carried one.
    binary_blob_store: Option<Arc<dyn BinaryBlobStore>>,
    /// Spawn specs kept behind an async `RwLock`.
    /// NOTE(review): presumably the members the runtime was started with —
    /// confirm against the code that fills it.
    baseline_member_specs: Arc<tokio::sync::RwLock<Vec<SpawnMemberSpec>>>,
    /// Keeps the ephemeral temp directory alive for the lifetime of the runtime.
    /// Dropped when the runtime is dropped, cleaning up the temp dir.
    _ephemeral_dir: Option<Arc<tempfile::TempDir>>,
}
2270
2271impl MobRuntime {
2272    pub async fn bootstrap(spec: MobBootstrapSpec) -> Result<Self, MobRuntimeError> {
2273        let ephemeral_dir = spec._ephemeral_dir.clone();
2274        let session_service = spec.session_service.clone();
2275        let binary_blob_store = spec.binary_blob_store.clone();
2276        let mob_id = spec.definition.id.clone();
2277        let agent_mob_mcp_state = spec.agent_mob_mcp_state.clone();
2278        let implicit_delegate_retirement_overrides =
2279            spec.implicit_delegate_retirement_overrides.clone();
2280        let effective_runtime_adapter = spec
2281            .runtime_adapter
2282            .clone()
2283            .or_else(|| session_service.runtime_adapter());
2284
2285        let mut builder = MobBuilder::new(spec.definition, spec.storage);
2286
2287        // MobActor's autonomous readiness/comms-drain path consults the
2288        // builder-published runtime adapter directly. For session services
2289        // that already embed a runtime adapter (definition-based ephemeral
2290        // and persistent-with-runtime-backed-service), forward that adapter
2291        // explicitly so autonomous members do not come up session-backed but
2292        // runtime-unattached.
2293        if let Some(adapter) = effective_runtime_adapter {
2294            builder = builder.with_runtime_adapter(adapter);
2295        }
2296
2297        builder = builder
2298            .with_session_service(session_service.clone())
2299            .allow_ephemeral_sessions(spec.options.allow_ephemeral_sessions)
2300            .notify_orchestrator_on_resume(spec.options.notify_orchestrator_on_resume);
2301
2302        if let Some(client) = spec.options.default_llm_client {
2303            builder = builder.with_default_llm_client(ReplaySanitizingLlmClient::wrap(client));
2304        }
2305
2306        let handle = builder.create().await?;
2307        if let Some(state) = agent_mob_mcp_state.as_ref() {
2308            state.mob_insert_handle(mob_id, handle.clone()).await;
2309        }
2310        Ok(Self {
2311            handle,
2312            session_service: Some(session_service),
2313            agent_mob_mcp_state,
2314            implicit_delegate_retirement_overrides,
2315            binary_blob_store,
2316            baseline_member_specs: Arc::new(tokio::sync::RwLock::new(Vec::new())),
2317            _ephemeral_dir: ephemeral_dir,
2318        })
2319    }
2320
2321    pub fn from_handle(handle: MobHandle) -> Self {
2322        Self {
2323            handle,
2324            session_service: None,
2325            agent_mob_mcp_state: None,
2326            implicit_delegate_retirement_overrides: None,
2327            binary_blob_store: None,
2328            baseline_member_specs: Arc::new(tokio::sync::RwLock::new(Vec::new())),
2329            _ephemeral_dir: None,
2330        }
2331    }
2332
2333    pub fn handle(&self) -> MobHandle {
2334        self.handle.clone()
2335    }
2336
2337    pub fn agent_mob_mcp_state(&self) -> Option<Arc<meerkat_mob_mcp::MobMcpState>> {
2338        self.agent_mob_mcp_state.clone()
2339    }
2340
2341    pub(crate) fn implicit_delegate_retirement_overrides(
2342        &self,
2343    ) -> Option<ImplicitDelegateRetirementOverrides> {
2344        self.implicit_delegate_retirement_overrides.clone()
2345    }
2346
2347    pub async fn set_baseline_member_specs(&self, specs: Vec<SpawnMemberSpec>) {
2348        *self.baseline_member_specs.write().await = specs;
2349    }
2350
2351    pub async fn baseline_member_specs(&self) -> Vec<SpawnMemberSpec> {
2352        self.baseline_member_specs.read().await.clone()
2353    }
2354
2355    pub async fn read_session_history(
2356        &self,
2357        session_id_str: &str,
2358        offset: usize,
2359        limit: Option<usize>,
2360    ) -> Result<SessionHistoryPage, MobRuntimeError> {
2361        if session_id_str.trim().is_empty() {
2362            return Err(MobRuntimeError::InvalidInput(
2363                "session_id must not be empty",
2364            ));
2365        }
2366        let Some(session_service) = self.session_service.as_ref() else {
2367            return Err(MobRuntimeError::InvalidInput(
2368                "session history unavailable for this runtime",
2369            ));
2370        };
2371        let session_id = meerkat_core::types::SessionId::parse(session_id_str)
2372            .map_err(|_| MobRuntimeError::InvalidInput("invalid session_id format"))?;
2373        SessionServiceHistoryExt::read_history(
2374            session_service.as_ref(),
2375            &session_id,
2376            SessionHistoryQuery { offset, limit },
2377        )
2378        .await
2379        .map_err(|err| MobRuntimeError::Mob(MobError::Internal(err.to_string())))
2380    }
2381
2382    #[allow(dead_code)]
2383    pub(crate) async fn runtime_state_for_session(
2384        &self,
2385        session_id_str: &str,
2386    ) -> Result<Option<meerkat_runtime::RuntimeState>, MobRuntimeError> {
2387        if session_id_str.trim().is_empty() {
2388            return Err(MobRuntimeError::InvalidInput(
2389                "session_id must not be empty",
2390            ));
2391        }
2392        let Some(session_service) = self.session_service.as_ref() else {
2393            return Ok(None);
2394        };
2395        let Some(runtime_adapter) = session_service.runtime_adapter() else {
2396            return Ok(None);
2397        };
2398        let session_id = meerkat_core::types::SessionId::parse(session_id_str)
2399            .map_err(|_| MobRuntimeError::InvalidInput("invalid session_id format"))?;
2400        let state = meerkat_runtime::service_ext::SessionServiceRuntimeExt::runtime_state(
2401            runtime_adapter.as_ref(),
2402            &session_id,
2403        )
2404        .await
2405        .map_err(|err| MobRuntimeError::Mob(MobError::Internal(err.to_string())))?;
2406        Ok(Some(state))
2407    }
2408
2409    #[allow(dead_code)]
2410    pub(crate) async fn comms_runtime_for_session(
2411        &self,
2412        session_id_str: &str,
2413    ) -> Result<Option<Arc<dyn CommsRuntime>>, MobRuntimeError> {
2414        if session_id_str.trim().is_empty() {
2415            return Err(MobRuntimeError::InvalidInput(
2416                "session_id must not be empty",
2417            ));
2418        }
2419        let Some(session_service) = self.session_service.as_ref() else {
2420            return Ok(None);
2421        };
2422        let session_id = meerkat_core::types::SessionId::parse(session_id_str)
2423            .map_err(|_| MobRuntimeError::InvalidInput("invalid session_id format"))?;
2424        Ok(
2425            meerkat_core::service::SessionServiceCommsExt::comms_runtime(
2426                session_service.as_ref(),
2427                &session_id,
2428            )
2429            .await,
2430        )
2431    }
2432
2433    #[allow(dead_code)]
2434    pub(crate) async fn active_input_ids_for_session(
2435        &self,
2436        session_id_str: &str,
2437    ) -> Result<Option<Vec<String>>, MobRuntimeError> {
2438        if session_id_str.trim().is_empty() {
2439            return Err(MobRuntimeError::InvalidInput(
2440                "session_id must not be empty",
2441            ));
2442        }
2443        let Some(session_service) = self.session_service.as_ref() else {
2444            return Ok(None);
2445        };
2446        let Some(runtime_adapter) = session_service.runtime_adapter() else {
2447            return Ok(None);
2448        };
2449        let session_id = meerkat_core::types::SessionId::parse(session_id_str)
2450            .map_err(|_| MobRuntimeError::InvalidInput("invalid session_id format"))?;
2451        let input_ids = meerkat_runtime::service_ext::SessionServiceRuntimeExt::list_active_inputs(
2452            runtime_adapter.as_ref(),
2453            &session_id,
2454        )
2455        .await
2456        .map_err(|err| MobRuntimeError::Mob(MobError::Internal(err.to_string())))?;
2457        Ok(Some(
2458            input_ids.into_iter().map(|id| id.to_string()).collect(),
2459        ))
2460    }
2461
2462    #[allow(dead_code)]
2463    pub(crate) async fn ensure_comms_drain_for_session(
2464        &self,
2465        session_id_str: &str,
2466    ) -> Result<Option<bool>, MobRuntimeError> {
2467        if session_id_str.trim().is_empty() {
2468            return Err(MobRuntimeError::InvalidInput(
2469                "session_id must not be empty",
2470            ));
2471        }
2472        let Some(session_service) = self.session_service.as_ref() else {
2473            return Ok(None);
2474        };
2475        let Some(runtime_adapter) = session_service.runtime_adapter() else {
2476            return Ok(None);
2477        };
2478        let session_id = meerkat_core::types::SessionId::parse(session_id_str)
2479            .map_err(|_| MobRuntimeError::InvalidInput("invalid session_id format"))?;
2480        let comms_runtime = meerkat_core::service::SessionServiceCommsExt::comms_runtime(
2481            session_service.as_ref(),
2482            &session_id,
2483        )
2484        .await;
2485        if let Some(comms) = comms_runtime {
2486            let _handle = meerkat_runtime::comms_drain::spawn_comms_drain(
2487                runtime_adapter.clone(),
2488                session_id,
2489                comms,
2490                None,
2491            );
2492            Ok(Some(true))
2493        } else {
2494            Ok(Some(false))
2495        }
2496    }
2497
2498    /// Access the session service this runtime was bootstrapped with, if any.
2499    ///
2500    /// Present for `MobRuntime::bootstrap(...)`-produced runtimes; `None` for
2501    /// `MobRuntime::from_handle(...)`. HTTP handlers that need to read session
2502    /// history reach through this accessor.
2503    pub fn session_service(&self) -> Option<&Arc<dyn MobSessionService>> {
2504        self.session_service.as_ref()
2505    }
2506
2507    pub fn binary_blob_store(&self) -> Option<Arc<dyn BinaryBlobStore>> {
2508        self.binary_blob_store.clone()
2509    }
2510}
2511
2512/// Project a meerkat `MobMemberListEntry` into mobkit's HTTP JSON shape.
2513///
2514/// Aligns with meerkat 0.6's lightweight-roster design: list entries do
2515/// not carry a bridge `session_id`. Callers needing the realtime session
2516/// for a member must use `mobkit/member_status`, which serializes
2517/// `MobMemberSnapshot.current_session_id` natively.
2518pub fn member_entry_to_json(entry: &meerkat_mob::runtime::MobMemberListEntry) -> serde_json::Value {
2519    serde_json::to_value(entry).unwrap_or(serde_json::Value::Null)
2520}
2521
2522pub fn content_input_has_images(content: &meerkat_core::ContentInput) -> bool {
2523    match content {
2524        meerkat_core::ContentInput::Text(_) => false,
2525        meerkat_core::ContentInput::Blocks(blocks) => blocks
2526            .iter()
2527            .any(|block| matches!(block, meerkat_core::ContentBlock::Image { .. })),
2528    }
2529}
2530
2531pub fn model_capabilities_for_model(
2532    provider: Provider,
2533    model: &str,
2534) -> crate::runtime::ConsoleModelCapabilities {
2535    let image_input = meerkat_core::model_profile::profile_for(provider, model)
2536        .map(|profile| profile.vision)
2537        .unwrap_or(false);
2538    crate::runtime::ConsoleModelCapabilities { image_input }
2539}
2540
2541pub fn model_capabilities_for_profile(
2542    profile: &Profile,
2543) -> crate::runtime::ConsoleModelCapabilities {
2544    let image_input = Provider::infer_from_model(&profile.model)
2545        .and_then(|provider| meerkat_core::model_profile::profile_for(provider, &profile.model))
2546        .map(|profile| profile.vision)
2547        .unwrap_or(false);
2548    crate::runtime::ConsoleModelCapabilities { image_input }
2549}
2550
2551pub fn model_capabilities_for_role(
2552    definition: &MobDefinition,
2553    role: &str,
2554) -> crate::runtime::ConsoleModelCapabilities {
2555    let profile_name = ProfileName::from(role);
2556    definition
2557        .resolve_inline_profile(&profile_name)
2558        .map(model_capabilities_for_profile)
2559        .unwrap_or(crate::runtime::ConsoleModelCapabilities { image_input: false })
2560}
2561
2562pub fn model_capabilities_for_member_entry(
2563    definition: &MobDefinition,
2564    entry: &meerkat_mob::runtime::MobMemberListEntry,
2565) -> crate::runtime::ConsoleModelCapabilities {
2566    model_capabilities_for_role(definition, entry.role.as_str())
2567}
2568
2569pub async fn model_capabilities_for_member(
2570    handle: &MobHandle,
2571    session_service: Option<&Arc<dyn MobSessionService>>,
2572    member_id: &meerkat_mob::ids::MeerkatId,
2573) -> crate::runtime::ConsoleModelCapabilities {
2574    if let Some(service) = session_service
2575        && let Some(session_id) = handle.resolve_bridge_session_id(member_id).await
2576        && let Ok(view) = service.read(&session_id).await
2577    {
2578        return model_capabilities_for_model(view.state.provider, &view.state.model);
2579    }
2580
2581    handle
2582        .get_member(member_id)
2583        .await
2584        .map(|member| model_capabilities_for_role(handle.definition(), member.role.as_str()))
2585        .unwrap_or(crate::runtime::ConsoleModelCapabilities { image_input: false })
2586}
2587
2588pub async fn assert_member_accepts_images(
2589    handle: &MobHandle,
2590    session_service: Option<&Arc<dyn MobSessionService>>,
2591    member_id: &str,
2592    content: &meerkat_core::ContentInput,
2593) -> Result<(), MobRuntimeError> {
2594    if !content_input_has_images(content) {
2595        return Ok(());
2596    }
2597    let mid = meerkat_mob::ids::MeerkatId::from(member_id);
2598    let Some(member) = handle.get_member(&mid).await else {
2599        return Err(MobRuntimeError::InvalidInput("member not found"));
2600    };
2601    let caps = model_capabilities_for_member(handle, session_service, &member.agent_identity).await;
2602    if caps.image_input {
2603        Ok(())
2604    } else {
2605        Err(MobRuntimeError::InvalidInput(
2606            "target member model cannot accept image input",
2607        ))
2608    }
2609}
2610
2611/// Send content to a mob member and return the bridge session id that
2612/// accepted the injection.
2613///
2614/// Validates that `member_id` and `content` are non-empty, calls
2615/// `handle.member(&id).send(...)`, then queries the mob handle for the
2616/// currently-bound bridge session id. Meerkat 0.6 removed `session_id` from
2617/// `MemberDeliveryReceipt`; this helper is mobkit's glue for the
2618/// send-and-learn-what-session-took-it pattern used by HTTP/RPC handlers and
2619/// the scheduled-dispatch injection path.
2620pub async fn send_message_on_mob(
2621    handle: &MobHandle,
2622    member_id: &str,
2623    content: impl Into<meerkat_core::ContentInput>,
2624) -> Result<String, MobRuntimeError> {
2625    send_message_on_mob_with_mode(
2626        handle,
2627        member_id,
2628        content,
2629        meerkat_core::types::HandlingMode::Queue,
2630    )
2631    .await
2632}
2633
2634/// Variant that accepts the console's `Queue`/`Steer` wire contract while
2635/// delivering through MobKit's direct member-send path.
2636pub async fn send_message_on_mob_with_mode(
2637    handle: &MobHandle,
2638    member_id: &str,
2639    content: impl Into<meerkat_core::ContentInput>,
2640    handling_mode: meerkat_core::types::HandlingMode,
2641) -> Result<String, MobRuntimeError> {
2642    if member_id.trim().is_empty() {
2643        return Err(MobRuntimeError::InvalidInput("member_id must not be empty"));
2644    }
2645    let content = content.into();
2646    let is_empty = match &content {
2647        meerkat_core::ContentInput::Text(s) => s.trim().is_empty(),
2648        meerkat_core::ContentInput::Blocks(blocks) => blocks.is_empty(),
2649    };
2650    if is_empty {
2651        return Err(MobRuntimeError::InvalidInput("content must not be empty"));
2652    }
2653    let mid = meerkat_mob::ids::MeerkatId::from(member_id);
2654    let _receipt = handle
2655        .member(&mid)
2656        .await?
2657        .send(content, handling_mode)
2658        .await?;
2659    let session_id = handle
2660        .resolve_bridge_session_id(&mid)
2661        .await
2662        .ok_or_else(|| {
2663            MobRuntimeError::Mob(MobError::Internal(
2664                "member has no bridge session after send".to_string(),
2665            ))
2666        })?;
2667    Ok(session_id.to_string())
2668}
2669
2670#[cfg(test)]
2671#[allow(clippy::unwrap_used, clippy::expect_used, clippy::panic)]
2672mod tests {
2673    use super::*;
2674
2675    #[test]
2676    fn delegate_tool_schema_exposes_idle_retire_secs() {
2677        let tool = meerkat_core::types::ToolDef::new(
2678            "delegate",
2679            "Delegate work",
2680            serde_json::json!({
2681                "type": "object",
2682                "properties": {
2683                    "task": {"type": "string"}
2684                },
2685                "required": ["task"]
2686            }),
2687        );
2688
2689        let patched = delegate_tool_def_with_idle_retire_secs(&tool);
2690        let idle_retire_secs = &patched.input_schema["properties"]["idle_retire_secs"];
2691
2692        assert!(patched.description.contains("IDLE RETIREMENT:"));
2693        assert_eq!(idle_retire_secs["anyOf"][0]["type"], "integer");
2694        assert_eq!(idle_retire_secs["anyOf"][0]["minimum"], 0);
2695        assert_eq!(idle_retire_secs["anyOf"][1]["type"], "null");
2696    }
2697
2698    #[test]
2699    fn delegate_idle_retire_secs_arg_is_stripped_and_parsed() {
2700        let mut args = serde_json::json!({
2701            "task": "inspect",
2702            "idle_retire_secs": 42
2703        });
2704
2705        let parsed = delegate_idle_retire_override_from_args("delegate", &mut args)
2706            .expect("valid idle retire arg");
2707
2708        assert_eq!(parsed, Some(DelegateIdleRetireOverride::Seconds(42)));
2709        assert!(args.get("idle_retire_secs").is_none());
2710    }
2711
2712    #[test]
2713    fn delegate_idle_retire_secs_null_disables_member_retirement() {
2714        let mut args = serde_json::json!({
2715            "task": "inspect",
2716            "idle_retire_secs": null
2717        });
2718
2719        let parsed = delegate_idle_retire_override_from_args("delegate", &mut args)
2720            .expect("valid idle retire arg");
2721
2722        assert_eq!(parsed, Some(DelegateIdleRetireOverride::Disabled));
2723        assert!(args.get("idle_retire_secs").is_none());
2724    }
2725
2726    #[test]
2727    fn delegate_idle_retire_secs_omitted_inherits_runtime_default() {
2728        let mut args = serde_json::json!({"task": "inspect"});
2729
2730        let parsed = delegate_idle_retire_override_from_args("delegate", &mut args)
2731            .expect("omitted idle retire arg");
2732
2733        assert_eq!(parsed, None);
2734        assert_eq!(args, serde_json::json!({"task": "inspect"}));
2735    }
2736
2737    #[test]
2738    fn delegate_idle_retire_secs_rejects_negative_or_fractional_values() {
2739        let mut negative = serde_json::json!({"task": "inspect", "idle_retire_secs": -1});
2740        let mut fractional = serde_json::json!({"task": "inspect", "idle_retire_secs": 1.5});
2741
2742        assert!(delegate_idle_retire_override_from_args("delegate", &mut negative).is_err());
2743        assert!(delegate_idle_retire_override_from_args("delegate", &mut fractional).is_err());
2744    }
2745
2746    #[tokio::test]
2747    async fn implicit_delegate_retirement_overrides_round_trip_per_member() {
2748        let overrides = ImplicitDelegateRetirementOverrides::default();
2749
2750        overrides
2751            .set("mob-a", "worker-1", DelegateIdleRetireOverride::Seconds(12))
2752            .await;
2753        overrides
2754            .set("mob-a", "worker-2", DelegateIdleRetireOverride::Disabled)
2755            .await;
2756
2757        assert_eq!(
2758            overrides.get("mob-a", "worker-1").await,
2759            Some(DelegateIdleRetireOverride::Seconds(12))
2760        );
2761        assert_eq!(
2762            overrides.get("mob-a", "worker-2").await,
2763            Some(DelegateIdleRetireOverride::Disabled)
2764        );
2765        assert_eq!(overrides.get("mob-a", "worker-3").await, None);
2766    }
2767
2768    #[test]
2769    fn image_generation_substrate_defaults_off_for_inline_profiles() {
2770        let definition = meerkat_mob::MobDefinition::from_toml(
2771            r#"
2772[mob]
2773id = "test"
2774
2775[profiles.worker]
2776model = "gpt-5.5"
2777
2778[profiles.worker.tools]
2779builtins = true
2780"#,
2781        )
2782        .unwrap_or_else(|e| panic!("{e}"));
2783
2784        assert!(
2785            !mob_definition_may_use_image_generation(&definition),
2786            "inline profiles should not wire the image substrate unless a profile opts in"
2787        );
2788    }
2789
2790    #[test]
2791    fn image_generation_substrate_follows_profile_tool_config() {
2792        let definition = meerkat_mob::MobDefinition::from_toml(
2793            r#"
2794[mob]
2795id = "test"
2796
2797[profiles.commander]
2798model = "gpt-5.5"
2799
2800[profiles.commander.tools]
2801builtins = true
2802image_generation = true
2803
2804[profiles.investigator]
2805model = "gpt-5.5"
2806
2807[profiles.investigator.tools]
2808builtins = true
2809image_generation = false
2810"#,
2811        )
2812        .unwrap_or_else(|e| panic!("{e}"));
2813
2814        let commander = definition.profiles["commander"].as_inline().unwrap();
2815        let investigator = definition.profiles["investigator"].as_inline().unwrap();
2816        assert!(commander.tools.image_generation);
2817        assert!(!investigator.tools.image_generation);
2818        assert!(
2819            mob_definition_may_use_image_generation(&definition),
2820            "one opt-in profile is enough to wire substrate; Meerkat gates visibility per profile"
2821        );
2822    }
2823
2824    #[test]
2825    fn image_generation_profiles_can_disable_builtins_with_meerkat_062() {
2826        let definition = meerkat_mob::MobDefinition::from_toml(
2827            r#"
2828[mob]
2829id = "test"
2830
2831[profiles.commander]
2832model = "gpt-5.5"
2833
2834[profiles.commander.tools]
2835builtins = false
2836image_generation = true
2837"#,
2838        )
2839        .unwrap_or_else(|e| panic!("{e}"));
2840
2841        let commander = definition.profiles["commander"].as_inline().unwrap();
2842        assert!(!commander.tools.builtins);
2843        assert!(commander.tools.image_generation);
2844        assert!(
2845            mob_definition_may_use_image_generation(&definition),
2846            "image generation now has its own Meerkat tool gate"
2847        );
2848    }
2849
2850    #[test]
2851    fn image_generation_substrate_is_conservative_for_realm_profile_refs() {
2852        let definition = meerkat_mob::MobDefinition::from_toml(
2853            r#"
2854[mob]
2855id = "test"
2856
2857[profiles.worker]
2858realm_profile = "worker-v2"
2859"#,
2860        )
2861        .unwrap_or_else(|e| panic!("{e}"));
2862
2863        assert!(
2864            mob_definition_may_use_image_generation(&definition),
2865            "realm profiles resolve at spawn time, so MobKit wires substrate and lets Meerkat enforce profile policy"
2866        );
2867    }
2868
2869    #[test]
2870    fn sanitize_llm_request_drops_replay_unsafe_server_tool_blocks() {
2871        let request = meerkat_client::LlmRequest::new(
2872            "gpt-5.5",
2873            vec![meerkat_core::Message::BlockAssistant(
2874                meerkat_core::BlockAssistantMessage::new(
2875                    vec![
2876                        meerkat_core::AssistantBlock::Text {
2877                            text: "done".to_string(),
2878                            meta: None,
2879                        },
2880                        meerkat_core::AssistantBlock::ServerToolContent {
2881                            id: Some("ws-stream".to_string()),
2882                            name: "web_search".to_string(),
2883                            content: serde_json::json!({
2884                                "type": "response.web_search_call.searching",
2885                                "item_id": "ws_123"
2886                            }),
2887                            meta: None,
2888                        },
2889                        meerkat_core::AssistantBlock::ServerToolContent {
2890                            id: Some("ws_123".to_string()),
2891                            name: "web_search_call".to_string(),
2892                            content: serde_json::json!({
2893                                "type": "web_search_call",
2894                                "id": "ws_123",
2895                                "status": "completed"
2896                            }),
2897                            meta: None,
2898                        },
2899                        meerkat_core::AssistantBlock::ServerToolContent {
2900                            id: None,
2901                            name: "web_search_annotations".to_string(),
2902                            content: serde_json::json!({
2903                                "type": "message_annotations",
2904                                "annotations": []
2905                            }),
2906                            meta: None,
2907                        },
2908                    ],
2909                    meerkat_core::StopReason::EndTurn,
2910                ),
2911            )],
2912        );
2913
2914        let sanitized = sanitize_llm_request_for_stateless_replay(&request);
2915        let meerkat_core::Message::BlockAssistant(assistant) = &sanitized.messages[0] else {
2916            panic!("expected block assistant");
2917        };
2918
2919        assert_eq!(assistant.blocks.len(), 2);
2920        assert!(matches!(
2921            assistant.blocks[0],
2922            meerkat_core::AssistantBlock::Text { .. }
2923        ));
2924        assert!(matches!(
2925            assistant.blocks[1],
2926            meerkat_core::AssistantBlock::ServerToolContent { ref name, .. }
2927                if name == "web_search_call"
2928        ));
2929    }
2930
2931    #[test]
2932    fn sanitize_llm_request_preserves_generated_images_for_meerkat_062() {
2933        let request = meerkat_client::LlmRequest::new(
2934            "gpt-5.5",
2935            vec![meerkat_core::Message::BlockAssistant(
2936                meerkat_core::BlockAssistantMessage::new(
2937                    vec![
2938                        meerkat_core::AssistantBlock::Text {
2939                            text: "visible".to_string(),
2940                            meta: None,
2941                        },
2942                        generated_image_block_for_test(),
2943                    ],
2944                    meerkat_core::StopReason::EndTurn,
2945                ),
2946            )],
2947        );
2948
2949        let sanitized = sanitize_llm_request_for_stateless_replay(&request);
2950
2951        let meerkat_core::Message::BlockAssistant(original_assistant) = &request.messages[0] else {
2952            panic!("expected original block assistant");
2953        };
2954        assert!(
2955            original_assistant
2956                .blocks
2957                .iter()
2958                .any(|block| matches!(block, meerkat_core::AssistantBlock::Image { .. })),
2959            "request-view sanitization must not rewrite canonical caller-owned messages"
2960        );
2961
2962        let meerkat_core::Message::BlockAssistant(sanitized_assistant) = &sanitized.messages[0]
2963        else {
2964            panic!("expected sanitized block assistant");
2965        };
2966        assert!(
2967            sanitized_assistant
2968                .blocks
2969                .iter()
2970                .any(|block| matches!(block, meerkat_core::AssistantBlock::Image { .. })),
2971            "Meerkat 0.6.2 owns provider replay projection for generated images"
2972        );
2973    }
2974
2975    #[derive(Default)]
2976    struct CapturingLlmClient {
2977        projected_messages: std::sync::Mutex<Vec<meerkat_core::Message>>,
2978    }
2979
2980    #[async_trait]
2981    impl LlmClient for CapturingLlmClient {
2982        fn project_replay_messages(
2983            &self,
2984            messages: &[meerkat_core::Message],
2985        ) -> Result<Vec<meerkat_core::Message>, meerkat_client::LlmError> {
2986            *self
2987                .projected_messages
2988                .lock()
2989                .unwrap_or_else(std::sync::PoisonError::into_inner) = messages.to_vec();
2990            Ok(messages.to_vec())
2991        }
2992
2993        fn stream<'a>(&'a self, _request: &'a LlmRequest) -> LlmStream<'a> {
2994            Box::pin(futures::stream::iter([Ok(
2995                meerkat_client::LlmEvent::Done {
2996                    outcome: meerkat_client::LlmDoneOutcome::Success {
2997                        stop_reason: meerkat_core::StopReason::EndTurn,
2998                    },
2999                },
3000            )]))
3001        }
3002
3003        fn provider(&self) -> &'static str {
3004            "openai"
3005        }
3006
3007        async fn health_check(&self) -> Result<(), meerkat_client::LlmError> {
3008            Ok(())
3009        }
3010    }
3011
3012    #[test]
3013    fn replay_sanitizing_llm_client_delegates_provider_projection() {
3014        let capture = Arc::new(CapturingLlmClient::default());
3015        let inner: Arc<dyn LlmClient> = capture.clone();
3016        let wrapped = ReplaySanitizingLlmClient::new(inner);
3017        let messages = vec![meerkat_core::Message::BlockAssistant(
3018            meerkat_core::BlockAssistantMessage::new(
3019                vec![
3020                    meerkat_core::AssistantBlock::Text {
3021                        text: "visible".to_string(),
3022                        meta: None,
3023                    },
3024                    meerkat_core::AssistantBlock::ServerToolContent {
3025                        id: Some("ws-stream".to_string()),
3026                        name: "web_search".to_string(),
3027                        content: serde_json::json!({
3028                            "type": "response.web_search_call.searching",
3029                            "item_id": "ws_123"
3030                        }),
3031                        meta: None,
3032                    },
3033                ],
3034                meerkat_core::StopReason::EndTurn,
3035            ),
3036        )];
3037
3038        let projected = wrapped
3039            .project_replay_messages(&messages)
3040            .expect("wrapped client should delegate provider projection");
3041
3042        let seen = capture
3043            .projected_messages
3044            .lock()
3045            .unwrap_or_else(std::sync::PoisonError::into_inner)
3046            .clone();
3047        let meerkat_core::Message::BlockAssistant(assistant) = &seen[0] else {
3048            panic!("expected block assistant");
3049        };
3050        assert_eq!(
3051            assistant.blocks.len(),
3052            1,
3053            "MobKit sanitization must happen before Meerkat provider projection"
3054        );
3055        assert!(matches!(
3056            assistant.blocks[0],
3057            meerkat_core::AssistantBlock::Text { .. }
3058        ));
3059        assert_eq!(
3060            serde_json::to_value(&projected).expect("projected messages serialize"),
3061            serde_json::to_value(&seen).expect("seen messages serialize")
3062        );
3063    }
3064
    /// Test double that remembers the messages handed to its most recent
    /// `stream_response` call.
    #[derive(Default)]
    struct CapturingAgentLlmClient {
        // Overwritten (not appended to) on every `stream_response` call.
        seen_messages: std::sync::Mutex<Vec<meerkat_core::Message>>,
    }
3069
3070    #[async_trait]
3071    impl meerkat_core::AgentLlmClient for CapturingAgentLlmClient {
3072        async fn stream_response(
3073            &self,
3074            messages: &[meerkat_core::Message],
3075            _tools: &[Arc<meerkat_core::ToolDef>],
3076            _max_tokens: u32,
3077            _temperature: Option<f32>,
3078            _provider_params: Option<
3079                &meerkat_core::lifecycle::run_primitive::ProviderParamsOverride,
3080            >,
3081        ) -> Result<meerkat_core::agent::LlmStreamResult, meerkat_core::AgentError> {
3082            *self
3083                .seen_messages
3084                .lock()
3085                .unwrap_or_else(std::sync::PoisonError::into_inner) = messages.to_vec();
3086            Ok(meerkat_core::agent::LlmStreamResult::new(
3087                Vec::new(),
3088                meerkat_core::StopReason::EndTurn,
3089                meerkat_core::Usage::default(),
3090            ))
3091        }
3092
3093        fn provider(&self) -> &'static str {
3094            "openai"
3095        }
3096
3097        fn model(&self) -> &'static str {
3098            "gpt-5.5"
3099        }
3100    }
3101
3102    #[tokio::test]
3103    async fn sanitize_agent_llm_client_drops_replay_unsafe_server_tool_blocks() {
3104        let capture = Arc::new(CapturingAgentLlmClient::default());
3105        let inner: Arc<dyn meerkat_core::AgentLlmClient> = capture.clone();
3106        let wrapped = ReplaySanitizingAgentLlmClient::wrap(inner);
3107        let messages = vec![meerkat_core::Message::BlockAssistant(
3108            meerkat_core::BlockAssistantMessage::new(
3109                vec![
3110                    meerkat_core::AssistantBlock::Text {
3111                        text: "visible".to_string(),
3112                        meta: None,
3113                    },
3114                    meerkat_core::AssistantBlock::ServerToolContent {
3115                        id: Some("ws-stream".to_string()),
3116                        name: "web_search".to_string(),
3117                        content: serde_json::json!({
3118                            "type": "response.web_search_call.searching",
3119                            "item_id": "ws_123"
3120                        }),
3121                        meta: None,
3122                    },
3123                    meerkat_core::AssistantBlock::ServerToolContent {
3124                        id: Some("ok".to_string()),
3125                        name: "web_search_call".to_string(),
3126                        content: serde_json::json!({
3127                            "type": "web_search_call",
3128                            "id": "ws_123",
3129                            "status": "completed"
3130                        }),
3131                        meta: None,
3132                    },
3133                ],
3134                meerkat_core::StopReason::EndTurn,
3135            ),
3136        )];
3137        let tools: Vec<Arc<meerkat_core::ToolDef>> = Vec::new();
3138
3139        wrapped
3140            .stream_response(&messages, &tools, 512, None, None)
3141            .await
3142            .expect("wrapped client should delegate");
3143
3144        let seen = capture
3145            .seen_messages
3146            .lock()
3147            .unwrap_or_else(std::sync::PoisonError::into_inner)
3148            .clone();
3149        let meerkat_core::Message::BlockAssistant(assistant) = &seen[0] else {
3150            panic!("expected block assistant");
3151        };
3152        assert_eq!(assistant.blocks.len(), 2);
3153        assert!(matches!(
3154            assistant.blocks[0],
3155            meerkat_core::AssistantBlock::Text { .. }
3156        ));
3157        assert!(matches!(
3158            assistant.blocks[1],
3159            meerkat_core::AssistantBlock::ServerToolContent { ref name, .. }
3160                if name == "web_search_call"
3161        ));
3162    }
3163
3164    fn generated_image_block_for_test() -> meerkat_core::AssistantBlock {
3165        serde_json::from_value(serde_json::json!({
3166            "block_type": "image",
3167            "data": {
3168                "image_id": "00000000-0000-0000-0000-000000000051",
3169                "blob_ref": {
3170                    "blob_id": "sha256:test-generated-image",
3171                    "media_type": "image/png"
3172                },
3173                "media_type": "image/png",
3174                "width": 1024,
3175                "height": 1024,
3176                "revised_prompt": { "disposition": "not_requested" },
3177                "meta": { "provider": "not_emitted" }
3178            }
3179        }))
3180        .expect("test image block should deserialize")
3181    }
3182
3183    #[test]
3184    fn sanitize_message_preserves_assistant_image_blocks() {
3185        let message =
3186            meerkat_core::Message::BlockAssistant(meerkat_core::BlockAssistantMessage::new(
3187                vec![
3188                    meerkat_core::AssistantBlock::Text {
3189                        text: "Here is the image.".to_string(),
3190                        meta: None,
3191                    },
3192                    generated_image_block_for_test(),
3193                ],
3194                meerkat_core::StopReason::EndTurn,
3195            ));
3196
3197        let sanitized = sanitize_message_for_stateless_replay(message);
3198        let meerkat_core::Message::BlockAssistant(assistant) = sanitized else {
3199            panic!("expected block assistant");
3200        };
3201
3202        assert_eq!(assistant.blocks.len(), 2);
3203        assert!(matches!(
3204            assistant.blocks[0],
3205            meerkat_core::AssistantBlock::Text { .. }
3206        ));
3207        assert!(
3208            matches!(
3209                assistant.blocks[1],
3210                meerkat_core::AssistantBlock::Image { .. }
3211            ),
3212            "generated image blocks should reach Meerkat's provider projection"
3213        );
3214    }
3215
3216    /// Verify that persistent_with_hook wraps the session service with
3217    /// PreBuildMobSessionService (hook is Some).
3218    #[test]
3219    fn persistent_with_hook_wraps_session_service() {
3220        let dir = tempfile::tempdir().unwrap_or_else(|e| panic!("{e}"));
3221        let store_path = dir.path().to_path_buf();
3222        let Ok(sqlite) = meerkat_store::SqliteSessionStore::open(store_path.join("sessions.db"))
3223        else {
3224            panic!("failed to open sqlite session store");
3225        };
3226        let session_store: Arc<dyn SessionStore> = Arc::new(sqlite);
3227        let Ok(definition) = meerkat_mob::MobDefinition::from_toml("[mob]\nid = \"test\"\n") else {
3228            panic!("failed to parse minimal mob definition");
3229        };
3230
3231        let hook_called = Arc::new(std::sync::atomic::AtomicBool::new(false));
3232        let hook_called_clone = hook_called.clone();
3233
3234        let spec = MobBootstrapSpec::persistent_with_hook(
3235            definition,
3236            meerkat_mob::MobStorage::in_memory(),
3237            store_path.clone(),
3238            4,
3239            session_store,
3240            move |_req: &mut CreateSessionRequest| {
3241                hook_called_clone.store(true, std::sync::atomic::Ordering::Relaxed);
3242                Box::pin(async { Ok(()) })
3243            },
3244        );
3245
3246        // The session service is wired with a SqliteRuntimeStore so that
3247        // both `load_persisted_session` (resume) and
3248        // `load_persisted_session_for_control` (archive/retire) succeed
3249        // across process restart. spec.runtime_adapter is also set
3250        // explicitly so the bootstrap path uses the same store. See
3251        // `persistent_bootstrap_uses_sqlite_runtime_store` for the full
3252        // regression coverage.
3253        assert!(
3254            spec.runtime_adapter.is_some(),
3255            "persistent_with_hook must provide a runtime adapter via spec.runtime_adapter"
3256        );
3257        assert!(
3258            spec.session_service.runtime_adapter().is_some(),
3259            "session service must own a runtime_store so archive/retire don't \
3260             hit the store-only-projection rejection in meerkat-session"
3261        );
3262        assert!(
3263            store_path.join("runtime.sqlite").exists(),
3264            "persistent_inner must open a SqliteRuntimeStore at <store_path>/runtime.sqlite"
3265        );
3266
3267        // The hook isn't called until create_session — verify the wrapper exists
3268        // by checking the service is not the raw PersistentSessionService (it
3269        // wraps it). We can't call create_session without a full LLM stack, but
3270        // we can verify the hook_called flag is false (not prematurely invoked).
3271        assert!(
3272            !hook_called.load(std::sync::atomic::Ordering::Relaxed),
3273            "hook must not be called before create_session"
3274        );
3275    }
3276
3277    /// Verify that ephemeral_with_hook accepts and stores a hook.
3278    #[test]
3279    fn ephemeral_with_hook_creates_spec() {
3280        let dir = tempfile::tempdir().unwrap_or_else(|e| panic!("{e}"));
3281        let store_path = dir.path().to_path_buf();
3282        let Ok(definition) = meerkat_mob::MobDefinition::from_toml("[mob]\nid = \"test\"\n") else {
3283            panic!("failed to parse minimal mob definition");
3284        };
3285
3286        let hook_called = Arc::new(std::sync::atomic::AtomicBool::new(false));
3287        let hook_called_clone = hook_called.clone();
3288
3289        let spec = MobBootstrapSpec::ephemeral_with_hook(
3290            definition,
3291            meerkat_mob::MobStorage::in_memory(),
3292            store_path,
3293            4,
3294            None,
3295            move |_req: &mut CreateSessionRequest| {
3296                hook_called_clone.store(true, std::sync::atomic::Ordering::Relaxed);
3297                Box::pin(async { Ok(()) })
3298            },
3299        );
3300
3301        // Ephemeral specs don't have a runtime adapter.
3302        assert!(spec.runtime_adapter.is_none());
3303
3304        // Hook not yet called.
3305        assert!(
3306            !hook_called.load(std::sync::atomic::Ordering::Relaxed),
3307            "hook must not be called before create_session"
3308        );
3309    }
3310
3311    /// Verify that PreBuildMobSessionService applies the hook to the request
3312    /// in create_session. The hook mutates the model and adds labels; we
3313    /// verify by capturing the state inside the hook itself.
3314    #[tokio::test]
3315    async fn pre_build_hook_mutates_create_session_request() {
3316        use std::sync::Mutex;
3317
3318        let captured = Arc::new(Mutex::new(None::<(String, Option<String>)>));
3319        let captured_clone = captured.clone();
3320
3321        // Build a minimal ephemeral service as the inner.
3322        let dir = tempfile::tempdir().unwrap_or_else(|e| panic!("{e}"));
3323        let factory = AgentFactory::new(dir.path()).builtins(true);
3324        let config = Config::default();
3325        let builder = FactoryAgentBuilder::new(factory, config);
3326        let inner: Arc<dyn MobSessionService> =
3327            Arc::new(meerkat_session::EphemeralSessionService::new(builder, 4));
3328
3329        // Hook that mutates and captures the post-mutation state.
3330        let hook: PreBuildHook = Arc::new(move |req: &mut CreateSessionRequest| {
3331            req.model = "hooked-model".to_string();
3332            req.system_prompt = Some("injected-prompt".to_string());
3333            let labels = req.labels.get_or_insert_with(Default::default);
3334            labels.insert("hook_label".to_string(), "hook_value".to_string());
3335            // Capture to prove the hook ran and mutated the request.
3336            let mut lock = captured_clone
3337                .lock()
3338                .unwrap_or_else(std::sync::PoisonError::into_inner);
3339            *lock = Some((req.model.clone(), req.system_prompt.clone()));
3340            Box::pin(async { Ok(()) })
3341        });
3342        let wrapped = PreBuildMobSessionService {
3343            inner,
3344            hook,
3345            after_create_hook: None,
3346            runtime_adapter_override: None,
3347        };
3348
3349        let req = CreateSessionRequest {
3350            model: "original-model".to_string(),
3351            prompt: meerkat_core::ContentInput::Text("test".to_string()),
3352            render_metadata: None,
3353            system_prompt: None,
3354            max_tokens: None,
3355            event_tx: None,
3356            skill_references: None,
3357            initial_turn: meerkat_core::service::InitialTurnPolicy::Defer,
3358            build: None,
3359            labels: None,
3360            deferred_prompt_policy: meerkat_core::service::DeferredPromptPolicy::default(),
3361        };
3362
3363        // create_session will fail (no LLM) but the hook runs first.
3364        let _ = meerkat_core::service::SessionService::create_session(&wrapped, req).await;
3365
3366        let (model, prompt) = captured
3367            .lock()
3368            .unwrap_or_else(std::sync::PoisonError::into_inner)
3369            .clone()
3370            .expect("hook must have been called");
3371        assert_eq!(model, "hooked-model", "hook must mutate the model");
3372        assert_eq!(
3373            prompt.as_deref(),
3374            Some("injected-prompt"),
3375            "hook must set the system prompt"
3376        );
3377    }
3378
    /// Test double that logs the names of selected calls it receives so a test
    /// can check which ones reached it.
    #[derive(Default)]
    struct ForwardingProbe {
        // Call names in arrival order, appended by `record`.
        calls: Mutex<Vec<&'static str>>,
    }
3383
3384    impl ForwardingProbe {
3385        fn record(&self, call: &'static str) {
3386            self.calls
3387                .lock()
3388                .unwrap_or_else(std::sync::PoisonError::into_inner)
3389                .push(call);
3390        }
3391
3392        fn calls(&self) -> Vec<&'static str> {
3393            self.calls
3394                .lock()
3395                .unwrap_or_else(std::sync::PoisonError::into_inner)
3396                .clone()
3397        }
3398    }
3399
3400    #[async_trait]
3401    impl meerkat_core::service::SessionService for ForwardingProbe {
3402        async fn create_session(
3403            &self,
3404            _req: CreateSessionRequest,
3405        ) -> Result<meerkat_core::types::RunResult, SessionError> {
3406            Err(SessionError::Unsupported("create_session".to_string()))
3407        }
3408
3409        async fn start_turn(
3410            &self,
3411            _id: &meerkat_core::types::SessionId,
3412            _req: meerkat_core::service::StartTurnRequest,
3413        ) -> Result<meerkat_core::types::RunResult, SessionError> {
3414            Err(SessionError::Unsupported("start_turn".to_string()))
3415        }
3416
3417        async fn interrupt(
3418            &self,
3419            _id: &meerkat_core::types::SessionId,
3420        ) -> Result<(), SessionError> {
3421            self.record("interrupt");
3422            Ok(())
3423        }
3424
3425        async fn read(
3426            &self,
3427            id: &meerkat_core::types::SessionId,
3428        ) -> Result<meerkat_core::service::SessionView, SessionError> {
3429            Err(SessionError::NotFound { id: id.clone() })
3430        }
3431
3432        async fn list(
3433            &self,
3434            _query: meerkat_core::service::SessionQuery,
3435        ) -> Result<Vec<meerkat_core::service::SessionSummary>, SessionError> {
3436            Ok(Vec::new())
3437        }
3438
3439        async fn archive(&self, _id: &meerkat_core::types::SessionId) -> Result<(), SessionError> {
3440            self.record("archive");
3441            Ok(())
3442        }
3443    }
3444
    // No overrides: the probe relies on whatever default methods
    // `SessionServiceCommsExt` provides.
    #[async_trait]
    impl meerkat_core::service::SessionServiceCommsExt for ForwardingProbe {}
3447
3448    #[async_trait]
3449    impl meerkat_core::service::SessionServiceControlExt for ForwardingProbe {
3450        async fn append_system_context(
3451            &self,
3452            _id: &meerkat_core::types::SessionId,
3453            _req: meerkat_core::service::AppendSystemContextRequest,
3454        ) -> Result<
3455            meerkat_core::service::AppendSystemContextResult,
3456            meerkat_core::service::SessionControlError,
3457        > {
3458            self.record("append_system_context");
3459            Ok(meerkat_core::service::AppendSystemContextResult {
3460                status: meerkat_core::service::AppendSystemContextStatus::Applied,
3461            })
3462        }
3463
3464        async fn stage_tool_results(
3465            &self,
3466            _id: &meerkat_core::types::SessionId,
3467            _req: meerkat_core::service::StageToolResultsRequest,
3468        ) -> Result<meerkat_core::service::StageToolResultsResult, SessionError> {
3469            self.record("stage_tool_results");
3470            Ok(meerkat_core::service::StageToolResultsResult {
3471                accepted_result_count: 7,
3472            })
3473        }
3474    }
3475
3476    #[async_trait]
3477    impl meerkat_core::service::SessionServiceHistoryExt for ForwardingProbe {
3478        async fn read_history(
3479            &self,
3480            id: &meerkat_core::types::SessionId,
3481            _query: meerkat_core::service::SessionHistoryQuery,
3482        ) -> Result<meerkat_core::service::SessionHistoryPage, SessionError> {
3483            Err(SessionError::NotFound { id: id.clone() })
3484        }
3485    }
3486
3487    #[async_trait]
3488    impl MobSessionService for ForwardingProbe {
3489        fn supports_persistent_sessions(&self) -> bool {
3490            true
3491        }
3492
3493        fn runtime_adapter(&self) -> Option<Arc<meerkat_runtime::MeerkatMachine>> {
3494            Some(Arc::new(meerkat_runtime::MeerkatMachine::ephemeral()))
3495        }
3496
3497        async fn archive_with_mob_lifecycle_authority(
3498            &self,
3499            _session_id: &meerkat_core::types::SessionId,
3500        ) -> Result<(), SessionError> {
3501            self.record("archive_with_mob_lifecycle_authority");
3502            Ok(())
3503        }
3504    }
3505
3506    #[tokio::test]
3507    async fn pre_build_wrapper_forwards_mob_authority_and_control_extensions() {
3508        let probe = Arc::new(ForwardingProbe::default());
3509        let inner: Arc<dyn MobSessionService> = probe.clone();
3510        let wrapped = PreBuildMobSessionService {
3511            inner,
3512            hook: no_op_pre_build_hook(),
3513            after_create_hook: None,
3514            runtime_adapter_override: Some(Arc::new(meerkat_runtime::MeerkatMachine::ephemeral())),
3515        };
3516        let session_id = meerkat_core::types::SessionId::new();
3517
3518        MobSessionService::archive_with_mob_lifecycle_authority(&wrapped, &session_id)
3519            .await
3520            .expect("archive_with_mob_lifecycle_authority should forward to inner service");
3521        let staged = meerkat_core::service::SessionServiceControlExt::stage_tool_results(
3522            &wrapped,
3523            &session_id,
3524            meerkat_core::service::StageToolResultsRequest {
3525                results: Vec::new(),
3526            },
3527        )
3528        .await
3529        .expect("stage_tool_results should forward to inner service");
3530
3531        assert_eq!(staged.accepted_result_count, 7);
3532        assert_eq!(
3533            probe.calls(),
3534            vec!["archive_with_mob_lifecycle_authority", "stage_tool_results",]
3535        );
3536    }
3537
3538    /// Regression for two compounding bugs in the persistent wiring:
3539    ///
3540    /// 1. **0.6.0**: `persistent_inner` handed the
3541    ///    `PersistentSessionService` an `InMemoryRuntimeStore`. With the
3542    ///    runtime_store path active the `StoreCheckpointer` was disabled
3543    ///    (it's gated on `runtime_store.is_none()`), and the in-memory
3544    ///    store didn't survive process restart. Resume raised "missing
3545    ///    durable session snapshot for '<sid>'".
3546    ///
3547    /// 2. **0.6.1**: switching the session service to `runtime_store=None`
3548    ///    re-enabled the checkpointer (fixing #1) but broke archive/retire,
3549    ///    because `load_persisted_session_for_control` rejects mutations
3550    ///    when runtime_store is None and the session exists in the store
3551    ///    (the "store-only compatibility projection" error from
3552    ///    meerkat-session/src/persistent.rs:786).
3553    ///
3554    /// The 0.6.3 fix uses a **persistent** SqliteRuntimeStore — durable
3555    /// across restart AND control-op authoritative — at
3556    /// `<store_path>/runtime.sqlite`.
3557    #[test]
3558    fn persistent_bootstrap_uses_sqlite_runtime_store() {
3559        let dir = tempfile::tempdir().unwrap_or_else(|e| panic!("{e}"));
3560        let store_path = dir.path().to_path_buf();
3561        let Ok(sqlite) = meerkat_store::SqliteSessionStore::open(store_path.join("sessions.db"))
3562        else {
3563            panic!("failed to open sqlite session store");
3564        };
3565        let session_store: Arc<dyn SessionStore> = Arc::new(sqlite);
3566        let Ok(definition) = meerkat_mob::MobDefinition::from_toml("[mob]\nid = \"test\"\n") else {
3567            panic!("failed to parse minimal mob definition");
3568        };
3569        let spec = MobBootstrapSpec::persistent(
3570            definition,
3571            meerkat_mob::MobStorage::in_memory(),
3572            store_path.clone(),
3573            4,
3574            session_store,
3575        );
3576        assert!(
3577            spec.runtime_adapter.is_some(),
3578            "persistent bootstrap must provide its own runtime adapter via spec.runtime_adapter"
3579        );
3580        assert!(
3581            spec.session_service.runtime_adapter().is_some(),
3582            "session service must own a runtime_store so archive/retire don't \
3583             hit the store-only-projection rejection"
3584        );
3585        assert!(
3586            store_path.join("runtime.sqlite").exists(),
3587            "persistent_inner must open a SqliteRuntimeStore at <store_path>/runtime.sqlite"
3588        );
3589    }
3590
3591    /// Ephemeral counterpart: runtime-backed ephemeral builds must use a
3592    /// single in-memory machine authority for session service, comms, and
3593    /// image-generation tooling.
3594    #[test]
3595    fn ephemeral_runtime_backed_uses_session_service_runtime_adapter() {
3596        let dir = tempfile::tempdir().unwrap_or_else(|e| panic!("{e}"));
3597        let store_path = dir.path().to_path_buf();
3598        let Ok(definition) = meerkat_mob::MobDefinition::from_toml("[mob]\nid = \"test\"\n") else {
3599            panic!("failed to parse minimal mob definition");
3600        };
3601        let spec = MobBootstrapSpec::ephemeral_runtime_backed_inner(
3602            definition,
3603            meerkat_mob::MobStorage::in_memory(),
3604            store_path,
3605            4,
3606            None,
3607            None,
3608            None,
3609            CapabilityFlags::default(),
3610            None,
3611        );
3612        assert!(
3613            spec.runtime_adapter.is_some(),
3614            "ephemeral_runtime_backed_inner must expose the shared runtime authority"
3615        );
3616        assert!(
3617            spec.session_service.runtime_adapter().is_some(),
3618            "session service must still expose a runtime adapter so autonomous-host comms can wire"
3619        );
3620    }
3621
3622    #[tokio::test]
3623    async fn ephemeral_runtime_backed_custom_session_store_persists_created_session() {
3624        let dir = tempfile::tempdir().unwrap_or_else(|e| panic!("{e}"));
3625        let store_path = dir.path().to_path_buf();
3626        let Ok(definition) = meerkat_mob::MobDefinition::from_toml(
3627            "[mob]\nid = \"test\"\n\n[profiles.worker]\nmodel = \"gpt-5.5\"\n[profiles.worker.tools]\ncomms = true\n",
3628        ) else {
3629            panic!("failed to parse minimal mob definition");
3630        };
3631        let custom_store: Arc<dyn SessionStore> = Arc::new(meerkat_store::MemoryStore::new());
3632        let mut spec = MobBootstrapSpec::ephemeral_runtime_backed_inner(
3633            definition,
3634            meerkat_mob::MobStorage::in_memory(),
3635            store_path,
3636            4,
3637            Some(custom_store.clone()),
3638            None,
3639            None,
3640            CapabilityFlags::default(),
3641            None,
3642        );
3643        spec.options.default_llm_client = Some(Arc::new(meerkat_client::TestClient::default()));
3644
3645        let runtime = MobRuntime::bootstrap(spec)
3646            .await
3647            .unwrap_or_else(|e| panic!("{e}"));
3648        let mid = meerkat_mob::ids::MeerkatId::from("worker:one");
3649        runtime
3650            .handle
3651            .spawn_spec(SpawnMemberSpec::new(
3652                ProfileName::from("worker"),
3653                mid.clone(),
3654            ))
3655            .await
3656            .unwrap_or_else(|e| panic!("{e}"));
3657        let session_id = runtime
3658            .handle
3659            .resolve_bridge_session_id(&mid)
3660            .await
3661            .unwrap_or_else(|| panic!("spawned worker has no bridge session id"));
3662
3663        let stored = custom_store
3664            .load(&session_id)
3665            .await
3666            .unwrap_or_else(|e| panic!("{e}"));
3667        assert!(
3668            stored.is_some(),
3669            "ephemeral runtime-backed builds with a custom store must persist through that store"
3670        );
3671    }
3672
3673    #[tokio::test]
3674    async fn ephemeral_runtime_backed_custom_session_store_resumes_after_runtime_restart() {
3675        let dir = tempfile::tempdir().unwrap_or_else(|e| panic!("{e}"));
3676        let store_path = dir.path().to_path_buf();
3677        let definition_toml = "[mob]\nid = \"test\"\n\n[profiles.worker]\nmodel = \"gpt-5.5\"\n[profiles.worker.tools]\ncomms = true\n";
3678        let Ok(definition) = meerkat_mob::MobDefinition::from_toml(definition_toml) else {
3679            panic!("failed to parse minimal mob definition");
3680        };
3681        let custom_store: Arc<dyn SessionStore> = Arc::new(meerkat_store::MemoryStore::new());
3682        let mut spec = MobBootstrapSpec::ephemeral_runtime_backed_inner(
3683            definition,
3684            meerkat_mob::MobStorage::in_memory(),
3685            store_path.clone(),
3686            4,
3687            Some(custom_store.clone()),
3688            None,
3689            None,
3690            CapabilityFlags::default(),
3691            None,
3692        );
3693        spec.options.default_llm_client = Some(Arc::new(meerkat_client::TestClient::default()));
3694
3695        let runtime = MobRuntime::bootstrap(spec)
3696            .await
3697            .unwrap_or_else(|e| panic!("{e}"));
3698        let mid = meerkat_mob::ids::MeerkatId::from("worker:one");
3699        runtime
3700            .handle
3701            .spawn_spec(SpawnMemberSpec::new(
3702                ProfileName::from("worker"),
3703                mid.clone(),
3704            ))
3705            .await
3706            .unwrap_or_else(|e| panic!("{e}"));
3707        let session_id = runtime
3708            .handle
3709            .resolve_bridge_session_id(&mid)
3710            .await
3711            .unwrap_or_else(|| panic!("spawned worker has no bridge session id"));
3712        drop(runtime);
3713
3714        let Ok(definition) = meerkat_mob::MobDefinition::from_toml(definition_toml) else {
3715            panic!("failed to parse minimal mob definition");
3716        };
3717        let mut restarted_spec = MobBootstrapSpec::ephemeral_runtime_backed_inner(
3718            definition,
3719            meerkat_mob::MobStorage::in_memory(),
3720            store_path,
3721            4,
3722            Some(custom_store),
3723            None,
3724            None,
3725            CapabilityFlags::default(),
3726            None,
3727        );
3728        restarted_spec.options.default_llm_client =
3729            Some(Arc::new(meerkat_client::TestClient::default()));
3730
3731        let restarted = MobRuntime::bootstrap(restarted_spec)
3732            .await
3733            .unwrap_or_else(|e| panic!("{e}"));
3734        let mut resume_spec = SpawnMemberSpec::new(ProfileName::from("worker"), mid.clone());
3735        resume_spec.launch_mode = meerkat_mob::MemberLaunchMode::Resume {
3736            bridge_session_id: session_id.clone(),
3737        };
3738        restarted
3739            .handle
3740            .spawn_spec(resume_spec)
3741            .await
3742            .unwrap_or_else(|e| panic!("resume should load the external session snapshot: {e}"));
3743
3744        let resumed_session_id = restarted
3745            .handle
3746            .resolve_bridge_session_id(&mid)
3747            .await
3748            .unwrap_or_else(|| panic!("resumed worker has no bridge session id"));
3749        assert_eq!(resumed_session_id, session_id);
3750    }
3751
3752    /// Regression: public ephemeral builds without image generation stay on the
3753    /// lighter direct session-service path.
3754    #[test]
3755    fn ephemeral_bootstrap_without_image_generation_stays_direct() {
3756        let dir = tempfile::tempdir().unwrap_or_else(|e| panic!("{e}"));
3757        let store_path = dir.path().to_path_buf();
3758        let Ok(definition) = meerkat_mob::MobDefinition::from_toml(
3759            "[mob]\nid = \"test\"\n\n[profiles.worker]\nmodel = \"gpt-5.5\"\nruntime_mode = \"autonomous_host\"\n[profiles.worker.tools]\ncomms = true\n",
3760        ) else {
3761            panic!("failed to parse definition");
3762        };
3763        let spec = MobBootstrapSpec::ephemeral_inner(
3764            definition,
3765            meerkat_mob::MobStorage::in_memory(),
3766            store_path,
3767            4,
3768            None,
3769            None,
3770            CapabilityFlags::default(),
3771            None,
3772        );
3773        assert!(
3774            spec.runtime_adapter.is_none(),
3775            "public ephemeral builds only need a runtime adapter when the definition may use image generation"
3776        );
3777    }
3778
3779    /// Regression: public ephemeral image-generation builds must expose the same
3780    /// runtime adapter through the spec and the session service. The generated
3781    /// image tool consults runtime session/image-operation state by session id,
3782    /// so a fresh, tool-only MeerkatMachine cannot be used here.
3783    #[test]
3784    fn ephemeral_bootstrap_with_image_generation_shares_runtime_adapter() {
3785        let dir = tempfile::tempdir().unwrap_or_else(|e| panic!("{e}"));
3786        let store_path = dir.path().to_path_buf();
3787        let Ok(definition) = meerkat_mob::MobDefinition::from_toml(
3788            r#"
3789[mob]
3790id = "test"
3791
3792[profiles.commander]
3793model = "gpt-5.5"
3794
3795[profiles.commander.tools]
3796builtins = true
3797image_generation = true
3798"#,
3799        ) else {
3800            panic!("failed to parse image-generation definition");
3801        };
3802        let spec = MobBootstrapSpec::ephemeral(
3803            definition,
3804            meerkat_mob::MobStorage::in_memory(),
3805            store_path,
3806            4,
3807            None,
3808        );
3809        let spec_adapter = spec
3810            .runtime_adapter
3811            .as_ref()
3812            .expect("image-generation ephemeral builds must expose a runtime adapter");
3813        let service_adapter = spec
3814            .session_service
3815            .runtime_adapter()
3816            .expect("session service must expose the same runtime adapter");
3817        assert!(
3818            spec_adapter.shares_runtime_persistence_with(&service_adapter),
3819            "image-generation tool state and session state must share one runtime authority"
3820        );
3821    }
3822
3823    /// Runtime-owned handling/routing semantics must be stripped before a
3824    /// runtime-applied turn reaches the direct session-service path.
3825    #[test]
3826    fn normalize_runtime_turn_request_strips_runtime_owned_semantics() {
3827        let req = meerkat_core::service::StartTurnRequest {
3828            prompt: meerkat_core::ContentInput::Text("checkpoint".to_string()),
3829            system_prompt: Some("system".to_string()),
3830            event_tx: None,
3831            runtime: meerkat_core::service::StartTurnRuntimeSemantics {
3832                render_metadata: Some(meerkat_core::types::RenderMetadata {
3833                    class: meerkat_core::types::RenderClass::OpsProgress,
3834                    salience: meerkat_core::types::RenderSalience::Urgent,
3835                }),
3836                handling_mode: meerkat_core::types::HandlingMode::Steer,
3837                skill_references: None,
3838                flow_tool_overlay: None,
3839                pre_turn_context_appends: Vec::new(),
3840                typed_turn_appends: Vec::new(),
3841                turn_metadata: None,
3842            },
3843        };
3844
3845        let expected_prompt = req.prompt.clone();
3846        let expected_system_prompt = req.system_prompt.clone();
3847
3848        let normalized = normalize_runtime_turn_request(req);
3849
3850        assert_eq!(
3851            normalized.runtime.handling_mode,
3852            meerkat_core::types::HandlingMode::Queue,
3853            "runtime-applied turns must downgrade Steer before reaching direct session services"
3854        );
3855        assert!(
3856            normalized.runtime.render_metadata.is_none(),
3857            "runtime-owned render metadata must not be forwarded through the direct agent path"
3858        );
3859        assert_eq!(normalized.prompt, expected_prompt);
3860        assert_eq!(normalized.system_prompt, expected_system_prompt);
3861    }
3862
3863    /// SessionCreatedContext must carry model, labels, and optional system_prompt.
3864    #[test]
3865    fn session_created_context_fields() {
3866        let ctx = SessionCreatedContext {
3867            model: "claude-sonnet-4-5".to_string(),
3868            labels: std::collections::BTreeMap::from([(
3869                "agent_type".to_string(),
3870                "lead".to_string(),
3871            )]),
3872            system_prompt: Some("You are a lead agent.".to_string()),
3873        };
3874        assert_eq!(ctx.model, "claude-sonnet-4-5");
3875        assert_eq!(ctx.labels["agent_type"], "lead");
3876        assert_eq!(ctx.system_prompt.as_deref(), Some("You are a lead agent."));
3877    }
3878
3879    /// SessionHook default implementations are no-ops — calling them must not panic.
3880    #[tokio::test]
3881    async fn session_hook_default_impls_are_noop() {
3882        struct EmptyHook;
3883        #[async_trait]
3884        impl SessionHook for EmptyHook {}
3885
3886        let hook = EmptyHook;
3887        let mut req = CreateSessionRequest {
3888            model: "test".to_string(),
3889            prompt: meerkat_core::ContentInput::Text("test".to_string()),
3890            render_metadata: None,
3891            system_prompt: None,
3892            max_tokens: None,
3893            event_tx: None,
3894            skill_references: None,
3895            initial_turn: meerkat_core::service::InitialTurnPolicy::Defer,
3896            build: None,
3897            labels: None,
3898            deferred_prompt_policy: meerkat_core::service::DeferredPromptPolicy::default(),
3899        };
3900        // before_create must succeed with default impl.
3901        hook.before_create(&mut req).await.unwrap();
3902        // after_create must not panic.
3903        let ctx = SessionCreatedContext {
3904            model: "test".to_string(),
3905            labels: Default::default(),
3906            system_prompt: None,
3907        };
3908        hook.after_create(&meerkat_core::types::SessionId::new(), &ctx)
3909            .await;
3910    }
3911
3912    /// before_create returning Err must abort (the caller decides how).
3913    #[tokio::test]
3914    async fn session_hook_before_create_can_abort() {
3915        struct AbortHook;
3916        #[async_trait]
3917        impl SessionHook for AbortHook {
3918            async fn before_create(
3919                &self,
3920                _req: &mut CreateSessionRequest,
3921            ) -> Result<(), SessionError> {
3922                Err(SessionError::Unsupported("hook abort".into()))
3923            }
3924        }
3925
3926        let hook = AbortHook;
3927        let mut req = CreateSessionRequest {
3928            model: "test".to_string(),
3929            prompt: meerkat_core::ContentInput::Text("test".to_string()),
3930            render_metadata: None,
3931            system_prompt: None,
3932            max_tokens: None,
3933            event_tx: None,
3934            skill_references: None,
3935            initial_turn: meerkat_core::service::InitialTurnPolicy::Defer,
3936            build: None,
3937            labels: None,
3938            deferred_prompt_policy: meerkat_core::service::DeferredPromptPolicy::default(),
3939        };
3940        let result = hook.before_create(&mut req).await;
3941        assert!(result.is_err());
3942    }
3943
3944    /// before_create mutations must be visible in the request.
3945    #[tokio::test]
3946    async fn session_hook_before_create_mutates_request() {
3947        struct MutatingHook;
3948        #[async_trait]
3949        impl SessionHook for MutatingHook {
3950            async fn before_create(
3951                &self,
3952                req: &mut CreateSessionRequest,
3953            ) -> Result<(), SessionError> {
3954                req.model = "hook-overridden".to_string();
3955                req.system_prompt = Some("injected by hook".to_string());
3956                Ok(())
3957            }
3958        }
3959
3960        let hook = MutatingHook;
3961        let mut req = CreateSessionRequest {
3962            model: "original".to_string(),
3963            prompt: meerkat_core::ContentInput::Text("test".to_string()),
3964            render_metadata: None,
3965            system_prompt: None,
3966            max_tokens: None,
3967            event_tx: None,
3968            skill_references: None,
3969            initial_turn: meerkat_core::service::InitialTurnPolicy::Defer,
3970            build: None,
3971            labels: None,
3972            deferred_prompt_policy: meerkat_core::service::DeferredPromptPolicy::default(),
3973        };
3974        hook.before_create(&mut req).await.unwrap();
3975        assert_eq!(req.model, "hook-overridden");
3976        assert_eq!(req.system_prompt.as_deref(), Some("injected by hook"));
3977    }
3978}