Skip to main content

meerkat_runtime/store/
mod.rs

1//! RuntimeStore โ€” atomic persistence for runtime state.
2//!
3//! ยง19: "CoreExecutor::apply MUST durably persist the RunBoundaryReceipt atomically
4//! with the boundary side effects before returning success."
5
6pub mod memory;
7#[cfg(feature = "redb-store")]
8pub mod redb;
9#[cfg(feature = "sqlite-store")]
10pub mod sqlite;
11
12use meerkat_core::lifecycle::run_primitive::RunApplyBoundary;
13use meerkat_core::lifecycle::{InputId, RunBoundaryReceipt, RunId};
14use sha2::{Digest, Sha256};
15
16use crate::identifiers::LogicalRuntimeId;
17use crate::input_state::InputState;
18use crate::runtime_state::RuntimeState;
19
20/// Errors from RuntimeStore operations.
21#[derive(Debug, Clone, thiserror::Error)]
22#[non_exhaustive]
23pub enum RuntimeStoreError {
24    /// Write failed.
25    #[error("Store write failed: {0}")]
26    WriteFailed(String),
27    /// Read failed.
28    #[error("Store read failed: {0}")]
29    ReadFailed(String),
30    /// Not found.
31    #[error("Not found: {0}")]
32    NotFound(String),
33    /// Internal error.
34    #[error("Internal error: {0}")]
35    Internal(String),
36}
37
38/// Describes session-level writes to be committed atomically with receipts.
39#[derive(Debug, Clone)]
40pub struct SessionDelta {
41    /// Serialized session snapshot (opaque to RuntimeStore).
42    pub session_snapshot: Vec<u8>,
43}
44
45fn authoritative_receipt(
46    session_delta: Option<&SessionDelta>,
47    run_id: RunId,
48    boundary: RunApplyBoundary,
49    contributing_input_ids: Vec<InputId>,
50    sequence: u64,
51) -> Result<RunBoundaryReceipt, RuntimeStoreError> {
52    let (conversation_digest, message_count) = match session_delta {
53        Some(delta) => {
54            let session: meerkat_core::Session = serde_json::from_slice(&delta.session_snapshot)
55                .map_err(|err| RuntimeStoreError::WriteFailed(err.to_string()))?;
56            let encoded_messages = serde_json::to_vec(session.messages())
57                .map_err(|err| RuntimeStoreError::WriteFailed(err.to_string()))?;
58            (
59                Some(format!("{:x}", Sha256::digest(encoded_messages))),
60                session.messages().len(),
61            )
62        }
63        None => (None, 0),
64    };
65
66    Ok(RunBoundaryReceipt {
67        run_id,
68        boundary,
69        contributing_input_ids,
70        conversation_digest,
71        message_count,
72        sequence,
73    })
74}
75
76/// Atomic persistence interface for runtime state.
77///
78/// Implementations:
79/// - `InMemoryRuntimeStore` โ€” in-memory, no durability (ephemeral/testing)
80/// - `RedbRuntimeStore` โ€” shared redb database, single-transaction atomicity (Phase 4)
81#[cfg_attr(not(target_arch = "wasm32"), async_trait::async_trait)]
82#[cfg_attr(target_arch = "wasm32", async_trait::async_trait(?Send))]
83pub trait RuntimeStore: Send + Sync {
84    /// Atomically persist session delta + authoritative receipt + input state updates.
85    ///
86    /// The receipt MUST be minted by the durable commit seam itself, not by the
87    /// caller, so returned success carries the exact proof that was stored.
88    async fn commit_session_boundary(
89        &self,
90        runtime_id: &LogicalRuntimeId,
91        session_delta: SessionDelta,
92        run_id: RunId,
93        boundary: RunApplyBoundary,
94        contributing_input_ids: Vec<InputId>,
95        input_updates: Vec<InputState>,
96    ) -> Result<RunBoundaryReceipt, RuntimeStoreError>;
97
98    /// Atomically persist session delta + receipt + input state updates.
99    ///
100    /// All three writes MUST commit in a single atomic operation.
101    /// If any write fails, none should be visible.
102    /// Atomically persist session delta + receipt + input state updates.
103    ///
104    /// All writes MUST commit in a single atomic operation.
105    /// If `session_store_key` is `Some`, also writes the session snapshot
106    /// to the sessions table (the same table `SessionStore` uses), providing
107    /// a unified boundary commit across both stores.
108    async fn atomic_apply(
109        &self,
110        runtime_id: &LogicalRuntimeId,
111        session_delta: Option<SessionDelta>,
112        receipt: RunBoundaryReceipt,
113        input_updates: Vec<InputState>,
114        session_store_key: Option<meerkat_core::types::SessionId>,
115    ) -> Result<(), RuntimeStoreError>;
116
117    /// Load all input states for a runtime.
118    async fn load_input_states(
119        &self,
120        runtime_id: &LogicalRuntimeId,
121    ) -> Result<Vec<InputState>, RuntimeStoreError>;
122
123    /// Load a specific boundary receipt.
124    async fn load_boundary_receipt(
125        &self,
126        runtime_id: &LogicalRuntimeId,
127        run_id: &RunId,
128        sequence: u64,
129    ) -> Result<Option<RunBoundaryReceipt>, RuntimeStoreError>;
130
131    /// Load the latest committed session snapshot for a runtime, if any.
132    async fn load_session_snapshot(
133        &self,
134        runtime_id: &LogicalRuntimeId,
135    ) -> Result<Option<Vec<u8>>, RuntimeStoreError>;
136
137    /// Persist a single input state (for durable-before-ack).
138    async fn persist_input_state(
139        &self,
140        runtime_id: &LogicalRuntimeId,
141        state: &InputState,
142    ) -> Result<(), RuntimeStoreError>;
143
144    /// Load a single input state.
145    async fn load_input_state(
146        &self,
147        runtime_id: &LogicalRuntimeId,
148        input_id: &InputId,
149    ) -> Result<Option<InputState>, RuntimeStoreError>;
150
151    /// Persist the runtime state itself (for durable retire/stop semantics).
152    async fn persist_runtime_state(
153        &self,
154        runtime_id: &LogicalRuntimeId,
155        state: RuntimeState,
156    ) -> Result<(), RuntimeStoreError>;
157
158    /// Load the last persisted runtime state, if any.
159    async fn load_runtime_state(
160        &self,
161        runtime_id: &LogicalRuntimeId,
162    ) -> Result<Option<RuntimeState>, RuntimeStoreError>;
163
164    /// Atomically commit lifecycle state changes (retire/reset/stop/destroy).
165    ///
166    /// Writes runtime state + all input state updates in a single atomic
167    /// operation. Used for lifecycle ops that don't produce boundary receipts.
168    async fn atomic_lifecycle_commit(
169        &self,
170        runtime_id: &LogicalRuntimeId,
171        runtime_state: RuntimeState,
172        input_states: &[InputState],
173    ) -> Result<(), RuntimeStoreError>;
174}
175
176pub use memory::InMemoryRuntimeStore;
177#[cfg(feature = "redb-store")]
178pub use redb::RedbRuntimeStore;
179#[cfg(feature = "sqlite-store")]
180pub use sqlite::SqliteRuntimeStore;