Skip to main content

meerkat_runtime/store/
mod.rs

//! RuntimeStore — atomic persistence for runtime state.
//!
//! §19: "CoreExecutor::apply MUST durably persist the RunBoundaryReceipt atomically
//! with the boundary side effects before returning success."
5
6pub mod memory;
7#[cfg(feature = "sqlite-store")]
8pub mod sqlite;
9
10use meerkat_core::lifecycle::run_primitive::RunApplyBoundary;
11use meerkat_core::lifecycle::{InputId, RunBoundaryReceipt, RunId};
12use sha2::{Digest, Sha256};
13
14use crate::identifiers::LogicalRuntimeId;
15use crate::input_state::InputState;
16use crate::runtime_state::RuntimeState;
17
18/// Errors from RuntimeStore operations.
19#[derive(Debug, Clone, thiserror::Error)]
20#[non_exhaustive]
21pub enum RuntimeStoreError {
22    /// Write failed.
23    #[error("Store write failed: {0}")]
24    WriteFailed(String),
25    /// Read failed.
26    #[error("Store read failed: {0}")]
27    ReadFailed(String),
28    /// Not found.
29    #[error("Not found: {0}")]
30    NotFound(String),
31    /// Internal error.
32    #[error("Internal error: {0}")]
33    Internal(String),
34}
35
/// Session-level write that is committed atomically together with a receipt.
#[derive(Debug, Clone)]
pub struct SessionDelta {
    /// Serialized session snapshot bytes. `RuntimeStore` treats them as an
    /// opaque blob.
    pub session_snapshot: Vec<u8>,
}
42
43fn authoritative_receipt(
44    session_delta: Option<&SessionDelta>,
45    run_id: RunId,
46    boundary: RunApplyBoundary,
47    contributing_input_ids: Vec<InputId>,
48    sequence: u64,
49) -> Result<RunBoundaryReceipt, RuntimeStoreError> {
50    let (conversation_digest, message_count) = match session_delta {
51        Some(delta) => {
52            let session: meerkat_core::Session = serde_json::from_slice(&delta.session_snapshot)
53                .map_err(|err| RuntimeStoreError::WriteFailed(err.to_string()))?;
54            let encoded_messages = serde_json::to_vec(session.messages())
55                .map_err(|err| RuntimeStoreError::WriteFailed(err.to_string()))?;
56            (
57                Some(format!("{:x}", Sha256::digest(encoded_messages))),
58                session.messages().len(),
59            )
60        }
61        None => (None, 0),
62    };
63
64    Ok(RunBoundaryReceipt {
65        run_id,
66        boundary,
67        contributing_input_ids,
68        conversation_digest,
69        message_count,
70        sequence,
71    })
72}
73
74/// Atomic persistence interface for runtime state.
75///
76/// Implementations:
77/// - `InMemoryRuntimeStore` โ€” in-memory, no durability (ephemeral/testing)
78/// - `SqliteRuntimeStore` โ€” SQLite-backed durable runtime state
79#[cfg_attr(not(target_arch = "wasm32"), async_trait::async_trait)]
80#[cfg_attr(target_arch = "wasm32", async_trait::async_trait(?Send))]
81pub trait RuntimeStore: Send + Sync {
82    /// Atomically persist session delta + authoritative receipt + input state updates.
83    ///
84    /// The receipt MUST be minted by the durable commit seam itself, not by the
85    /// caller, so returned success carries the exact proof that was stored.
86    async fn commit_session_boundary(
87        &self,
88        runtime_id: &LogicalRuntimeId,
89        session_delta: SessionDelta,
90        run_id: RunId,
91        boundary: RunApplyBoundary,
92        contributing_input_ids: Vec<InputId>,
93        input_updates: Vec<InputState>,
94    ) -> Result<RunBoundaryReceipt, RuntimeStoreError>;
95
96    /// Atomically persist session delta + receipt + input state updates.
97    ///
98    /// All three writes MUST commit in a single atomic operation.
99    /// If any write fails, none should be visible.
100    /// Atomically persist session delta + receipt + input state updates.
101    ///
102    /// All writes MUST commit in a single atomic operation.
103    /// If `session_store_key` is `Some`, also writes the session snapshot
104    /// to the sessions table (the same table `SessionStore` uses), providing
105    /// a unified boundary commit across both stores.
106    async fn atomic_apply(
107        &self,
108        runtime_id: &LogicalRuntimeId,
109        session_delta: Option<SessionDelta>,
110        receipt: RunBoundaryReceipt,
111        input_updates: Vec<InputState>,
112        session_store_key: Option<meerkat_core::types::SessionId>,
113    ) -> Result<(), RuntimeStoreError>;
114
115    /// Load all input states for a runtime.
116    async fn load_input_states(
117        &self,
118        runtime_id: &LogicalRuntimeId,
119    ) -> Result<Vec<InputState>, RuntimeStoreError>;
120
121    /// Load a specific boundary receipt.
122    async fn load_boundary_receipt(
123        &self,
124        runtime_id: &LogicalRuntimeId,
125        run_id: &RunId,
126        sequence: u64,
127    ) -> Result<Option<RunBoundaryReceipt>, RuntimeStoreError>;
128
129    /// Load the latest committed session snapshot for a runtime, if any.
130    async fn load_session_snapshot(
131        &self,
132        runtime_id: &LogicalRuntimeId,
133    ) -> Result<Option<Vec<u8>>, RuntimeStoreError>;
134
135    /// Persist a single input state (for durable-before-ack).
136    async fn persist_input_state(
137        &self,
138        runtime_id: &LogicalRuntimeId,
139        state: &InputState,
140    ) -> Result<(), RuntimeStoreError>;
141
142    /// Load a single input state.
143    async fn load_input_state(
144        &self,
145        runtime_id: &LogicalRuntimeId,
146        input_id: &InputId,
147    ) -> Result<Option<InputState>, RuntimeStoreError>;
148
149    /// Persist the runtime state itself (for durable retire/stop semantics).
150    async fn persist_runtime_state(
151        &self,
152        runtime_id: &LogicalRuntimeId,
153        state: RuntimeState,
154    ) -> Result<(), RuntimeStoreError>;
155
156    /// Load the last persisted runtime state, if any.
157    async fn load_runtime_state(
158        &self,
159        runtime_id: &LogicalRuntimeId,
160    ) -> Result<Option<RuntimeState>, RuntimeStoreError>;
161
162    /// Atomically commit lifecycle state changes (retire/reset/stop/destroy).
163    ///
164    /// Writes runtime state + all input state updates in a single atomic
165    /// operation. Used for lifecycle ops that don't produce boundary receipts.
166    async fn atomic_lifecycle_commit(
167        &self,
168        runtime_id: &LogicalRuntimeId,
169        runtime_state: RuntimeState,
170        input_states: &[InputState],
171    ) -> Result<(), RuntimeStoreError>;
172
173    /// Persist a snapshot of the ops lifecycle registry state.
174    async fn persist_ops_lifecycle(
175        &self,
176        runtime_id: &LogicalRuntimeId,
177        snapshot: &crate::ops_lifecycle::PersistedOpsSnapshot,
178    ) -> Result<(), RuntimeStoreError> {
179        let _ = (runtime_id, snapshot);
180        Ok(())
181    }
182
183    /// Load a previously persisted ops lifecycle snapshot.
184    async fn load_ops_lifecycle(
185        &self,
186        runtime_id: &LogicalRuntimeId,
187    ) -> Result<Option<crate::ops_lifecycle::PersistedOpsSnapshot>, RuntimeStoreError> {
188        let _ = runtime_id;
189        Ok(None)
190    }
191}
192
193pub use memory::InMemoryRuntimeStore;
194#[cfg(feature = "sqlite-store")]
195pub use sqlite::SqliteRuntimeStore;