Skip to main content

meerkat_runtime/store/
memory.rs

1//! InMemoryRuntimeStore — in-memory implementation for testing/ephemeral.
2//!
3//! Uses `tokio::sync::Mutex` per the in-memory concurrency rule.
4//! All mutations complete inside one lock acquisition (no lock held across .await).
5
6use std::collections::HashMap;
7use std::sync::Arc;
8use std::sync::Mutex as StdMutex;
9
10use indexmap::IndexMap;
11use meerkat_core::lifecycle::{InputId, RunBoundaryReceipt, RunId};
12#[cfg(not(target_arch = "wasm32"))]
13use tokio::sync::Mutex;
14#[cfg(target_arch = "wasm32")]
15use tokio_with_wasm::alias::sync::Mutex;
16
17use super::{
18    AuthOAuthFlowSnapshotUpdate, MachineLifecycleCommit, MachineLifecycleSnapshot,
19    MachineLifecycleStoreRecord, RuntimeStore, RuntimeStoreError, SessionDelta,
20};
21use crate::identifiers::LogicalRuntimeId;
22use crate::input_state::{InputStatePersistenceRecord, StoredInputState};
23use crate::ops_lifecycle::PersistedOpsSnapshot;
24
25/// Receipt key: (runtime_id, run_id, sequence).
26#[derive(Debug, Clone, PartialEq, Eq, Hash)]
27struct ReceiptKey {
28    runtime_id: String,
29    run_id: RunId,
30    sequence: u64,
31}
32
33/// Inner state protected by the mutex.
34#[derive(Debug, Default)]
35struct Inner {
36    /// runtime_id → (input_id → StoredInputState). IndexMap for deterministic iteration order.
37    input_states: HashMap<String, IndexMap<InputId, StoredInputState>>,
38    /// Receipt storage.
39    receipts: HashMap<ReceiptKey, RunBoundaryReceipt>,
40    /// Runtime session snapshots keyed by canonical runtime id.
41    sessions: HashMap<String, Vec<u8>>,
42    /// Persisted machine lifecycle snapshots.
43    runtime_lifecycle: HashMap<String, MachineLifecycleSnapshot>,
44    /// Persisted ops lifecycle snapshots.
45    ops_lifecycle_snapshots: HashMap<String, PersistedOpsSnapshot>,
46}
47
48/// In-memory runtime store. Thread-safe via `tokio::sync::Mutex`.
49#[derive(Debug, Clone)]
50pub struct InMemoryRuntimeStore {
51    inner: Arc<Mutex<Inner>>,
52    auth_oauth_flow_snapshot: Arc<StdMutex<Option<Vec<u8>>>>,
53}
54
55impl InMemoryRuntimeStore {
56    pub fn new() -> Self {
57        Self {
58            inner: Arc::new(Mutex::new(Inner::default())),
59            auth_oauth_flow_snapshot: Arc::new(StdMutex::new(None)),
60        }
61    }
62}
63
64impl Default for InMemoryRuntimeStore {
65    fn default() -> Self {
66        Self::new()
67    }
68}
69
70fn is_runtime_placeholder_session(session: &meerkat_core::Session) -> bool {
71    session.transcript_history_state().ok().flatten().is_none()
72        && matches!(
73            session.messages(),
74            [] | [meerkat_core::types::Message::System(_)]
75        )
76}
77
78/// Deserialize a persisted session-snapshot blob through typed serde, matching
79/// the SQLite runtime store read path. `Session::deserialize` validates the
80/// mandatory envelope version against the generated persistence version
81/// authority, so a missing or non-current (v0/v1) row fails closed instead of
82/// silently defaulting or upgrading on read.
83fn deserialize_persisted_session(bytes: &[u8]) -> Result<meerkat_core::Session, RuntimeStoreError> {
84    serde_json::from_slice(bytes).map_err(|err| RuntimeStoreError::ReadFailed(err.to_string()))
85}
86
87#[cfg_attr(not(target_arch = "wasm32"), async_trait::async_trait)]
88#[cfg_attr(target_arch = "wasm32", async_trait::async_trait(?Send))]
89impl RuntimeStore for InMemoryRuntimeStore {
90    fn persist_auth_oauth_flow_snapshot(
91        &self,
92        snapshot_json: &[u8],
93    ) -> Result<(), RuntimeStoreError> {
94        *self
95            .auth_oauth_flow_snapshot
96            .lock()
97            .map_err(|err| RuntimeStoreError::WriteFailed(err.to_string()))? =
98            Some(snapshot_json.to_vec());
99        Ok(())
100    }
101
102    fn load_auth_oauth_flow_snapshot(&self) -> Result<Option<Vec<u8>>, RuntimeStoreError> {
103        self.auth_oauth_flow_snapshot
104            .lock()
105            .map(|snapshot| snapshot.clone())
106            .map_err(|err| RuntimeStoreError::ReadFailed(err.to_string()))
107    }
108
109    fn update_auth_oauth_flow_snapshot(
110        &self,
111        update: &mut AuthOAuthFlowSnapshotUpdate<'_>,
112    ) -> Result<(), RuntimeStoreError> {
113        let mut snapshot = self
114            .auth_oauth_flow_snapshot
115            .lock()
116            .map_err(|err| RuntimeStoreError::WriteFailed(err.to_string()))?;
117        let next = update(snapshot.as_deref())?;
118        *snapshot = Some(next);
119        Ok(())
120    }
121
122    async fn commit_session_snapshot(
123        &self,
124        runtime_id: &LogicalRuntimeId,
125        session_delta: SessionDelta,
126    ) -> Result<(), RuntimeStoreError> {
127        let incoming: meerkat_core::Session =
128            serde_json::from_slice(&session_delta.session_snapshot)
129                .map_err(|err| RuntimeStoreError::WriteFailed(err.to_string()))?;
130        let mut inner = self.inner.lock().await;
131        let previous = inner
132            .sessions
133            .get(&runtime_id.0)
134            .map(|snapshot| deserialize_persisted_session(snapshot))
135            .transpose()?;
136        meerkat_core::session_store::run_boundary_snapshot_save_guard(&incoming, previous.as_ref())
137            .map_err(|err| RuntimeStoreError::WriteFailed(err.to_string()))?;
138        inner
139            .sessions
140            .insert(runtime_id.0.clone(), session_delta.session_snapshot);
141        Ok(())
142    }
143
144    async fn commit_session_transcript_rewrite_snapshot(
145        &self,
146        runtime_id: &LogicalRuntimeId,
147        session_delta: SessionDelta,
148        commit: &meerkat_core::TranscriptRewriteCommit,
149    ) -> Result<(), RuntimeStoreError> {
150        let incoming: meerkat_core::Session =
151            serde_json::from_slice(&session_delta.session_snapshot)
152                .map_err(|err| RuntimeStoreError::WriteFailed(err.to_string()))?;
153        let mut inner = self.inner.lock().await;
154        let previous = inner
155            .sessions
156            .get(&runtime_id.0)
157            .map(|snapshot| deserialize_persisted_session(snapshot))
158            .transpose()?;
159        meerkat_core::session_store::transcript_rewrite_save_guard(
160            &incoming,
161            previous.as_ref(),
162            commit,
163        )
164        .map_err(|err| match err {
165            meerkat_core::SessionStoreError::TranscriptRevisionConflict {
166                expected,
167                actual,
168                ..
169            } => RuntimeStoreError::TranscriptRevisionConflict { expected, actual },
170            other => RuntimeStoreError::WriteFailed(other.to_string()),
171        })?;
172        inner
173            .sessions
174            .insert(runtime_id.0.clone(), session_delta.session_snapshot);
175        Ok(())
176    }
177
178    async fn atomic_apply(
179        &self,
180        runtime_id: &LogicalRuntimeId,
181        session_delta: Option<SessionDelta>,
182        receipt: RunBoundaryReceipt,
183        input_updates: Vec<InputStatePersistenceRecord>,
184        session_store_key: Option<meerkat_core::types::SessionId>,
185    ) -> Result<(), RuntimeStoreError> {
186        let mut inner = self.inner.lock().await;
187
188        // All writes in one lock acquisition (atomic for in-memory)
189        let rid = runtime_id.0.clone();
190
191        // Session delta. The supersession verdict computed here keys the
192        // entire commit: if the incoming session snapshot is classified as
193        // superseded (the persisted head is already a valid append-extension
194        // of it), the snapshot write is skipped AND so are the receipt + input
195        // writes, so receipt/input ordering identity never advances past the
196        // retained session truth.
197        let mut session_snapshot_superseded = false;
198        if let Some(delta) = session_delta {
199            let incoming_session =
200                serde_json::from_slice::<meerkat_core::Session>(&delta.session_snapshot);
201            let mut persist_session_snapshot = true;
202            match (incoming_session, session_store_key) {
203                (Ok(incoming_session), session_store_key) => {
204                    if let Some(session_store_key) = session_store_key
205                        && incoming_session.id() != &session_store_key
206                    {
207                        return Err(RuntimeStoreError::SessionKeyMismatch {
208                            expected: session_store_key,
209                            actual: incoming_session.id().clone(),
210                        });
211                    }
212                    let previous_session = inner
213                        .sessions
214                        .get(&rid)
215                        .and_then(|snapshot| deserialize_persisted_session(snapshot).ok());
216                    if let Err(err) = meerkat_core::session_store::run_boundary_snapshot_save_guard(
217                        &incoming_session,
218                        previous_session.as_ref(),
219                    ) {
220                        if previous_session
221                            .as_ref()
222                            .is_some_and(is_runtime_placeholder_session)
223                        {
224                            persist_session_snapshot = true;
225                        } else if previous_session.as_ref().is_some_and(|previous_session| {
226                            meerkat_core::session_store::run_boundary_snapshot_save_guard(
227                                previous_session,
228                                Some(&incoming_session),
229                            )
230                            .is_ok()
231                        }) {
232                            persist_session_snapshot = false;
233                            session_snapshot_superseded = true;
234                        } else {
235                            return Err(RuntimeStoreError::WriteFailed(err.to_string()));
236                        }
237                    }
238                }
239                (Err(err), Some(session_store_key)) => {
240                    return Err(RuntimeStoreError::WriteFailed(format!(
241                        "session snapshot for {session_store_key} is not a Session: {err}"
242                    )));
243                }
244                (Err(err), None) => {
245                    return Err(RuntimeStoreError::WriteFailed(format!(
246                        "session snapshot is not a Session: {err}"
247                    )));
248                }
249            }
250            if persist_session_snapshot {
251                inner.sessions.insert(rid.clone(), delta.session_snapshot);
252            }
253        }
254
255        // When the session snapshot was superseded and skipped, the boundary
256        // receipt and input-state updates for that boundary must also be
257        // skipped: advancing them against a retained (older) session snapshot
258        // would split receipt/input ordering identity from session truth.
259        if session_snapshot_superseded {
260            return Ok(());
261        }
262
263        // Receipt
264        let key = ReceiptKey {
265            runtime_id: rid.clone(),
266            run_id: receipt.run_id.clone(),
267            sequence: receipt.sequence,
268        };
269        inner.receipts.insert(key, receipt);
270
271        // Input states
272        let states = inner.input_states.entry(rid).or_default();
273        for record in input_updates {
274            let bundle = record.into_stored();
275            states.insert(bundle.state.input_id.clone(), bundle);
276        }
277
278        Ok(())
279    }
280
281    async fn load_input_states(
282        &self,
283        runtime_id: &LogicalRuntimeId,
284    ) -> Result<Vec<StoredInputState>, RuntimeStoreError> {
285        let inner = self.inner.lock().await;
286        let states = inner
287            .input_states
288            .get(&runtime_id.0)
289            .map(|m| m.values().cloned().collect())
290            .unwrap_or_default();
291        Ok(states)
292    }
293
294    async fn load_boundary_receipt(
295        &self,
296        runtime_id: &LogicalRuntimeId,
297        run_id: &RunId,
298        sequence: u64,
299    ) -> Result<Option<RunBoundaryReceipt>, RuntimeStoreError> {
300        let inner = self.inner.lock().await;
301        let key = ReceiptKey {
302            runtime_id: runtime_id.0.clone(),
303            run_id: run_id.clone(),
304            sequence,
305        };
306        Ok(inner.receipts.get(&key).cloned())
307    }
308
309    async fn load_session_snapshot(
310        &self,
311        runtime_id: &LogicalRuntimeId,
312    ) -> Result<Option<Vec<u8>>, RuntimeStoreError> {
313        let inner = self.inner.lock().await;
314        Ok(inner.sessions.get(&runtime_id.0).cloned())
315    }
316
317    async fn clear_session_snapshot(
318        &self,
319        runtime_id: &LogicalRuntimeId,
320    ) -> Result<(), RuntimeStoreError> {
321        let mut inner = self.inner.lock().await;
322        inner.sessions.remove(&runtime_id.0);
323        Ok(())
324    }
325
326    async fn replace_session_snapshot_if_current(
327        &self,
328        runtime_id: &LogicalRuntimeId,
329        expected_current: &[u8],
330        replacement: Vec<u8>,
331    ) -> Result<bool, RuntimeStoreError> {
332        let _: meerkat_core::Session = serde_json::from_slice(&replacement)
333            .map_err(|err| RuntimeStoreError::WriteFailed(err.to_string()))?;
334        let mut inner = self.inner.lock().await;
335        let Some(current) = inner.sessions.get_mut(&runtime_id.0) else {
336            return Ok(false);
337        };
338        if current.as_slice() != expected_current {
339            return Ok(false);
340        }
341        *current = replacement;
342        Ok(true)
343    }
344
345    async fn clear_session_snapshot_if_current(
346        &self,
347        runtime_id: &LogicalRuntimeId,
348        expected_current: &[u8],
349    ) -> Result<bool, RuntimeStoreError> {
350        let mut inner = self.inner.lock().await;
351        let Some(current) = inner.sessions.get(&runtime_id.0) else {
352            return Ok(false);
353        };
354        if current.as_slice() != expected_current {
355            return Ok(false);
356        }
357        inner.sessions.remove(&runtime_id.0);
358        Ok(true)
359    }
360
361    async fn persist_input_state(
362        &self,
363        runtime_id: &LogicalRuntimeId,
364        state: &InputStatePersistenceRecord,
365    ) -> Result<(), RuntimeStoreError> {
366        let mut inner = self.inner.lock().await;
367        let states = inner.input_states.entry(runtime_id.0.clone()).or_default();
368        let bundle = state.as_stored();
369        states.insert(bundle.state.input_id.clone(), bundle.clone());
370        Ok(())
371    }
372
373    async fn load_input_state(
374        &self,
375        runtime_id: &LogicalRuntimeId,
376        input_id: &InputId,
377    ) -> Result<Option<StoredInputState>, RuntimeStoreError> {
378        let inner = self.inner.lock().await;
379        let state = inner
380            .input_states
381            .get(&runtime_id.0)
382            .and_then(|m| m.get(input_id).cloned());
383        Ok(state)
384    }
385
386    async fn load_machine_lifecycle_record(
387        &self,
388        runtime_id: &LogicalRuntimeId,
389    ) -> Result<Option<Vec<u8>>, RuntimeStoreError> {
390        let inner = self.inner.lock().await;
391        inner
392            .runtime_lifecycle
393            .get(&runtime_id.0)
394            .map(|snapshot| MachineLifecycleStoreRecord::from_snapshot(snapshot).encode())
395            .transpose()
396    }
397
398    async fn commit_machine_lifecycle(
399        &self,
400        runtime_id: &LogicalRuntimeId,
401        commit: MachineLifecycleCommit,
402        input_states: &[InputStatePersistenceRecord],
403    ) -> Result<(), RuntimeStoreError> {
404        let mut inner = self.inner.lock().await;
405        let rid = runtime_id.0.clone();
406
407        // Single lock acquisition — atomic for in-memory
408        inner
409            .runtime_lifecycle
410            .insert(rid.clone(), commit.into_snapshot());
411        let states = inner.input_states.entry(rid).or_default();
412        for record in input_states {
413            let bundle = record.as_stored();
414            states.insert(bundle.state.input_id.clone(), bundle.clone());
415        }
416
417        Ok(())
418    }
419
420    async fn persist_ops_lifecycle(
421        &self,
422        runtime_id: &LogicalRuntimeId,
423        snapshot: &PersistedOpsSnapshot,
424    ) -> Result<(), RuntimeStoreError> {
425        let mut inner = self.inner.lock().await;
426        inner
427            .ops_lifecycle_snapshots
428            .insert(runtime_id.0.clone(), snapshot.clone());
429        Ok(())
430    }
431
432    async fn load_ops_lifecycle(
433        &self,
434        runtime_id: &LogicalRuntimeId,
435    ) -> Result<Option<PersistedOpsSnapshot>, RuntimeStoreError> {
436        let inner = self.inner.lock().await;
437        Ok(inner.ops_lifecycle_snapshots.get(&runtime_id.0).cloned())
438    }
439
440    async fn delete_ops_lifecycle(
441        &self,
442        runtime_id: &LogicalRuntimeId,
443    ) -> Result<(), RuntimeStoreError> {
444        let mut inner = self.inner.lock().await;
445        inner.ops_lifecycle_snapshots.remove(&runtime_id.0);
446        Ok(())
447    }
448}
449
450#[cfg(test)]
451#[allow(clippy::unwrap_used)]
452mod tests {
453    use super::*;
454    use crate::store::MachineLifecycleBindingFacts;
455    use meerkat_core::lifecycle::run_primitive::RunApplyBoundary;
456
457    fn make_receipt(run_id: RunId, seq: u64) -> RunBoundaryReceipt {
458        RunBoundaryReceipt {
459            run_id,
460            boundary: RunApplyBoundary::RunStart,
461            contributing_input_ids: vec![],
462            conversation_digest: None,
463            message_count: 0,
464            sequence: seq,
465        }
466    }
467
468    fn persistable(bundle: StoredInputState) -> InputStatePersistenceRecord {
469        InputStatePersistenceRecord::from_machine_snapshot(bundle).unwrap()
470    }
471
472    fn session_with_user(content: &str) -> meerkat_core::Session {
473        let mut session = meerkat_core::Session::new();
474        session.push(meerkat_core::types::Message::User(
475            meerkat_core::types::UserMessage::text(content.to_string()),
476        ));
477        session
478    }
479
480    #[tokio::test]
481    async fn atomic_apply_roundtrip() {
482        let store = InMemoryRuntimeStore::new();
483        let rid = LogicalRuntimeId::new("test-runtime");
484        let run_id = RunId::new();
485        let input_id = InputId::new();
486
487        let bundle = StoredInputState::new_accepted(input_id.clone());
488        let receipt = make_receipt(run_id.clone(), 0);
489
490        let session = session_with_user("hello");
491        let session_snapshot = serde_json::to_vec(&session).unwrap();
492
493        store
494            .atomic_apply(
495                &rid,
496                Some(SessionDelta { session_snapshot }),
497                receipt.clone(),
498                vec![persistable(bundle)],
499                None,
500            )
501            .await
502            .unwrap();
503
504        // Load input states
505        let states = store.load_input_states(&rid).await.unwrap();
506        assert_eq!(states.len(), 1);
507        assert_eq!(states[0].state.input_id, input_id);
508
509        // Load receipt
510        let loaded = store.load_boundary_receipt(&rid, &run_id, 0).await.unwrap();
511        assert!(loaded.is_some());
512    }
513
514    #[tokio::test]
515    async fn atomic_apply_rejects_non_session_snapshot_without_owner_context() {
516        let store = InMemoryRuntimeStore::new();
517        let rid = LogicalRuntimeId::new("test-runtime");
518        let run_id = RunId::new();
519        let input_id = InputId::new();
520
521        let bundle = StoredInputState::new_accepted(input_id);
522        let receipt = make_receipt(run_id, 0);
523
524        // Owner-context absence is not a license to store arbitrary bytes as a
525        // session snapshot: a non-deserializable snapshot must fail closed.
526        let err = store
527            .atomic_apply(
528                &rid,
529                Some(SessionDelta {
530                    session_snapshot: b"session-data".to_vec(),
531                }),
532                receipt,
533                vec![persistable(bundle)],
534                None,
535            )
536            .await
537            .expect_err("non-Session snapshot must be rejected");
538
539        match err {
540            RuntimeStoreError::WriteFailed(message) => {
541                assert!(
542                    message.contains("not a Session"),
543                    "unexpected WriteFailed message: {message}"
544                );
545            }
546            other => panic!("expected WriteFailed, got {other:?}"),
547        }
548    }
549
550    #[tokio::test]
551    async fn persist_and_load_single_state() {
552        let store = InMemoryRuntimeStore::new();
553        let rid = LogicalRuntimeId::new("test");
554        let input_id = InputId::new();
555        let bundle = StoredInputState::new_accepted(input_id.clone());
556
557        store
558            .persist_input_state(&rid, &persistable(bundle))
559            .await
560            .unwrap();
561
562        let loaded = store.load_input_state(&rid, &input_id).await.unwrap();
563        assert!(loaded.is_some());
564        assert_eq!(loaded.unwrap().state.input_id, input_id);
565    }
566
567    #[tokio::test]
568    async fn load_nonexistent_returns_none() {
569        let store = InMemoryRuntimeStore::new();
570        let rid = LogicalRuntimeId::new("test");
571
572        let states = store.load_input_states(&rid).await.unwrap();
573        assert!(states.is_empty());
574
575        let state = store.load_input_state(&rid, &InputId::new()).await.unwrap();
576        assert!(state.is_none());
577
578        let receipt = store
579            .load_boundary_receipt(&rid, &RunId::new(), 0)
580            .await
581            .unwrap();
582        assert!(receipt.is_none());
583    }
584
585    #[tokio::test]
586    async fn atomic_apply_updates_existing() {
587        let store = InMemoryRuntimeStore::new();
588        let rid = LogicalRuntimeId::new("test");
589        let input_id = InputId::new();
590
591        // First write
592        let bundle1 = StoredInputState::new_accepted(input_id.clone());
593        store
594            .atomic_apply(
595                &rid,
596                None,
597                make_receipt(RunId::new(), 0),
598                vec![persistable(bundle1)],
599                None,
600            )
601            .await
602            .unwrap();
603
604        // Second write with updated seed phase
605        let mut bundle2 = StoredInputState::new_accepted(input_id.clone());
606        bundle2.seed.phase = crate::input_state::InputLifecycleState::Queued;
607        store
608            .atomic_apply(
609                &rid,
610                None,
611                make_receipt(RunId::new(), 1),
612                vec![persistable(bundle2)],
613                None,
614            )
615            .await
616            .unwrap();
617
618        let states = store.load_input_states(&rid).await.unwrap();
619        assert_eq!(states.len(), 1);
620        assert_eq!(
621            states[0].seed.phase,
622            crate::input_state::InputLifecycleState::Queued
623        );
624    }
625
626    #[tokio::test]
627    async fn atomic_apply_validates_session_store_key_without_aliasing_snapshot() {
628        let store = InMemoryRuntimeStore::new();
629        let rid = LogicalRuntimeId::new("runtime-key");
630        let session = meerkat_core::Session::new();
631        let session_id = session.id().clone();
632        let snapshot = serde_json::to_vec(&session).unwrap();
633
634        store
635            .atomic_apply(
636                &rid,
637                Some(SessionDelta {
638                    session_snapshot: snapshot.clone(),
639                }),
640                make_receipt(RunId::new(), 0),
641                vec![],
642                Some(session_id.clone()),
643            )
644            .await
645            .unwrap();
646
647        assert_eq!(
648            store.load_session_snapshot(&rid).await.unwrap(),
649            Some(snapshot)
650        );
651        assert!(
652            store
653                .load_session_snapshot(&LogicalRuntimeId::legacy_session_uuid_alias(&session_id))
654                .await
655                .unwrap()
656                .is_none(),
657            "session_store_key must validate the snapshot identity, not create a raw UUID runtime alias"
658        );
659    }
660
661    #[tokio::test]
662    async fn atomic_apply_rejects_mismatched_session_store_key() {
663        let store = InMemoryRuntimeStore::new();
664        let rid = LogicalRuntimeId::new("runtime-key");
665        let session = meerkat_core::Session::new();
666        let wrong_session_id = meerkat_core::Session::new().id().clone();
667        let snapshot = serde_json::to_vec(&session).unwrap();
668
669        let err = store
670            .atomic_apply(
671                &rid,
672                Some(SessionDelta {
673                    session_snapshot: snapshot,
674                }),
675                make_receipt(RunId::new(), 0),
676                vec![],
677                Some(wrong_session_id),
678            )
679            .await
680            .expect_err("mismatched session_store_key should fail");
681
682        assert!(matches!(err, RuntimeStoreError::SessionKeyMismatch { .. }));
683        assert!(store.load_session_snapshot(&rid).await.unwrap().is_none());
684    }
685
686    #[tokio::test]
687    async fn atomic_apply_persists_machine_owned_receipt() {
688        let store = InMemoryRuntimeStore::new();
689        let rid = LogicalRuntimeId::new("test");
690        let run_id = RunId::new();
691        let input_id = InputId::new();
692        let session = meerkat_core::Session::new();
693        let snapshot = serde_json::to_vec(&session).unwrap();
694        let receipt = RunBoundaryReceipt {
695            run_id: run_id.clone(),
696            boundary: RunApplyBoundary::Immediate,
697            contributing_input_ids: vec![input_id.clone()],
698            conversation_digest: Some("machine-owned-digest".to_string()),
699            message_count: 42,
700            sequence: 7,
701        };
702
703        store
704            .atomic_apply(
705                &rid,
706                Some(SessionDelta {
707                    session_snapshot: snapshot,
708                }),
709                receipt.clone(),
710                vec![persistable(StoredInputState::new_accepted(input_id))],
711                None,
712            )
713            .await
714            .unwrap();
715
716        assert_eq!(receipt.run_id, run_id);
717        assert!(receipt.conversation_digest.is_some());
718        let loaded = store
719            .load_boundary_receipt(&rid, &receipt.run_id, receipt.sequence)
720            .await
721            .unwrap();
722        assert!(loaded.is_some(), "receipt should be persisted");
723        let Some(loaded) = loaded else {
724            unreachable!("asserted above");
725        };
726        assert_eq!(loaded, receipt);
727    }
728
729    #[tokio::test]
730    async fn multiple_runtimes_isolated() {
731        let store = InMemoryRuntimeStore::new();
732        let rid1 = LogicalRuntimeId::new("runtime-1");
733        let rid2 = LogicalRuntimeId::new("runtime-2");
734
735        store
736            .persist_input_state(
737                &rid1,
738                &persistable(StoredInputState::new_accepted(InputId::new())),
739            )
740            .await
741            .unwrap();
742        store
743            .persist_input_state(
744                &rid2,
745                &persistable(StoredInputState::new_accepted(InputId::new())),
746            )
747            .await
748            .unwrap();
749        store
750            .persist_input_state(
751                &rid2,
752                &persistable(StoredInputState::new_accepted(InputId::new())),
753            )
754            .await
755            .unwrap();
756
757        let s1 = store.load_input_states(&rid1).await.unwrap();
758        let s2 = store.load_input_states(&rid2).await.unwrap();
759        assert_eq!(s1.len(), 1);
760        assert_eq!(s2.len(), 2);
761    }
762
763    #[tokio::test]
764    async fn load_session_snapshot_roundtrip() {
765        let store = InMemoryRuntimeStore::new();
766        let rid = LogicalRuntimeId::new("runtime");
767        let snapshot = serde_json::to_vec(&meerkat_core::Session::new()).unwrap();
768
769        store
770            .atomic_apply(
771                &rid,
772                Some(SessionDelta {
773                    session_snapshot: snapshot.clone(),
774                }),
775                make_receipt(RunId::new(), 0),
776                vec![],
777                None,
778            )
779            .await
780            .unwrap();
781
782        let loaded = store.load_session_snapshot(&rid).await.unwrap();
783        assert_eq!(loaded, Some(snapshot));
784    }
785
786    #[tokio::test]
787    async fn commit_session_snapshot_rejects_stale_runtime_parent() {
788        let store = InMemoryRuntimeStore::new();
789        let rid = LogicalRuntimeId::new("runtime-stale-parent");
790        let accepted = session_with_user("accepted runtime turn");
791        let mut stale = meerkat_core::Session::with_id(accepted.id().clone());
792        stale.push(meerkat_core::types::Message::User(
793            meerkat_core::types::UserMessage::text("stale runtime turn".to_string()),
794        ));
795        let accepted_snapshot = serde_json::to_vec(&accepted).unwrap();
796
797        store
798            .commit_session_snapshot(
799                &rid,
800                SessionDelta {
801                    session_snapshot: accepted_snapshot.clone(),
802                },
803            )
804            .await
805            .unwrap();
806
807        let err = store
808            .commit_session_snapshot(
809                &rid,
810                SessionDelta {
811                    session_snapshot: serde_json::to_vec(&stale).unwrap(),
812                },
813            )
814            .await
815            .expect_err("stale non-continuation must not overwrite runtime snapshot");
816
817        assert!(matches!(err, RuntimeStoreError::WriteFailed(_)));
818        assert_eq!(
819            store.load_session_snapshot(&rid).await.unwrap(),
820            Some(accepted_snapshot)
821        );
822    }
823
824    #[tokio::test]
825    async fn atomic_apply_keeps_current_snapshot_when_incoming_is_superseded() {
826        let store = InMemoryRuntimeStore::new();
827        let rid = LogicalRuntimeId::new("runtime-superseded-terminal");
828        let incoming = session_with_user("turn input");
829        let mut current = incoming.clone();
830        current.push(meerkat_core::types::Message::BlockAssistant(
831            meerkat_core::types::BlockAssistantMessage {
832                blocks: vec![meerkat_core::types::AssistantBlock::Text {
833                    text: "peer response already applied".to_string(),
834                    meta: None,
835                }],
836                stop_reason: meerkat_core::types::StopReason::EndTurn,
837                created_at: meerkat_core::types::message_timestamp_now(),
838            },
839        ));
840        let current_snapshot = serde_json::to_vec(&current).unwrap();
841        let receipt = make_receipt(RunId::new(), 11);
842
843        store
844            .commit_session_snapshot(
845                &rid,
846                SessionDelta {
847                    session_snapshot: current_snapshot.clone(),
848                },
849            )
850            .await
851            .unwrap();
852
853        store
854            .atomic_apply(
855                &rid,
856                Some(SessionDelta {
857                    session_snapshot: serde_json::to_vec(&incoming).unwrap(),
858                }),
859                receipt.clone(),
860                vec![],
861                Some(incoming.id().clone()),
862            )
863            .await
864            .unwrap();
865
866        assert_eq!(
867            store.load_session_snapshot(&rid).await.unwrap(),
868            Some(current_snapshot)
869        );
870        // The session snapshot was classified superseded and skipped, so the
871        // boundary receipt for that boundary must NOT advance against the
872        // retained (more-advanced) session snapshot.
873        assert_eq!(
874            store
875                .load_boundary_receipt(&rid, &receipt.run_id, receipt.sequence)
876                .await
877                .unwrap(),
878            None
879        );
880    }
881
882    #[tokio::test]
883    async fn atomic_apply_skips_inputs_when_session_snapshot_superseded() {
884        let store = InMemoryRuntimeStore::new();
885        let rid = LogicalRuntimeId::new("runtime-superseded-inputs");
886        let incoming = session_with_user("turn input");
887        let mut current = incoming.clone();
888        current.push(meerkat_core::types::Message::BlockAssistant(
889            meerkat_core::types::BlockAssistantMessage {
890                blocks: vec![meerkat_core::types::AssistantBlock::Text {
891                    text: "peer response already applied".to_string(),
892                    meta: None,
893                }],
894                stop_reason: meerkat_core::types::StopReason::EndTurn,
895                created_at: meerkat_core::types::message_timestamp_now(),
896            },
897        ));
898        let current_snapshot = serde_json::to_vec(&current).unwrap();
899        let receipt = make_receipt(RunId::new(), 21);
900        let input_id = InputId::new();
901        let bundle = StoredInputState::new_accepted(input_id.clone());
902
903        store
904            .commit_session_snapshot(
905                &rid,
906                SessionDelta {
907                    session_snapshot: current_snapshot.clone(),
908                },
909            )
910            .await
911            .unwrap();
912
913        store
914            .atomic_apply(
915                &rid,
916                Some(SessionDelta {
917                    session_snapshot: serde_json::to_vec(&incoming).unwrap(),
918                }),
919                receipt.clone(),
920                vec![persistable(bundle)],
921                Some(incoming.id().clone()),
922            )
923            .await
924            .unwrap();
925
926        // Snapshot retained, receipt + input-state writes skipped as a unit.
927        assert_eq!(
928            store.load_session_snapshot(&rid).await.unwrap(),
929            Some(current_snapshot)
930        );
931        assert_eq!(
932            store
933                .load_boundary_receipt(&rid, &receipt.run_id, receipt.sequence)
934                .await
935                .unwrap(),
936            None
937        );
938        assert!(store.load_input_states(&rid).await.unwrap().is_empty());
939    }
940
941    #[tokio::test]
942    async fn atomic_apply_allows_first_generated_snapshot_after_placeholder() {
943        let store = InMemoryRuntimeStore::new();
944        let rid = LogicalRuntimeId::new("runtime-placeholder");
945        let mut placeholder = meerkat_core::Session::new();
946        placeholder.set_system_prompt("base system".to_string());
947        let mut incoming = meerkat_core::Session::with_id(placeholder.id().clone());
948        incoming.set_system_prompt("base system".to_string());
949        incoming.push(meerkat_core::types::Message::User(
950            meerkat_core::types::UserMessage::text("verbose first turn".to_string()),
951        ));
952        let parent_revision = incoming.transcript_revision().unwrap();
953        incoming
954            .commit_transcript_rewrite(
955                meerkat_core::TranscriptRewriteSelection::MessageRange { start: 1, end: 2 },
956                vec![meerkat_core::types::Message::User(
957                    meerkat_core::types::UserMessage::compaction_summary(
958                        "[Context compacted] first turn",
959                    ),
960                )],
961                meerkat_core::TranscriptRewriteReason::new("compaction"),
962                Some("meerkat-core".to_string()),
963                Some(parent_revision),
964            )
965            .unwrap();
966        let incoming_snapshot = serde_json::to_vec(&incoming).unwrap();
967        let receipt = make_receipt(RunId::new(), 12);
968
969        store
970            .commit_session_snapshot(
971                &rid,
972                SessionDelta {
973                    session_snapshot: serde_json::to_vec(&placeholder).unwrap(),
974                },
975            )
976            .await
977            .unwrap();
978
979        store
980            .atomic_apply(
981                &rid,
982                Some(SessionDelta {
983                    session_snapshot: incoming_snapshot.clone(),
984                }),
985                receipt.clone(),
986                vec![],
987                Some(incoming.id().clone()),
988            )
989            .await
990            .unwrap();
991
992        assert_eq!(
993            store.load_session_snapshot(&rid).await.unwrap(),
994            Some(incoming_snapshot)
995        );
996        assert_eq!(
997            store
998                .load_boundary_receipt(&rid, &receipt.run_id, receipt.sequence)
999                .await
1000                .unwrap(),
1001            Some(receipt)
1002        );
1003    }
1004
1005    #[tokio::test]
1006    async fn atomic_apply_allows_generated_compaction_before_retained_tail() {
1007        let store = InMemoryRuntimeStore::new();
1008        let rid = LogicalRuntimeId::new("runtime-compaction-tail");
1009        let mut previous = meerkat_core::Session::new();
1010        previous.set_system_prompt("runtime system before context refresh".to_string());
1011        previous.push(meerkat_core::types::Message::User(
1012            meerkat_core::types::UserMessage::text("Turn 1 request".to_string()),
1013        ));
1014        previous.push(meerkat_core::types::Message::BlockAssistant(
1015            meerkat_core::types::BlockAssistantMessage {
1016                blocks: vec![meerkat_core::types::AssistantBlock::Text {
1017                    text: "Turn 1 answer".to_string(),
1018                    meta: None,
1019                }],
1020                stop_reason: meerkat_core::types::StopReason::EndTurn,
1021                created_at: meerkat_core::types::message_timestamp_now(),
1022            },
1023        ));
1024
1025        let mut incoming = meerkat_core::Session::with_id(previous.id().clone());
1026        incoming.set_system_prompt("runtime system after context refresh".to_string());
1027        incoming.push(meerkat_core::types::Message::User(
1028            meerkat_core::types::UserMessage::text(
1029                "Verbose context that will be compacted".to_string(),
1030            ),
1031        ));
1032        for message in previous.messages()[1..].iter().cloned() {
1033            incoming.push(message);
1034        }
1035        incoming.push(meerkat_core::types::Message::BlockAssistant(
1036            meerkat_core::types::BlockAssistantMessage {
1037                blocks: vec![meerkat_core::types::AssistantBlock::Text {
1038                    text: "Turn 2 generated answer".to_string(),
1039                    meta: None,
1040                }],
1041                stop_reason: meerkat_core::types::StopReason::EndTurn,
1042                created_at: meerkat_core::types::message_timestamp_now(),
1043            },
1044        ));
1045        let parent_revision = incoming.transcript_revision().unwrap();
1046        incoming
1047            .commit_transcript_rewrite(
1048                meerkat_core::TranscriptRewriteSelection::MessageRange { start: 1, end: 2 },
1049                vec![meerkat_core::types::Message::User(
1050                    meerkat_core::types::UserMessage::compaction_summary(
1051                        "[Context compacted] Earlier runtime context".to_string(),
1052                    ),
1053                )],
1054                meerkat_core::TranscriptRewriteReason::new("compaction"),
1055                Some("meerkat-core".to_string()),
1056                Some(parent_revision),
1057            )
1058            .unwrap();
1059        let incoming_snapshot = serde_json::to_vec(&incoming).unwrap();
1060        let receipt = make_receipt(RunId::new(), 13);
1061
1062        store
1063            .commit_session_snapshot(
1064                &rid,
1065                SessionDelta {
1066                    session_snapshot: serde_json::to_vec(&previous).unwrap(),
1067                },
1068            )
1069            .await
1070            .unwrap();
1071
1072        store
1073            .atomic_apply(
1074                &rid,
1075                Some(SessionDelta {
1076                    session_snapshot: incoming_snapshot.clone(),
1077                }),
1078                receipt.clone(),
1079                vec![],
1080                Some(incoming.id().clone()),
1081            )
1082            .await
1083            .unwrap();
1084
1085        assert_eq!(
1086            store.load_session_snapshot(&rid).await.unwrap(),
1087            Some(incoming_snapshot)
1088        );
1089        assert_eq!(
1090            store
1091                .load_boundary_receipt(&rid, &receipt.run_id, receipt.sequence)
1092                .await
1093                .unwrap(),
1094            Some(receipt)
1095        );
1096    }
1097
1098    #[tokio::test]
1099    async fn commit_machine_lifecycle_persists_binding_facts() {
1100        use crate::runtime_state::RuntimeState;
1101
1102        let store = InMemoryRuntimeStore::new();
1103        let rid = LogicalRuntimeId::new("runtime-binding");
1104        let binding = MachineLifecycleBindingFacts::new(
1105            Some("rt:session:abc".to_string()),
1106            Some(7),
1107            Some(3),
1108            Some("epoch-1".to_string()),
1109        );
1110
1111        store
1112            .commit_machine_lifecycle(
1113                &rid,
1114                MachineLifecycleCommit::new_with_binding(RuntimeState::Retired, binding.clone()),
1115                &[],
1116            )
1117            .await
1118            .unwrap();
1119
1120        let lifecycle = crate::store::load_machine_lifecycle(&store, &rid)
1121            .await
1122            .unwrap()
1123            .expect("machine lifecycle snapshot");
1124        assert_eq!(lifecycle.runtime_state(), RuntimeState::Retired);
1125        assert_eq!(lifecycle.binding(), &binding);
1126        assert_eq!(
1127            crate::store::load_runtime_state(&store, &rid)
1128                .await
1129                .unwrap(),
1130            Some(RuntimeState::Retired)
1131        );
1132    }
1133}