Skip to main content

meerkat_mob/runtime/
session_service.rs

1use super::*;
2use meerkat_core::Session;
3use meerkat_core::lifecycle::core_executor::CoreApplyOutput;
4use meerkat_core::lifecycle::run_primitive::RunApplyBoundary;
5use meerkat_core::lifecycle::run_receipt::RunBoundaryReceipt;
6use meerkat_core::service::StartTurnRequest;
7use meerkat_core::service::{
8    SessionError, SessionServiceCommsExt, SessionServiceControlExt, SessionServiceHistoryExt,
9};
10use meerkat_core::{InputId, RunId};
11use sha2::{Digest, Sha256};
12use std::collections::HashMap;
13use std::sync::{Mutex, OnceLock, Weak};
14
15fn build_runtime_receipt(
16    run_id: RunId,
17    boundary: RunApplyBoundary,
18    contributing_input_ids: Vec<InputId>,
19    session: &Session,
20) -> Result<RunBoundaryReceipt, SessionError> {
21    let encoded_messages = serde_json::to_vec(session.messages()).map_err(|err| {
22        SessionError::Agent(meerkat_core::error::AgentError::InternalError(format!(
23            "failed to serialize session for runtime receipt digest: {err}"
24        )))
25    })?;
26    Ok(RunBoundaryReceipt {
27        run_id,
28        boundary,
29        contributing_input_ids,
30        conversation_digest: Some(format!("{:x}", Sha256::digest(encoded_messages))),
31        message_count: session.messages().len(),
32        sequence: 0,
33    })
34}
35
36#[cfg(not(target_arch = "wasm32"))]
37fn session_has_persisted_mob_binding(session: &Session, mob_id: &MobId) -> bool {
38    let Some(metadata) = session.session_metadata() else {
39        return false;
40    };
41
42    let Some(comms_name) = metadata.comms_name.as_deref() else {
43        return false;
44    };
45    let mut parts = comms_name.split('/');
46    let Some(name_mob_id) = parts.next().filter(|part| !part.is_empty()) else {
47        return false;
48    };
49    let Some(profile) = parts.next().filter(|part| !part.is_empty()) else {
50        return false;
51    };
52    let Some(meerkat_id) = parts.next().filter(|part| !part.is_empty()) else {
53        return false;
54    };
55    if parts.next().is_some() {
56        return false;
57    }
58    if name_mob_id != mob_id.as_str() {
59        return false;
60    }
61    if metadata.realm_id.as_deref() != Some(&format!("mob:{mob_id}")) {
62        return false;
63    }
64
65    let Some(peer_meta) = metadata.peer_meta.as_ref() else {
66        return false;
67    };
68    peer_meta.labels.get("mob_id").map(String::as_str) == Some(mob_id.as_str())
69        && peer_meta.labels.get("role").map(String::as_str) == Some(profile)
70        && peer_meta.labels.get("meerkat_id").map(String::as_str) == Some(meerkat_id)
71}
72
73fn ephemeral_runtime_adapter_cache()
74-> &'static Mutex<HashMap<usize, Weak<meerkat_runtime::RuntimeSessionAdapter>>> {
75    static CACHE: OnceLock<Mutex<HashMap<usize, Weak<meerkat_runtime::RuntimeSessionAdapter>>>> =
76        OnceLock::new();
77    CACHE.get_or_init(|| Mutex::new(HashMap::new()))
78}
79
80#[cfg(not(target_arch = "wasm32"))]
81fn persistent_runtime_adapter_cache()
82-> &'static Mutex<HashMap<usize, Weak<meerkat_runtime::RuntimeSessionAdapter>>> {
83    static CACHE: OnceLock<Mutex<HashMap<usize, Weak<meerkat_runtime::RuntimeSessionAdapter>>>> =
84        OnceLock::new();
85    CACHE.get_or_init(|| Mutex::new(HashMap::new()))
86}
87
88fn cached_runtime_adapter(
89    cache: &'static Mutex<HashMap<usize, Weak<meerkat_runtime::RuntimeSessionAdapter>>>,
90    key: usize,
91    init: impl FnOnce() -> Arc<meerkat_runtime::RuntimeSessionAdapter>,
92) -> Arc<meerkat_runtime::RuntimeSessionAdapter> {
93    let mut cache = cache
94        .lock()
95        .unwrap_or_else(std::sync::PoisonError::into_inner);
96    cache.retain(|_, adapter| adapter.strong_count() > 0);
97    if let Some(existing) = cache.get(&key).and_then(Weak::upgrade) {
98        return existing;
99    }
100    let adapter = init();
101    cache.insert(key, Arc::downgrade(&adapter));
102    adapter
103}
104
105// ---------------------------------------------------------------------------
106// MobSessionService trait extension
107// ---------------------------------------------------------------------------
108
109/// Extension trait for session services used by the mob runtime.
110///
111/// Builds on `SessionServiceCommsExt` from core so mob orchestration can use
112/// comms/injector access without per-crate bridge traits.
113#[cfg_attr(target_arch = "wasm32", async_trait::async_trait(?Send))]
114#[cfg_attr(not(target_arch = "wasm32"), async_trait::async_trait)]
115pub trait MobSessionService:
116    SessionServiceCommsExt + SessionServiceControlExt + SessionServiceHistoryExt
117{
118    /// Subscribe to session-wide events regardless of triggering interaction.
119    async fn subscribe_session_events(
120        &self,
121        session_id: &SessionId,
122    ) -> Result<EventStream, StreamError> {
123        <Self as SessionService>::subscribe_session_events(self, session_id).await
124    }
125
126    /// Whether this service satisfies the persistent-session contract required
127    /// by REQ-MOB-030.
128    fn supports_persistent_sessions(&self) -> bool {
129        false
130    }
131
132    fn runtime_adapter(&self) -> Option<Arc<meerkat_runtime::RuntimeSessionAdapter>> {
133        None
134    }
135
136    /// Whether a listed session belongs to the given mob for reconciliation.
137    async fn session_belongs_to_mob(&self, _session_id: &SessionId, _mob_id: &MobId) -> bool {
138        false
139    }
140
141    /// Load the persisted session snapshot when available.
142    async fn load_persisted_session(
143        &self,
144        _session_id: &SessionId,
145    ) -> Result<Option<Session>, SessionError> {
146        Ok(None)
147    }
148
149    async fn apply_runtime_turn(
150        &self,
151        _session_id: &SessionId,
152        _run_id: RunId,
153        _req: StartTurnRequest,
154        _boundary: RunApplyBoundary,
155        _contributing_input_ids: Vec<InputId>,
156    ) -> Result<CoreApplyOutput, SessionError> {
157        Err(SessionError::Agent(
158            meerkat_core::error::AgentError::InternalError(
159                "runtime-backed apply is unavailable for this session service".into(),
160            ),
161        ))
162    }
163
164    async fn discard_live_session(&self, _session_id: &SessionId) -> Result<(), SessionError> {
165        Ok(())
166    }
167
168    /// Cancel all active checkpointer gates.
169    ///
170    /// After this call in-flight saves complete but subsequent checkpoint
171    /// calls on any session are no-ops. Call during `stop()` to prevent
172    /// checkpoint writes from racing with external cleanup.
173    async fn cancel_all_checkpointers(&self) {}
174
175    /// Re-enable checkpointer gates cancelled by [`cancel_all_checkpointers`].
176    ///
177    /// Call during `resume()` to restore periodic persistence.
178    async fn rearm_all_checkpointers(&self) {}
179}
180
181#[cfg_attr(target_arch = "wasm32", async_trait::async_trait(?Send))]
182#[cfg_attr(not(target_arch = "wasm32"), async_trait::async_trait)]
183impl<B> MobSessionService for meerkat_session::EphemeralSessionService<B>
184where
185    B: meerkat_session::SessionAgentBuilder + 'static,
186{
187    fn supports_persistent_sessions(&self) -> bool {
188        false
189    }
190
191    fn runtime_adapter(&self) -> Option<Arc<meerkat_runtime::RuntimeSessionAdapter>> {
192        let key = std::ptr::from_ref(self) as usize;
193        Some(cached_runtime_adapter(
194            ephemeral_runtime_adapter_cache(),
195            key,
196            || Arc::new(meerkat_runtime::RuntimeSessionAdapter::ephemeral()),
197        ))
198    }
199
200    async fn subscribe_session_events(
201        &self,
202        session_id: &SessionId,
203    ) -> Result<EventStream, StreamError> {
204        meerkat_session::EphemeralSessionService::<B>::subscribe_session_events(self, session_id)
205            .await
206    }
207
208    async fn session_belongs_to_mob(&self, _session_id: &SessionId, _mob_id: &MobId) -> bool {
209        false
210    }
211
212    async fn load_persisted_session(
213        &self,
214        _session_id: &SessionId,
215    ) -> Result<Option<Session>, SessionError> {
216        Ok(None)
217    }
218
219    async fn discard_live_session(&self, session_id: &SessionId) -> Result<(), SessionError> {
220        meerkat_session::EphemeralSessionService::<B>::discard_live_session(self, session_id).await
221    }
222
223    async fn apply_runtime_turn(
224        &self,
225        session_id: &SessionId,
226        run_id: RunId,
227        req: StartTurnRequest,
228        boundary: RunApplyBoundary,
229        contributing_input_ids: Vec<InputId>,
230    ) -> Result<CoreApplyOutput, SessionError> {
231        meerkat_session::EphemeralSessionService::<B>::start_turn(self, session_id, req).await?;
232        let session =
233            meerkat_session::EphemeralSessionService::<B>::export_session(self, session_id).await?;
234        let receipt = build_runtime_receipt(run_id, boundary, contributing_input_ids, &session)?;
235        let session_snapshot = serde_json::to_vec(&session).map_err(|err| {
236            SessionError::Agent(meerkat_core::error::AgentError::InternalError(format!(
237                "failed to serialize session snapshot for runtime commit: {err}"
238            )))
239        })?;
240        Ok(CoreApplyOutput::without_terminal(
241            receipt,
242            Some(session_snapshot),
243        ))
244    }
245}
246
247#[cfg(not(target_arch = "wasm32"))]
248#[async_trait::async_trait]
249impl<B> MobSessionService for meerkat_session::PersistentSessionService<B>
250where
251    B: meerkat_session::SessionAgentBuilder + 'static,
252{
253    fn supports_persistent_sessions(&self) -> bool {
254        true
255    }
256
257    fn runtime_adapter(&self) -> Option<Arc<meerkat_runtime::RuntimeSessionAdapter>> {
258        #[cfg(target_arch = "wasm32")]
259        {
260            None
261        }
262        #[cfg(not(target_arch = "wasm32"))]
263        {
264            let key = std::ptr::from_ref(self) as usize;
265            self.runtime_store().map(|store| {
266                cached_runtime_adapter(persistent_runtime_adapter_cache(), key, || {
267                    Arc::new(meerkat_runtime::RuntimeSessionAdapter::persistent(
268                        store,
269                        self.blob_store(),
270                    ))
271                })
272            })
273        }
274    }
275
276    async fn subscribe_session_events(
277        &self,
278        session_id: &SessionId,
279    ) -> Result<EventStream, StreamError> {
280        meerkat_session::PersistentSessionService::<B>::subscribe_session_events(self, session_id)
281            .await
282    }
283
284    async fn session_belongs_to_mob(&self, _session_id: &SessionId, _mob_id: &MobId) -> bool {
285        self.load_persisted_session(_session_id)
286            .await
287            .ok()
288            .flatten()
289            .is_some_and(|session| session_has_persisted_mob_binding(&session, _mob_id))
290    }
291
292    async fn load_persisted_session(
293        &self,
294        session_id: &SessionId,
295    ) -> Result<Option<Session>, SessionError> {
296        meerkat_session::PersistentSessionService::<B>::load_persisted(self, session_id).await
297    }
298
299    async fn discard_live_session(&self, session_id: &SessionId) -> Result<(), SessionError> {
300        meerkat_session::PersistentSessionService::<B>::discard_live_session(self, session_id).await
301    }
302
303    async fn apply_runtime_turn(
304        &self,
305        session_id: &SessionId,
306        run_id: RunId,
307        req: StartTurnRequest,
308        boundary: RunApplyBoundary,
309        contributing_input_ids: Vec<InputId>,
310    ) -> Result<CoreApplyOutput, SessionError> {
311        meerkat_session::PersistentSessionService::<B>::apply_runtime_turn(
312            self,
313            session_id,
314            run_id,
315            req,
316            boundary,
317            contributing_input_ids,
318        )
319        .await
320    }
321
322    async fn cancel_all_checkpointers(&self) {
323        meerkat_session::PersistentSessionService::<B>::cancel_all_checkpointers(self).await;
324    }
325
326    async fn rearm_all_checkpointers(&self) {
327        meerkat_session::PersistentSessionService::<B>::rearm_all_checkpointers(self).await;
328    }
329}