1use std::collections::HashMap;
7use std::sync::Arc;
8use std::sync::Mutex as StdMutex;
9
10use indexmap::IndexMap;
11use meerkat_core::lifecycle::{InputId, RunBoundaryReceipt, RunId};
12#[cfg(not(target_arch = "wasm32"))]
13use tokio::sync::Mutex;
14#[cfg(target_arch = "wasm32")]
15use tokio_with_wasm::alias::sync::Mutex;
16
17use super::{
18 AuthOAuthFlowSnapshotUpdate, MachineLifecycleCommit, MachineLifecycleSnapshot,
19 MachineLifecycleStoreRecord, RuntimeStore, RuntimeStoreError, SessionDelta,
20};
21use crate::identifiers::LogicalRuntimeId;
22use crate::input_state::{InputStatePersistenceRecord, StoredInputState};
23use crate::ops_lifecycle::PersistedOpsSnapshot;
24
25#[derive(Debug, Clone, PartialEq, Eq, Hash)]
27struct ReceiptKey {
28 runtime_id: String,
29 run_id: RunId,
30 sequence: u64,
31}
32
33#[derive(Debug, Default)]
35struct Inner {
36 input_states: HashMap<String, IndexMap<InputId, StoredInputState>>,
38 receipts: HashMap<ReceiptKey, RunBoundaryReceipt>,
40 sessions: HashMap<String, Vec<u8>>,
42 runtime_lifecycle: HashMap<String, MachineLifecycleSnapshot>,
44 ops_lifecycle_snapshots: HashMap<String, PersistedOpsSnapshot>,
46}
47
48#[derive(Debug, Clone)]
50pub struct InMemoryRuntimeStore {
51 inner: Arc<Mutex<Inner>>,
52 auth_oauth_flow_snapshot: Arc<StdMutex<Option<Vec<u8>>>>,
53}
54
55impl InMemoryRuntimeStore {
56 pub fn new() -> Self {
57 Self {
58 inner: Arc::new(Mutex::new(Inner::default())),
59 auth_oauth_flow_snapshot: Arc::new(StdMutex::new(None)),
60 }
61 }
62}
63
64impl Default for InMemoryRuntimeStore {
65 fn default() -> Self {
66 Self::new()
67 }
68}
69
70fn is_runtime_placeholder_session(session: &meerkat_core::Session) -> bool {
71 session.transcript_history_state().ok().flatten().is_none()
72 && matches!(
73 session.messages(),
74 [] | [meerkat_core::types::Message::System(_)]
75 )
76}
77
78fn deserialize_persisted_session(bytes: &[u8]) -> Result<meerkat_core::Session, RuntimeStoreError> {
84 serde_json::from_slice(bytes).map_err(|err| RuntimeStoreError::ReadFailed(err.to_string()))
85}
86
87#[cfg_attr(not(target_arch = "wasm32"), async_trait::async_trait)]
88#[cfg_attr(target_arch = "wasm32", async_trait::async_trait(?Send))]
89impl RuntimeStore for InMemoryRuntimeStore {
90 fn persist_auth_oauth_flow_snapshot(
91 &self,
92 snapshot_json: &[u8],
93 ) -> Result<(), RuntimeStoreError> {
94 *self
95 .auth_oauth_flow_snapshot
96 .lock()
97 .map_err(|err| RuntimeStoreError::WriteFailed(err.to_string()))? =
98 Some(snapshot_json.to_vec());
99 Ok(())
100 }
101
102 fn load_auth_oauth_flow_snapshot(&self) -> Result<Option<Vec<u8>>, RuntimeStoreError> {
103 self.auth_oauth_flow_snapshot
104 .lock()
105 .map(|snapshot| snapshot.clone())
106 .map_err(|err| RuntimeStoreError::ReadFailed(err.to_string()))
107 }
108
109 fn update_auth_oauth_flow_snapshot(
110 &self,
111 update: &mut AuthOAuthFlowSnapshotUpdate<'_>,
112 ) -> Result<(), RuntimeStoreError> {
113 let mut snapshot = self
114 .auth_oauth_flow_snapshot
115 .lock()
116 .map_err(|err| RuntimeStoreError::WriteFailed(err.to_string()))?;
117 let next = update(snapshot.as_deref())?;
118 *snapshot = Some(next);
119 Ok(())
120 }
121
122 async fn commit_session_snapshot(
123 &self,
124 runtime_id: &LogicalRuntimeId,
125 session_delta: SessionDelta,
126 ) -> Result<(), RuntimeStoreError> {
127 let incoming: meerkat_core::Session =
128 serde_json::from_slice(&session_delta.session_snapshot)
129 .map_err(|err| RuntimeStoreError::WriteFailed(err.to_string()))?;
130 let mut inner = self.inner.lock().await;
131 let previous = inner
132 .sessions
133 .get(&runtime_id.0)
134 .map(|snapshot| deserialize_persisted_session(snapshot))
135 .transpose()?;
136 meerkat_core::session_store::run_boundary_snapshot_save_guard(&incoming, previous.as_ref())
137 .map_err(|err| RuntimeStoreError::WriteFailed(err.to_string()))?;
138 inner
139 .sessions
140 .insert(runtime_id.0.clone(), session_delta.session_snapshot);
141 Ok(())
142 }
143
144 async fn commit_session_transcript_rewrite_snapshot(
145 &self,
146 runtime_id: &LogicalRuntimeId,
147 session_delta: SessionDelta,
148 commit: &meerkat_core::TranscriptRewriteCommit,
149 ) -> Result<(), RuntimeStoreError> {
150 let incoming: meerkat_core::Session =
151 serde_json::from_slice(&session_delta.session_snapshot)
152 .map_err(|err| RuntimeStoreError::WriteFailed(err.to_string()))?;
153 let mut inner = self.inner.lock().await;
154 let previous = inner
155 .sessions
156 .get(&runtime_id.0)
157 .map(|snapshot| deserialize_persisted_session(snapshot))
158 .transpose()?;
159 meerkat_core::session_store::transcript_rewrite_save_guard(
160 &incoming,
161 previous.as_ref(),
162 commit,
163 )
164 .map_err(|err| match err {
165 meerkat_core::SessionStoreError::TranscriptRevisionConflict {
166 expected,
167 actual,
168 ..
169 } => RuntimeStoreError::TranscriptRevisionConflict { expected, actual },
170 other => RuntimeStoreError::WriteFailed(other.to_string()),
171 })?;
172 inner
173 .sessions
174 .insert(runtime_id.0.clone(), session_delta.session_snapshot);
175 Ok(())
176 }
177
178 async fn atomic_apply(
179 &self,
180 runtime_id: &LogicalRuntimeId,
181 session_delta: Option<SessionDelta>,
182 receipt: RunBoundaryReceipt,
183 input_updates: Vec<InputStatePersistenceRecord>,
184 session_store_key: Option<meerkat_core::types::SessionId>,
185 ) -> Result<(), RuntimeStoreError> {
186 let mut inner = self.inner.lock().await;
187
188 let rid = runtime_id.0.clone();
190
191 let mut session_snapshot_superseded = false;
198 if let Some(delta) = session_delta {
199 let incoming_session =
200 serde_json::from_slice::<meerkat_core::Session>(&delta.session_snapshot);
201 let mut persist_session_snapshot = true;
202 match (incoming_session, session_store_key) {
203 (Ok(incoming_session), session_store_key) => {
204 if let Some(session_store_key) = session_store_key
205 && incoming_session.id() != &session_store_key
206 {
207 return Err(RuntimeStoreError::SessionKeyMismatch {
208 expected: session_store_key,
209 actual: incoming_session.id().clone(),
210 });
211 }
212 let previous_session = inner
213 .sessions
214 .get(&rid)
215 .and_then(|snapshot| deserialize_persisted_session(snapshot).ok());
216 if let Err(err) = meerkat_core::session_store::run_boundary_snapshot_save_guard(
217 &incoming_session,
218 previous_session.as_ref(),
219 ) {
220 if previous_session
221 .as_ref()
222 .is_some_and(is_runtime_placeholder_session)
223 {
224 persist_session_snapshot = true;
225 } else if previous_session.as_ref().is_some_and(|previous_session| {
226 meerkat_core::session_store::run_boundary_snapshot_save_guard(
227 previous_session,
228 Some(&incoming_session),
229 )
230 .is_ok()
231 }) {
232 persist_session_snapshot = false;
233 session_snapshot_superseded = true;
234 } else {
235 return Err(RuntimeStoreError::WriteFailed(err.to_string()));
236 }
237 }
238 }
239 (Err(err), Some(session_store_key)) => {
240 return Err(RuntimeStoreError::WriteFailed(format!(
241 "session snapshot for {session_store_key} is not a Session: {err}"
242 )));
243 }
244 (Err(err), None) => {
245 return Err(RuntimeStoreError::WriteFailed(format!(
246 "session snapshot is not a Session: {err}"
247 )));
248 }
249 }
250 if persist_session_snapshot {
251 inner.sessions.insert(rid.clone(), delta.session_snapshot);
252 }
253 }
254
255 if session_snapshot_superseded {
260 return Ok(());
261 }
262
263 let key = ReceiptKey {
265 runtime_id: rid.clone(),
266 run_id: receipt.run_id.clone(),
267 sequence: receipt.sequence,
268 };
269 inner.receipts.insert(key, receipt);
270
271 let states = inner.input_states.entry(rid).or_default();
273 for record in input_updates {
274 let bundle = record.into_stored();
275 states.insert(bundle.state.input_id.clone(), bundle);
276 }
277
278 Ok(())
279 }
280
281 async fn load_input_states(
282 &self,
283 runtime_id: &LogicalRuntimeId,
284 ) -> Result<Vec<StoredInputState>, RuntimeStoreError> {
285 let inner = self.inner.lock().await;
286 let states = inner
287 .input_states
288 .get(&runtime_id.0)
289 .map(|m| m.values().cloned().collect())
290 .unwrap_or_default();
291 Ok(states)
292 }
293
294 async fn load_boundary_receipt(
295 &self,
296 runtime_id: &LogicalRuntimeId,
297 run_id: &RunId,
298 sequence: u64,
299 ) -> Result<Option<RunBoundaryReceipt>, RuntimeStoreError> {
300 let inner = self.inner.lock().await;
301 let key = ReceiptKey {
302 runtime_id: runtime_id.0.clone(),
303 run_id: run_id.clone(),
304 sequence,
305 };
306 Ok(inner.receipts.get(&key).cloned())
307 }
308
309 async fn load_session_snapshot(
310 &self,
311 runtime_id: &LogicalRuntimeId,
312 ) -> Result<Option<Vec<u8>>, RuntimeStoreError> {
313 let inner = self.inner.lock().await;
314 Ok(inner.sessions.get(&runtime_id.0).cloned())
315 }
316
317 async fn clear_session_snapshot(
318 &self,
319 runtime_id: &LogicalRuntimeId,
320 ) -> Result<(), RuntimeStoreError> {
321 let mut inner = self.inner.lock().await;
322 inner.sessions.remove(&runtime_id.0);
323 Ok(())
324 }
325
326 async fn replace_session_snapshot_if_current(
327 &self,
328 runtime_id: &LogicalRuntimeId,
329 expected_current: &[u8],
330 replacement: Vec<u8>,
331 ) -> Result<bool, RuntimeStoreError> {
332 let _: meerkat_core::Session = serde_json::from_slice(&replacement)
333 .map_err(|err| RuntimeStoreError::WriteFailed(err.to_string()))?;
334 let mut inner = self.inner.lock().await;
335 let Some(current) = inner.sessions.get_mut(&runtime_id.0) else {
336 return Ok(false);
337 };
338 if current.as_slice() != expected_current {
339 return Ok(false);
340 }
341 *current = replacement;
342 Ok(true)
343 }
344
345 async fn clear_session_snapshot_if_current(
346 &self,
347 runtime_id: &LogicalRuntimeId,
348 expected_current: &[u8],
349 ) -> Result<bool, RuntimeStoreError> {
350 let mut inner = self.inner.lock().await;
351 let Some(current) = inner.sessions.get(&runtime_id.0) else {
352 return Ok(false);
353 };
354 if current.as_slice() != expected_current {
355 return Ok(false);
356 }
357 inner.sessions.remove(&runtime_id.0);
358 Ok(true)
359 }
360
361 async fn persist_input_state(
362 &self,
363 runtime_id: &LogicalRuntimeId,
364 state: &InputStatePersistenceRecord,
365 ) -> Result<(), RuntimeStoreError> {
366 let mut inner = self.inner.lock().await;
367 let states = inner.input_states.entry(runtime_id.0.clone()).or_default();
368 let bundle = state.as_stored();
369 states.insert(bundle.state.input_id.clone(), bundle.clone());
370 Ok(())
371 }
372
373 async fn load_input_state(
374 &self,
375 runtime_id: &LogicalRuntimeId,
376 input_id: &InputId,
377 ) -> Result<Option<StoredInputState>, RuntimeStoreError> {
378 let inner = self.inner.lock().await;
379 let state = inner
380 .input_states
381 .get(&runtime_id.0)
382 .and_then(|m| m.get(input_id).cloned());
383 Ok(state)
384 }
385
386 async fn load_machine_lifecycle_record(
387 &self,
388 runtime_id: &LogicalRuntimeId,
389 ) -> Result<Option<Vec<u8>>, RuntimeStoreError> {
390 let inner = self.inner.lock().await;
391 inner
392 .runtime_lifecycle
393 .get(&runtime_id.0)
394 .map(|snapshot| MachineLifecycleStoreRecord::from_snapshot(snapshot).encode())
395 .transpose()
396 }
397
398 async fn commit_machine_lifecycle(
399 &self,
400 runtime_id: &LogicalRuntimeId,
401 commit: MachineLifecycleCommit,
402 input_states: &[InputStatePersistenceRecord],
403 ) -> Result<(), RuntimeStoreError> {
404 let mut inner = self.inner.lock().await;
405 let rid = runtime_id.0.clone();
406
407 inner
409 .runtime_lifecycle
410 .insert(rid.clone(), commit.into_snapshot());
411 let states = inner.input_states.entry(rid).or_default();
412 for record in input_states {
413 let bundle = record.as_stored();
414 states.insert(bundle.state.input_id.clone(), bundle.clone());
415 }
416
417 Ok(())
418 }
419
420 async fn persist_ops_lifecycle(
421 &self,
422 runtime_id: &LogicalRuntimeId,
423 snapshot: &PersistedOpsSnapshot,
424 ) -> Result<(), RuntimeStoreError> {
425 let mut inner = self.inner.lock().await;
426 inner
427 .ops_lifecycle_snapshots
428 .insert(runtime_id.0.clone(), snapshot.clone());
429 Ok(())
430 }
431
432 async fn load_ops_lifecycle(
433 &self,
434 runtime_id: &LogicalRuntimeId,
435 ) -> Result<Option<PersistedOpsSnapshot>, RuntimeStoreError> {
436 let inner = self.inner.lock().await;
437 Ok(inner.ops_lifecycle_snapshots.get(&runtime_id.0).cloned())
438 }
439
440 async fn delete_ops_lifecycle(
441 &self,
442 runtime_id: &LogicalRuntimeId,
443 ) -> Result<(), RuntimeStoreError> {
444 let mut inner = self.inner.lock().await;
445 inner.ops_lifecycle_snapshots.remove(&runtime_id.0);
446 Ok(())
447 }
448}
449
450#[cfg(test)]
451#[allow(clippy::unwrap_used)]
452mod tests {
453 use super::*;
454 use crate::store::MachineLifecycleBindingFacts;
455 use meerkat_core::lifecycle::run_primitive::RunApplyBoundary;
456
457 fn make_receipt(run_id: RunId, seq: u64) -> RunBoundaryReceipt {
458 RunBoundaryReceipt {
459 run_id,
460 boundary: RunApplyBoundary::RunStart,
461 contributing_input_ids: vec![],
462 conversation_digest: None,
463 message_count: 0,
464 sequence: seq,
465 }
466 }
467
468 fn persistable(bundle: StoredInputState) -> InputStatePersistenceRecord {
469 InputStatePersistenceRecord::from_machine_snapshot(bundle).unwrap()
470 }
471
472 fn session_with_user(content: &str) -> meerkat_core::Session {
473 let mut session = meerkat_core::Session::new();
474 session.push(meerkat_core::types::Message::User(
475 meerkat_core::types::UserMessage::text(content.to_string()),
476 ));
477 session
478 }
479
480 #[tokio::test]
481 async fn atomic_apply_roundtrip() {
482 let store = InMemoryRuntimeStore::new();
483 let rid = LogicalRuntimeId::new("test-runtime");
484 let run_id = RunId::new();
485 let input_id = InputId::new();
486
487 let bundle = StoredInputState::new_accepted(input_id.clone());
488 let receipt = make_receipt(run_id.clone(), 0);
489
490 let session = session_with_user("hello");
491 let session_snapshot = serde_json::to_vec(&session).unwrap();
492
493 store
494 .atomic_apply(
495 &rid,
496 Some(SessionDelta { session_snapshot }),
497 receipt.clone(),
498 vec![persistable(bundle)],
499 None,
500 )
501 .await
502 .unwrap();
503
504 let states = store.load_input_states(&rid).await.unwrap();
506 assert_eq!(states.len(), 1);
507 assert_eq!(states[0].state.input_id, input_id);
508
509 let loaded = store.load_boundary_receipt(&rid, &run_id, 0).await.unwrap();
511 assert!(loaded.is_some());
512 }
513
514 #[tokio::test]
515 async fn atomic_apply_rejects_non_session_snapshot_without_owner_context() {
516 let store = InMemoryRuntimeStore::new();
517 let rid = LogicalRuntimeId::new("test-runtime");
518 let run_id = RunId::new();
519 let input_id = InputId::new();
520
521 let bundle = StoredInputState::new_accepted(input_id);
522 let receipt = make_receipt(run_id, 0);
523
524 let err = store
527 .atomic_apply(
528 &rid,
529 Some(SessionDelta {
530 session_snapshot: b"session-data".to_vec(),
531 }),
532 receipt,
533 vec![persistable(bundle)],
534 None,
535 )
536 .await
537 .expect_err("non-Session snapshot must be rejected");
538
539 match err {
540 RuntimeStoreError::WriteFailed(message) => {
541 assert!(
542 message.contains("not a Session"),
543 "unexpected WriteFailed message: {message}"
544 );
545 }
546 other => panic!("expected WriteFailed, got {other:?}"),
547 }
548 }
549
550 #[tokio::test]
551 async fn persist_and_load_single_state() {
552 let store = InMemoryRuntimeStore::new();
553 let rid = LogicalRuntimeId::new("test");
554 let input_id = InputId::new();
555 let bundle = StoredInputState::new_accepted(input_id.clone());
556
557 store
558 .persist_input_state(&rid, &persistable(bundle))
559 .await
560 .unwrap();
561
562 let loaded = store.load_input_state(&rid, &input_id).await.unwrap();
563 assert!(loaded.is_some());
564 assert_eq!(loaded.unwrap().state.input_id, input_id);
565 }
566
567 #[tokio::test]
568 async fn load_nonexistent_returns_none() {
569 let store = InMemoryRuntimeStore::new();
570 let rid = LogicalRuntimeId::new("test");
571
572 let states = store.load_input_states(&rid).await.unwrap();
573 assert!(states.is_empty());
574
575 let state = store.load_input_state(&rid, &InputId::new()).await.unwrap();
576 assert!(state.is_none());
577
578 let receipt = store
579 .load_boundary_receipt(&rid, &RunId::new(), 0)
580 .await
581 .unwrap();
582 assert!(receipt.is_none());
583 }
584
585 #[tokio::test]
586 async fn atomic_apply_updates_existing() {
587 let store = InMemoryRuntimeStore::new();
588 let rid = LogicalRuntimeId::new("test");
589 let input_id = InputId::new();
590
591 let bundle1 = StoredInputState::new_accepted(input_id.clone());
593 store
594 .atomic_apply(
595 &rid,
596 None,
597 make_receipt(RunId::new(), 0),
598 vec![persistable(bundle1)],
599 None,
600 )
601 .await
602 .unwrap();
603
604 let mut bundle2 = StoredInputState::new_accepted(input_id.clone());
606 bundle2.seed.phase = crate::input_state::InputLifecycleState::Queued;
607 store
608 .atomic_apply(
609 &rid,
610 None,
611 make_receipt(RunId::new(), 1),
612 vec![persistable(bundle2)],
613 None,
614 )
615 .await
616 .unwrap();
617
618 let states = store.load_input_states(&rid).await.unwrap();
619 assert_eq!(states.len(), 1);
620 assert_eq!(
621 states[0].seed.phase,
622 crate::input_state::InputLifecycleState::Queued
623 );
624 }
625
626 #[tokio::test]
627 async fn atomic_apply_validates_session_store_key_without_aliasing_snapshot() {
628 let store = InMemoryRuntimeStore::new();
629 let rid = LogicalRuntimeId::new("runtime-key");
630 let session = meerkat_core::Session::new();
631 let session_id = session.id().clone();
632 let snapshot = serde_json::to_vec(&session).unwrap();
633
634 store
635 .atomic_apply(
636 &rid,
637 Some(SessionDelta {
638 session_snapshot: snapshot.clone(),
639 }),
640 make_receipt(RunId::new(), 0),
641 vec![],
642 Some(session_id.clone()),
643 )
644 .await
645 .unwrap();
646
647 assert_eq!(
648 store.load_session_snapshot(&rid).await.unwrap(),
649 Some(snapshot)
650 );
651 assert!(
652 store
653 .load_session_snapshot(&LogicalRuntimeId::legacy_session_uuid_alias(&session_id))
654 .await
655 .unwrap()
656 .is_none(),
657 "session_store_key must validate the snapshot identity, not create a raw UUID runtime alias"
658 );
659 }
660
661 #[tokio::test]
662 async fn atomic_apply_rejects_mismatched_session_store_key() {
663 let store = InMemoryRuntimeStore::new();
664 let rid = LogicalRuntimeId::new("runtime-key");
665 let session = meerkat_core::Session::new();
666 let wrong_session_id = meerkat_core::Session::new().id().clone();
667 let snapshot = serde_json::to_vec(&session).unwrap();
668
669 let err = store
670 .atomic_apply(
671 &rid,
672 Some(SessionDelta {
673 session_snapshot: snapshot,
674 }),
675 make_receipt(RunId::new(), 0),
676 vec![],
677 Some(wrong_session_id),
678 )
679 .await
680 .expect_err("mismatched session_store_key should fail");
681
682 assert!(matches!(err, RuntimeStoreError::SessionKeyMismatch { .. }));
683 assert!(store.load_session_snapshot(&rid).await.unwrap().is_none());
684 }
685
686 #[tokio::test]
687 async fn atomic_apply_persists_machine_owned_receipt() {
688 let store = InMemoryRuntimeStore::new();
689 let rid = LogicalRuntimeId::new("test");
690 let run_id = RunId::new();
691 let input_id = InputId::new();
692 let session = meerkat_core::Session::new();
693 let snapshot = serde_json::to_vec(&session).unwrap();
694 let receipt = RunBoundaryReceipt {
695 run_id: run_id.clone(),
696 boundary: RunApplyBoundary::Immediate,
697 contributing_input_ids: vec![input_id.clone()],
698 conversation_digest: Some("machine-owned-digest".to_string()),
699 message_count: 42,
700 sequence: 7,
701 };
702
703 store
704 .atomic_apply(
705 &rid,
706 Some(SessionDelta {
707 session_snapshot: snapshot,
708 }),
709 receipt.clone(),
710 vec![persistable(StoredInputState::new_accepted(input_id))],
711 None,
712 )
713 .await
714 .unwrap();
715
716 assert_eq!(receipt.run_id, run_id);
717 assert!(receipt.conversation_digest.is_some());
718 let loaded = store
719 .load_boundary_receipt(&rid, &receipt.run_id, receipt.sequence)
720 .await
721 .unwrap();
722 assert!(loaded.is_some(), "receipt should be persisted");
723 let Some(loaded) = loaded else {
724 unreachable!("asserted above");
725 };
726 assert_eq!(loaded, receipt);
727 }
728
729 #[tokio::test]
730 async fn multiple_runtimes_isolated() {
731 let store = InMemoryRuntimeStore::new();
732 let rid1 = LogicalRuntimeId::new("runtime-1");
733 let rid2 = LogicalRuntimeId::new("runtime-2");
734
735 store
736 .persist_input_state(
737 &rid1,
738 &persistable(StoredInputState::new_accepted(InputId::new())),
739 )
740 .await
741 .unwrap();
742 store
743 .persist_input_state(
744 &rid2,
745 &persistable(StoredInputState::new_accepted(InputId::new())),
746 )
747 .await
748 .unwrap();
749 store
750 .persist_input_state(
751 &rid2,
752 &persistable(StoredInputState::new_accepted(InputId::new())),
753 )
754 .await
755 .unwrap();
756
757 let s1 = store.load_input_states(&rid1).await.unwrap();
758 let s2 = store.load_input_states(&rid2).await.unwrap();
759 assert_eq!(s1.len(), 1);
760 assert_eq!(s2.len(), 2);
761 }
762
763 #[tokio::test]
764 async fn load_session_snapshot_roundtrip() {
765 let store = InMemoryRuntimeStore::new();
766 let rid = LogicalRuntimeId::new("runtime");
767 let snapshot = serde_json::to_vec(&meerkat_core::Session::new()).unwrap();
768
769 store
770 .atomic_apply(
771 &rid,
772 Some(SessionDelta {
773 session_snapshot: snapshot.clone(),
774 }),
775 make_receipt(RunId::new(), 0),
776 vec![],
777 None,
778 )
779 .await
780 .unwrap();
781
782 let loaded = store.load_session_snapshot(&rid).await.unwrap();
783 assert_eq!(loaded, Some(snapshot));
784 }
785
786 #[tokio::test]
787 async fn commit_session_snapshot_rejects_stale_runtime_parent() {
788 let store = InMemoryRuntimeStore::new();
789 let rid = LogicalRuntimeId::new("runtime-stale-parent");
790 let accepted = session_with_user("accepted runtime turn");
791 let mut stale = meerkat_core::Session::with_id(accepted.id().clone());
792 stale.push(meerkat_core::types::Message::User(
793 meerkat_core::types::UserMessage::text("stale runtime turn".to_string()),
794 ));
795 let accepted_snapshot = serde_json::to_vec(&accepted).unwrap();
796
797 store
798 .commit_session_snapshot(
799 &rid,
800 SessionDelta {
801 session_snapshot: accepted_snapshot.clone(),
802 },
803 )
804 .await
805 .unwrap();
806
807 let err = store
808 .commit_session_snapshot(
809 &rid,
810 SessionDelta {
811 session_snapshot: serde_json::to_vec(&stale).unwrap(),
812 },
813 )
814 .await
815 .expect_err("stale non-continuation must not overwrite runtime snapshot");
816
817 assert!(matches!(err, RuntimeStoreError::WriteFailed(_)));
818 assert_eq!(
819 store.load_session_snapshot(&rid).await.unwrap(),
820 Some(accepted_snapshot)
821 );
822 }
823
824 #[tokio::test]
825 async fn atomic_apply_keeps_current_snapshot_when_incoming_is_superseded() {
826 let store = InMemoryRuntimeStore::new();
827 let rid = LogicalRuntimeId::new("runtime-superseded-terminal");
828 let incoming = session_with_user("turn input");
829 let mut current = incoming.clone();
830 current.push(meerkat_core::types::Message::BlockAssistant(
831 meerkat_core::types::BlockAssistantMessage {
832 blocks: vec![meerkat_core::types::AssistantBlock::Text {
833 text: "peer response already applied".to_string(),
834 meta: None,
835 }],
836 stop_reason: meerkat_core::types::StopReason::EndTurn,
837 created_at: meerkat_core::types::message_timestamp_now(),
838 },
839 ));
840 let current_snapshot = serde_json::to_vec(¤t).unwrap();
841 let receipt = make_receipt(RunId::new(), 11);
842
843 store
844 .commit_session_snapshot(
845 &rid,
846 SessionDelta {
847 session_snapshot: current_snapshot.clone(),
848 },
849 )
850 .await
851 .unwrap();
852
853 store
854 .atomic_apply(
855 &rid,
856 Some(SessionDelta {
857 session_snapshot: serde_json::to_vec(&incoming).unwrap(),
858 }),
859 receipt.clone(),
860 vec![],
861 Some(incoming.id().clone()),
862 )
863 .await
864 .unwrap();
865
866 assert_eq!(
867 store.load_session_snapshot(&rid).await.unwrap(),
868 Some(current_snapshot)
869 );
870 assert_eq!(
874 store
875 .load_boundary_receipt(&rid, &receipt.run_id, receipt.sequence)
876 .await
877 .unwrap(),
878 None
879 );
880 }
881
882 #[tokio::test]
883 async fn atomic_apply_skips_inputs_when_session_snapshot_superseded() {
884 let store = InMemoryRuntimeStore::new();
885 let rid = LogicalRuntimeId::new("runtime-superseded-inputs");
886 let incoming = session_with_user("turn input");
887 let mut current = incoming.clone();
888 current.push(meerkat_core::types::Message::BlockAssistant(
889 meerkat_core::types::BlockAssistantMessage {
890 blocks: vec![meerkat_core::types::AssistantBlock::Text {
891 text: "peer response already applied".to_string(),
892 meta: None,
893 }],
894 stop_reason: meerkat_core::types::StopReason::EndTurn,
895 created_at: meerkat_core::types::message_timestamp_now(),
896 },
897 ));
898 let current_snapshot = serde_json::to_vec(¤t).unwrap();
899 let receipt = make_receipt(RunId::new(), 21);
900 let input_id = InputId::new();
901 let bundle = StoredInputState::new_accepted(input_id.clone());
902
903 store
904 .commit_session_snapshot(
905 &rid,
906 SessionDelta {
907 session_snapshot: current_snapshot.clone(),
908 },
909 )
910 .await
911 .unwrap();
912
913 store
914 .atomic_apply(
915 &rid,
916 Some(SessionDelta {
917 session_snapshot: serde_json::to_vec(&incoming).unwrap(),
918 }),
919 receipt.clone(),
920 vec![persistable(bundle)],
921 Some(incoming.id().clone()),
922 )
923 .await
924 .unwrap();
925
926 assert_eq!(
928 store.load_session_snapshot(&rid).await.unwrap(),
929 Some(current_snapshot)
930 );
931 assert_eq!(
932 store
933 .load_boundary_receipt(&rid, &receipt.run_id, receipt.sequence)
934 .await
935 .unwrap(),
936 None
937 );
938 assert!(store.load_input_states(&rid).await.unwrap().is_empty());
939 }
940
941 #[tokio::test]
942 async fn atomic_apply_allows_first_generated_snapshot_after_placeholder() {
943 let store = InMemoryRuntimeStore::new();
944 let rid = LogicalRuntimeId::new("runtime-placeholder");
945 let mut placeholder = meerkat_core::Session::new();
946 placeholder.set_system_prompt("base system".to_string());
947 let mut incoming = meerkat_core::Session::with_id(placeholder.id().clone());
948 incoming.set_system_prompt("base system".to_string());
949 incoming.push(meerkat_core::types::Message::User(
950 meerkat_core::types::UserMessage::text("verbose first turn".to_string()),
951 ));
952 let parent_revision = incoming.transcript_revision().unwrap();
953 incoming
954 .commit_transcript_rewrite(
955 meerkat_core::TranscriptRewriteSelection::MessageRange { start: 1, end: 2 },
956 vec![meerkat_core::types::Message::User(
957 meerkat_core::types::UserMessage::compaction_summary(
958 "[Context compacted] first turn",
959 ),
960 )],
961 meerkat_core::TranscriptRewriteReason::new("compaction"),
962 Some("meerkat-core".to_string()),
963 Some(parent_revision),
964 )
965 .unwrap();
966 let incoming_snapshot = serde_json::to_vec(&incoming).unwrap();
967 let receipt = make_receipt(RunId::new(), 12);
968
969 store
970 .commit_session_snapshot(
971 &rid,
972 SessionDelta {
973 session_snapshot: serde_json::to_vec(&placeholder).unwrap(),
974 },
975 )
976 .await
977 .unwrap();
978
979 store
980 .atomic_apply(
981 &rid,
982 Some(SessionDelta {
983 session_snapshot: incoming_snapshot.clone(),
984 }),
985 receipt.clone(),
986 vec![],
987 Some(incoming.id().clone()),
988 )
989 .await
990 .unwrap();
991
992 assert_eq!(
993 store.load_session_snapshot(&rid).await.unwrap(),
994 Some(incoming_snapshot)
995 );
996 assert_eq!(
997 store
998 .load_boundary_receipt(&rid, &receipt.run_id, receipt.sequence)
999 .await
1000 .unwrap(),
1001 Some(receipt)
1002 );
1003 }
1004
1005 #[tokio::test]
1006 async fn atomic_apply_allows_generated_compaction_before_retained_tail() {
1007 let store = InMemoryRuntimeStore::new();
1008 let rid = LogicalRuntimeId::new("runtime-compaction-tail");
1009 let mut previous = meerkat_core::Session::new();
1010 previous.set_system_prompt("runtime system before context refresh".to_string());
1011 previous.push(meerkat_core::types::Message::User(
1012 meerkat_core::types::UserMessage::text("Turn 1 request".to_string()),
1013 ));
1014 previous.push(meerkat_core::types::Message::BlockAssistant(
1015 meerkat_core::types::BlockAssistantMessage {
1016 blocks: vec![meerkat_core::types::AssistantBlock::Text {
1017 text: "Turn 1 answer".to_string(),
1018 meta: None,
1019 }],
1020 stop_reason: meerkat_core::types::StopReason::EndTurn,
1021 created_at: meerkat_core::types::message_timestamp_now(),
1022 },
1023 ));
1024
1025 let mut incoming = meerkat_core::Session::with_id(previous.id().clone());
1026 incoming.set_system_prompt("runtime system after context refresh".to_string());
1027 incoming.push(meerkat_core::types::Message::User(
1028 meerkat_core::types::UserMessage::text(
1029 "Verbose context that will be compacted".to_string(),
1030 ),
1031 ));
1032 for message in previous.messages()[1..].iter().cloned() {
1033 incoming.push(message);
1034 }
1035 incoming.push(meerkat_core::types::Message::BlockAssistant(
1036 meerkat_core::types::BlockAssistantMessage {
1037 blocks: vec![meerkat_core::types::AssistantBlock::Text {
1038 text: "Turn 2 generated answer".to_string(),
1039 meta: None,
1040 }],
1041 stop_reason: meerkat_core::types::StopReason::EndTurn,
1042 created_at: meerkat_core::types::message_timestamp_now(),
1043 },
1044 ));
1045 let parent_revision = incoming.transcript_revision().unwrap();
1046 incoming
1047 .commit_transcript_rewrite(
1048 meerkat_core::TranscriptRewriteSelection::MessageRange { start: 1, end: 2 },
1049 vec![meerkat_core::types::Message::User(
1050 meerkat_core::types::UserMessage::compaction_summary(
1051 "[Context compacted] Earlier runtime context".to_string(),
1052 ),
1053 )],
1054 meerkat_core::TranscriptRewriteReason::new("compaction"),
1055 Some("meerkat-core".to_string()),
1056 Some(parent_revision),
1057 )
1058 .unwrap();
1059 let incoming_snapshot = serde_json::to_vec(&incoming).unwrap();
1060 let receipt = make_receipt(RunId::new(), 13);
1061
1062 store
1063 .commit_session_snapshot(
1064 &rid,
1065 SessionDelta {
1066 session_snapshot: serde_json::to_vec(&previous).unwrap(),
1067 },
1068 )
1069 .await
1070 .unwrap();
1071
1072 store
1073 .atomic_apply(
1074 &rid,
1075 Some(SessionDelta {
1076 session_snapshot: incoming_snapshot.clone(),
1077 }),
1078 receipt.clone(),
1079 vec![],
1080 Some(incoming.id().clone()),
1081 )
1082 .await
1083 .unwrap();
1084
1085 assert_eq!(
1086 store.load_session_snapshot(&rid).await.unwrap(),
1087 Some(incoming_snapshot)
1088 );
1089 assert_eq!(
1090 store
1091 .load_boundary_receipt(&rid, &receipt.run_id, receipt.sequence)
1092 .await
1093 .unwrap(),
1094 Some(receipt)
1095 );
1096 }
1097
1098 #[tokio::test]
1099 async fn commit_machine_lifecycle_persists_binding_facts() {
1100 use crate::runtime_state::RuntimeState;
1101
1102 let store = InMemoryRuntimeStore::new();
1103 let rid = LogicalRuntimeId::new("runtime-binding");
1104 let binding = MachineLifecycleBindingFacts::new(
1105 Some("rt:session:abc".to_string()),
1106 Some(7),
1107 Some(3),
1108 Some("epoch-1".to_string()),
1109 );
1110
1111 store
1112 .commit_machine_lifecycle(
1113 &rid,
1114 MachineLifecycleCommit::new_with_binding(RuntimeState::Retired, binding.clone()),
1115 &[],
1116 )
1117 .await
1118 .unwrap();
1119
1120 let lifecycle = crate::store::load_machine_lifecycle(&store, &rid)
1121 .await
1122 .unwrap()
1123 .expect("machine lifecycle snapshot");
1124 assert_eq!(lifecycle.runtime_state(), RuntimeState::Retired);
1125 assert_eq!(lifecycle.binding(), &binding);
1126 assert_eq!(
1127 crate::store::load_runtime_state(&store, &rid)
1128 .await
1129 .unwrap(),
1130 Some(RuntimeState::Retired)
1131 );
1132 }
1133}