1use super::*;
2use meerkat_core::Session;
3use meerkat_core::lifecycle::core_executor::CoreApplyOutput;
4use meerkat_core::lifecycle::run_primitive::RunApplyBoundary;
5use meerkat_core::lifecycle::run_receipt::RunBoundaryReceipt;
6use meerkat_core::service::StartTurnRequest;
7use meerkat_core::service::{
8 SessionError, SessionServiceCommsExt, SessionServiceControlExt, SessionServiceHistoryExt,
9};
10use meerkat_core::{InputId, RunId};
11use sha2::{Digest, Sha256};
12use std::collections::HashMap;
13use std::sync::{Mutex, OnceLock, Weak};
14
15fn build_runtime_receipt(
16 run_id: RunId,
17 boundary: RunApplyBoundary,
18 contributing_input_ids: Vec<InputId>,
19 session: &Session,
20) -> Result<RunBoundaryReceipt, SessionError> {
21 let encoded_messages = serde_json::to_vec(session.messages()).map_err(|err| {
22 SessionError::Agent(meerkat_core::error::AgentError::InternalError(format!(
23 "failed to serialize session for runtime receipt digest: {err}"
24 )))
25 })?;
26 Ok(RunBoundaryReceipt {
27 run_id,
28 boundary,
29 contributing_input_ids,
30 conversation_digest: Some(format!("{:x}", Sha256::digest(encoded_messages))),
31 message_count: session.messages().len(),
32 sequence: 0,
33 })
34}
35
36#[cfg(not(target_arch = "wasm32"))]
37fn session_has_persisted_mob_binding(session: &Session, mob_id: &MobId) -> bool {
38 let Some(metadata) = session.session_metadata() else {
39 return false;
40 };
41
42 let Some(comms_name) = metadata.comms_name.as_deref() else {
43 return false;
44 };
45 let mut parts = comms_name.split('/');
46 let Some(name_mob_id) = parts.next().filter(|part| !part.is_empty()) else {
47 return false;
48 };
49 let Some(profile) = parts.next().filter(|part| !part.is_empty()) else {
50 return false;
51 };
52 let Some(meerkat_id) = parts.next().filter(|part| !part.is_empty()) else {
53 return false;
54 };
55 if parts.next().is_some() {
56 return false;
57 }
58 if name_mob_id != mob_id.as_str() {
59 return false;
60 }
61 if metadata.realm_id.as_deref() != Some(&format!("mob:{mob_id}")) {
62 return false;
63 }
64
65 let Some(peer_meta) = metadata.peer_meta.as_ref() else {
66 return false;
67 };
68 peer_meta.labels.get("mob_id").map(String::as_str) == Some(mob_id.as_str())
69 && peer_meta.labels.get("role").map(String::as_str) == Some(profile)
70 && peer_meta.labels.get("meerkat_id").map(String::as_str) == Some(meerkat_id)
71}
72
73fn ephemeral_runtime_adapter_cache()
74-> &'static Mutex<HashMap<usize, Weak<meerkat_runtime::RuntimeSessionAdapter>>> {
75 static CACHE: OnceLock<Mutex<HashMap<usize, Weak<meerkat_runtime::RuntimeSessionAdapter>>>> =
76 OnceLock::new();
77 CACHE.get_or_init(|| Mutex::new(HashMap::new()))
78}
79
80#[cfg(not(target_arch = "wasm32"))]
81fn persistent_runtime_adapter_cache()
82-> &'static Mutex<HashMap<usize, Weak<meerkat_runtime::RuntimeSessionAdapter>>> {
83 static CACHE: OnceLock<Mutex<HashMap<usize, Weak<meerkat_runtime::RuntimeSessionAdapter>>>> =
84 OnceLock::new();
85 CACHE.get_or_init(|| Mutex::new(HashMap::new()))
86}
87
88fn cached_runtime_adapter(
89 cache: &'static Mutex<HashMap<usize, Weak<meerkat_runtime::RuntimeSessionAdapter>>>,
90 key: usize,
91 init: impl FnOnce() -> Arc<meerkat_runtime::RuntimeSessionAdapter>,
92) -> Arc<meerkat_runtime::RuntimeSessionAdapter> {
93 let mut cache = cache
94 .lock()
95 .unwrap_or_else(std::sync::PoisonError::into_inner);
96 cache.retain(|_, adapter| adapter.strong_count() > 0);
97 if let Some(existing) = cache.get(&key).and_then(Weak::upgrade) {
98 return existing;
99 }
100 let adapter = init();
101 cache.insert(key, Arc::downgrade(&adapter));
102 adapter
103}
104
105#[cfg_attr(target_arch = "wasm32", async_trait::async_trait(?Send))]
114#[cfg_attr(not(target_arch = "wasm32"), async_trait::async_trait)]
115pub trait MobSessionService:
116 SessionServiceCommsExt + SessionServiceControlExt + SessionServiceHistoryExt
117{
118 async fn subscribe_session_events(
120 &self,
121 session_id: &SessionId,
122 ) -> Result<EventStream, StreamError> {
123 <Self as SessionService>::subscribe_session_events(self, session_id).await
124 }
125
126 fn supports_persistent_sessions(&self) -> bool {
129 false
130 }
131
132 fn runtime_adapter(&self) -> Option<Arc<meerkat_runtime::RuntimeSessionAdapter>> {
133 None
134 }
135
136 async fn session_belongs_to_mob(&self, _session_id: &SessionId, _mob_id: &MobId) -> bool {
138 false
139 }
140
141 async fn load_persisted_session(
143 &self,
144 _session_id: &SessionId,
145 ) -> Result<Option<Session>, SessionError> {
146 Ok(None)
147 }
148
149 async fn apply_runtime_turn(
150 &self,
151 _session_id: &SessionId,
152 _run_id: RunId,
153 _req: StartTurnRequest,
154 _boundary: RunApplyBoundary,
155 _contributing_input_ids: Vec<InputId>,
156 ) -> Result<CoreApplyOutput, SessionError> {
157 Err(SessionError::Agent(
158 meerkat_core::error::AgentError::InternalError(
159 "runtime-backed apply is unavailable for this session service".into(),
160 ),
161 ))
162 }
163
164 async fn discard_live_session(&self, _session_id: &SessionId) -> Result<(), SessionError> {
165 Ok(())
166 }
167
168 async fn cancel_all_checkpointers(&self) {}
174
175 async fn rearm_all_checkpointers(&self) {}
179}
180
181#[cfg_attr(target_arch = "wasm32", async_trait::async_trait(?Send))]
182#[cfg_attr(not(target_arch = "wasm32"), async_trait::async_trait)]
183impl<B> MobSessionService for meerkat_session::EphemeralSessionService<B>
184where
185 B: meerkat_session::SessionAgentBuilder + 'static,
186{
187 fn supports_persistent_sessions(&self) -> bool {
188 false
189 }
190
191 fn runtime_adapter(&self) -> Option<Arc<meerkat_runtime::RuntimeSessionAdapter>> {
192 let key = std::ptr::from_ref(self) as usize;
193 Some(cached_runtime_adapter(
194 ephemeral_runtime_adapter_cache(),
195 key,
196 || Arc::new(meerkat_runtime::RuntimeSessionAdapter::ephemeral()),
197 ))
198 }
199
200 async fn subscribe_session_events(
201 &self,
202 session_id: &SessionId,
203 ) -> Result<EventStream, StreamError> {
204 meerkat_session::EphemeralSessionService::<B>::subscribe_session_events(self, session_id)
205 .await
206 }
207
208 async fn session_belongs_to_mob(&self, _session_id: &SessionId, _mob_id: &MobId) -> bool {
209 false
210 }
211
212 async fn load_persisted_session(
213 &self,
214 _session_id: &SessionId,
215 ) -> Result<Option<Session>, SessionError> {
216 Ok(None)
217 }
218
219 async fn discard_live_session(&self, session_id: &SessionId) -> Result<(), SessionError> {
220 meerkat_session::EphemeralSessionService::<B>::discard_live_session(self, session_id).await
221 }
222
223 async fn apply_runtime_turn(
224 &self,
225 session_id: &SessionId,
226 run_id: RunId,
227 req: StartTurnRequest,
228 boundary: RunApplyBoundary,
229 contributing_input_ids: Vec<InputId>,
230 ) -> Result<CoreApplyOutput, SessionError> {
231 meerkat_session::EphemeralSessionService::<B>::start_turn(self, session_id, req).await?;
232 let session =
233 meerkat_session::EphemeralSessionService::<B>::export_session(self, session_id).await?;
234 let receipt = build_runtime_receipt(run_id, boundary, contributing_input_ids, &session)?;
235 let session_snapshot = serde_json::to_vec(&session).map_err(|err| {
236 SessionError::Agent(meerkat_core::error::AgentError::InternalError(format!(
237 "failed to serialize session snapshot for runtime commit: {err}"
238 )))
239 })?;
240 Ok(CoreApplyOutput::without_terminal(
241 receipt,
242 Some(session_snapshot),
243 ))
244 }
245}
246
247#[cfg(not(target_arch = "wasm32"))]
248#[async_trait::async_trait]
249impl<B> MobSessionService for meerkat_session::PersistentSessionService<B>
250where
251 B: meerkat_session::SessionAgentBuilder + 'static,
252{
253 fn supports_persistent_sessions(&self) -> bool {
254 true
255 }
256
257 fn runtime_adapter(&self) -> Option<Arc<meerkat_runtime::RuntimeSessionAdapter>> {
258 #[cfg(target_arch = "wasm32")]
259 {
260 None
261 }
262 #[cfg(not(target_arch = "wasm32"))]
263 {
264 let key = std::ptr::from_ref(self) as usize;
265 self.runtime_store().map(|store| {
266 cached_runtime_adapter(persistent_runtime_adapter_cache(), key, || {
267 Arc::new(meerkat_runtime::RuntimeSessionAdapter::persistent(
268 store,
269 self.blob_store(),
270 ))
271 })
272 })
273 }
274 }
275
276 async fn subscribe_session_events(
277 &self,
278 session_id: &SessionId,
279 ) -> Result<EventStream, StreamError> {
280 meerkat_session::PersistentSessionService::<B>::subscribe_session_events(self, session_id)
281 .await
282 }
283
284 async fn session_belongs_to_mob(&self, _session_id: &SessionId, _mob_id: &MobId) -> bool {
285 self.load_persisted_session(_session_id)
286 .await
287 .ok()
288 .flatten()
289 .is_some_and(|session| session_has_persisted_mob_binding(&session, _mob_id))
290 }
291
292 async fn load_persisted_session(
293 &self,
294 session_id: &SessionId,
295 ) -> Result<Option<Session>, SessionError> {
296 meerkat_session::PersistentSessionService::<B>::load_persisted(self, session_id).await
297 }
298
299 async fn discard_live_session(&self, session_id: &SessionId) -> Result<(), SessionError> {
300 meerkat_session::PersistentSessionService::<B>::discard_live_session(self, session_id).await
301 }
302
303 async fn apply_runtime_turn(
304 &self,
305 session_id: &SessionId,
306 run_id: RunId,
307 req: StartTurnRequest,
308 boundary: RunApplyBoundary,
309 contributing_input_ids: Vec<InputId>,
310 ) -> Result<CoreApplyOutput, SessionError> {
311 meerkat_session::PersistentSessionService::<B>::apply_runtime_turn(
312 self,
313 session_id,
314 run_id,
315 req,
316 boundary,
317 contributing_input_ids,
318 )
319 .await
320 }
321
322 async fn cancel_all_checkpointers(&self) {
323 meerkat_session::PersistentSessionService::<B>::cancel_all_checkpointers(self).await;
324 }
325
326 async fn rearm_all_checkpointers(&self) {
327 meerkat_session::PersistentSessionService::<B>::rearm_all_checkpointers(self).await;
328 }
329}