meerkat_runtime/store/
mod.rs1pub mod memory;
7#[cfg(feature = "sqlite-store")]
8pub mod sqlite;
9
10use meerkat_core::lifecycle::run_primitive::RunApplyBoundary;
11use meerkat_core::lifecycle::{InputId, RunBoundaryReceipt, RunId};
12use sha2::{Digest, Sha256};
13
14use crate::identifiers::LogicalRuntimeId;
15use crate::input_state::InputState;
16use crate::runtime_state::RuntimeState;
17
18#[derive(Debug, Clone, thiserror::Error)]
20#[non_exhaustive]
21pub enum RuntimeStoreError {
22 #[error("Store write failed: {0}")]
24 WriteFailed(String),
25 #[error("Store read failed: {0}")]
27 ReadFailed(String),
28 #[error("Not found: {0}")]
30 NotFound(String),
31 #[error("Internal error: {0}")]
33 Internal(String),
34}
35
/// Session payload handed to the store alongside a run boundary.
#[derive(Debug, Clone)]
pub struct SessionDelta {
    /// Raw session bytes. Within this module they are decoded as a
    /// JSON-serialized `meerkat_core::Session` when a boundary receipt is
    /// built.
    pub session_snapshot: Vec<u8>,
}
42
43fn authoritative_receipt(
44 session_delta: Option<&SessionDelta>,
45 run_id: RunId,
46 boundary: RunApplyBoundary,
47 contributing_input_ids: Vec<InputId>,
48 sequence: u64,
49) -> Result<RunBoundaryReceipt, RuntimeStoreError> {
50 let (conversation_digest, message_count) = match session_delta {
51 Some(delta) => {
52 let session: meerkat_core::Session = serde_json::from_slice(&delta.session_snapshot)
53 .map_err(|err| RuntimeStoreError::WriteFailed(err.to_string()))?;
54 let encoded_messages = serde_json::to_vec(session.messages())
55 .map_err(|err| RuntimeStoreError::WriteFailed(err.to_string()))?;
56 (
57 Some(format!("{:x}", Sha256::digest(encoded_messages))),
58 session.messages().len(),
59 )
60 }
61 None => (None, 0),
62 };
63
64 Ok(RunBoundaryReceipt {
65 run_id,
66 boundary,
67 contributing_input_ids,
68 conversation_digest,
69 message_count,
70 sequence,
71 })
72}
73
74#[cfg_attr(not(target_arch = "wasm32"), async_trait::async_trait)]
80#[cfg_attr(target_arch = "wasm32", async_trait::async_trait(?Send))]
81pub trait RuntimeStore: Send + Sync {
82 async fn commit_session_boundary(
87 &self,
88 runtime_id: &LogicalRuntimeId,
89 session_delta: SessionDelta,
90 run_id: RunId,
91 boundary: RunApplyBoundary,
92 contributing_input_ids: Vec<InputId>,
93 input_updates: Vec<InputState>,
94 ) -> Result<RunBoundaryReceipt, RuntimeStoreError>;
95
96 async fn atomic_apply(
107 &self,
108 runtime_id: &LogicalRuntimeId,
109 session_delta: Option<SessionDelta>,
110 receipt: RunBoundaryReceipt,
111 input_updates: Vec<InputState>,
112 session_store_key: Option<meerkat_core::types::SessionId>,
113 ) -> Result<(), RuntimeStoreError>;
114
115 async fn load_input_states(
117 &self,
118 runtime_id: &LogicalRuntimeId,
119 ) -> Result<Vec<InputState>, RuntimeStoreError>;
120
121 async fn load_boundary_receipt(
123 &self,
124 runtime_id: &LogicalRuntimeId,
125 run_id: &RunId,
126 sequence: u64,
127 ) -> Result<Option<RunBoundaryReceipt>, RuntimeStoreError>;
128
129 async fn load_session_snapshot(
131 &self,
132 runtime_id: &LogicalRuntimeId,
133 ) -> Result<Option<Vec<u8>>, RuntimeStoreError>;
134
135 async fn persist_input_state(
137 &self,
138 runtime_id: &LogicalRuntimeId,
139 state: &InputState,
140 ) -> Result<(), RuntimeStoreError>;
141
142 async fn load_input_state(
144 &self,
145 runtime_id: &LogicalRuntimeId,
146 input_id: &InputId,
147 ) -> Result<Option<InputState>, RuntimeStoreError>;
148
149 async fn persist_runtime_state(
151 &self,
152 runtime_id: &LogicalRuntimeId,
153 state: RuntimeState,
154 ) -> Result<(), RuntimeStoreError>;
155
156 async fn load_runtime_state(
158 &self,
159 runtime_id: &LogicalRuntimeId,
160 ) -> Result<Option<RuntimeState>, RuntimeStoreError>;
161
162 async fn atomic_lifecycle_commit(
167 &self,
168 runtime_id: &LogicalRuntimeId,
169 runtime_state: RuntimeState,
170 input_states: &[InputState],
171 ) -> Result<(), RuntimeStoreError>;
172
173 async fn persist_ops_lifecycle(
175 &self,
176 runtime_id: &LogicalRuntimeId,
177 snapshot: &crate::ops_lifecycle::PersistedOpsSnapshot,
178 ) -> Result<(), RuntimeStoreError> {
179 let _ = (runtime_id, snapshot);
180 Ok(())
181 }
182
183 async fn load_ops_lifecycle(
185 &self,
186 runtime_id: &LogicalRuntimeId,
187 ) -> Result<Option<crate::ops_lifecycle::PersistedOpsSnapshot>, RuntimeStoreError> {
188 let _ = runtime_id;
189 Ok(None)
190 }
191}
192
193pub use memory::InMemoryRuntimeStore;
194#[cfg(feature = "sqlite-store")]
195pub use sqlite::SqliteRuntimeStore;