1pub mod memory;
7#[cfg(feature = "sqlite-store")]
8pub mod sqlite;
9
10use meerkat_core::lifecycle::{InputId, RunBoundaryReceipt, RunId};
11
12use crate::identifiers::LogicalRuntimeId;
13use crate::input_state::{InputStatePersistenceRecord, StoredInputState};
14use crate::runtime_state::RuntimeState;
15
/// Version tag stamped into every encoded machine lifecycle store record.
///
/// Encoding always writes this value into the wire record's `record_version`
/// field, and decoding rejects any record whose `record_version` differs.
const MACHINE_LIFECYCLE_STORE_RECORD_VERSION: u16 = 1;
17
/// Errors returned by runtime store operations in this module.
///
/// The enum is `#[non_exhaustive]`, so matches outside this crate need a
/// wildcard arm.
#[derive(Debug, Clone, thiserror::Error)]
#[non_exhaustive]
pub enum RuntimeStoreError {
    /// A store write failed; the payload is a textual description.
    #[error("Store write failed: {0}")]
    WriteFailed(String),
    /// A store read failed; the payload is a textual description. Also used
    /// in this module when a machine lifecycle record cannot be decoded or
    /// carries an unsupported record version.
    #[error("Store read failed: {0}")]
    ReadFailed(String),
    /// The session store key that was supplied (`actual`) differs from the
    /// one that was expected (`expected`).
    #[error("Session store key mismatch: expected {expected}, actual {actual}")]
    SessionKeyMismatch {
        expected: meerkat_core::types::SessionId,
        actual: meerkat_core::types::SessionId,
    },
    /// The requested item does not exist; the payload describes what was
    /// looked up.
    #[error("Not found: {0}")]
    NotFound(String),
    /// The store does not implement the named operation. The default trait
    /// method bodies in this module report this with the method name.
    #[error("Unsupported store operation: {0}")]
    Unsupported(String),
    /// The transcript revision found (`actual`) differs from the one the
    /// caller expected (`expected`).
    #[error("Transcript revision conflict: expected {expected}, actual {actual}")]
    TranscriptRevisionConflict { expected: String, actual: String },
    /// Any other failure; the payload is a textual description.
    #[error("Internal error: {0}")]
    Internal(String),
}
47
/// Callback type accepted by `RuntimeStore::update_auth_oauth_flow_snapshot`.
///
/// The callback receives optional snapshot bytes and returns either new
/// snapshot bytes or a [`RuntimeStoreError`].
// NOTE(review): presumably the argument is the currently stored snapshot
// (`None` when absent) and the returned bytes replace it — confirm against
// the store implementations.
pub type AuthOAuthFlowSnapshotUpdate<'a> =
    dyn FnMut(Option<&[u8]>) -> Result<Vec<u8>, RuntimeStoreError> + 'a;
51
/// Session payload handed to the store when committing session state.
#[derive(Debug, Clone)]
pub struct SessionDelta {
    /// Session snapshot as raw bytes; the encoding is not fixed by this type.
    pub session_snapshot: Vec<u8>,
}
58
/// Binding facts stored alongside a machine lifecycle snapshot.
///
/// Every fact is optional; [`Default`] yields a value with all of them absent.
/// Fields are private and exposed through read-only accessors.
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub struct MachineLifecycleBindingFacts {
    /// Agent runtime identifier, when one was recorded.
    agent_runtime_id: Option<String>,
    /// Fence token, when one was recorded.
    fence_token: Option<u64>,
    /// Runtime generation, when one was recorded.
    runtime_generation: Option<u64>,
    /// Runtime epoch identifier, when one was recorded.
    runtime_epoch_id: Option<String>,
}

impl MachineLifecycleBindingFacts {
    /// Builds the facts from their four optional parts, stored as given.
    pub(crate) fn new(
        agent_runtime_id: Option<String>,
        fence_token: Option<u64>,
        runtime_generation: Option<u64>,
        runtime_epoch_id: Option<String>,
    ) -> Self {
        Self {
            agent_runtime_id,
            fence_token,
            runtime_generation,
            runtime_epoch_id,
        }
    }

    /// Borrows the agent runtime identifier, or `None` when absent.
    pub fn agent_runtime_id(&self) -> Option<&str> {
        self.agent_runtime_id.as_deref()
    }

    /// Returns the fence token, or `None` when absent.
    pub fn fence_token(&self) -> Option<u64> {
        self.fence_token
    }

    /// Returns the runtime generation, or `None` when absent.
    pub fn runtime_generation(&self) -> Option<u64> {
        self.runtime_generation
    }

    /// Borrows the runtime epoch identifier, or `None` when absent.
    pub fn runtime_epoch_id(&self) -> Option<&str> {
        self.runtime_epoch_id.as_deref()
    }
}
104
105#[derive(Debug, Clone, PartialEq, Eq)]
107pub struct MachineLifecycleSnapshot {
108 runtime_state: RuntimeState,
109 binding: MachineLifecycleBindingFacts,
110}
111
112impl MachineLifecycleSnapshot {
113 pub(crate) fn new(runtime_state: RuntimeState, binding: MachineLifecycleBindingFacts) -> Self {
114 Self {
115 runtime_state,
116 binding,
117 }
118 }
119
120 pub fn runtime_state(&self) -> RuntimeState {
122 self.runtime_state
123 }
124
125 pub fn binding(&self) -> &MachineLifecycleBindingFacts {
127 &self.binding
128 }
129}
130
/// Serialized (serde) form of [`MachineLifecycleBindingFacts`].
///
/// Mirrors the four optional facts field-for-field; conversions in both
/// directions are provided by the `From` impls in this module.
#[derive(serde::Serialize, serde::Deserialize)]
struct MachineLifecycleBindingFactsStoreWire {
    /// Agent runtime identifier, when present.
    agent_runtime_id: Option<String>,
    /// Fence token, when present.
    fence_token: Option<u64>,
    /// Runtime generation, when present.
    runtime_generation: Option<u64>,
    /// Runtime epoch identifier, when present.
    runtime_epoch_id: Option<String>,
}
138
139impl From<&MachineLifecycleBindingFacts> for MachineLifecycleBindingFactsStoreWire {
140 fn from(binding: &MachineLifecycleBindingFacts) -> Self {
141 Self {
142 agent_runtime_id: binding.agent_runtime_id().map(ToOwned::to_owned),
143 fence_token: binding.fence_token(),
144 runtime_generation: binding.runtime_generation(),
145 runtime_epoch_id: binding.runtime_epoch_id().map(ToOwned::to_owned),
146 }
147 }
148}
149
150impl From<MachineLifecycleBindingFactsStoreWire> for MachineLifecycleBindingFacts {
151 fn from(binding: MachineLifecycleBindingFactsStoreWire) -> Self {
152 Self::new(
153 binding.agent_runtime_id,
154 binding.fence_token,
155 binding.runtime_generation,
156 binding.runtime_epoch_id,
157 )
158 }
159}
160
/// Serialized (serde) form of a [`MachineLifecycleSnapshot`].
#[derive(serde::Serialize, serde::Deserialize)]
struct MachineLifecycleSnapshotStoreWire {
    /// Record format version; checked against
    /// `MACHINE_LIFECYCLE_STORE_RECORD_VERSION` when converting back into a
    /// snapshot.
    record_version: u16,
    /// Runtime state captured by the snapshot.
    runtime_state: RuntimeState,
    /// Wire form of the snapshot's binding facts.
    binding: MachineLifecycleBindingFactsStoreWire,
}
167
168impl From<&MachineLifecycleSnapshot> for MachineLifecycleSnapshotStoreWire {
169 fn from(snapshot: &MachineLifecycleSnapshot) -> Self {
170 Self {
171 record_version: MACHINE_LIFECYCLE_STORE_RECORD_VERSION,
172 runtime_state: snapshot.runtime_state(),
173 binding: snapshot.binding().into(),
174 }
175 }
176}
177
178impl TryFrom<MachineLifecycleSnapshotStoreWire> for MachineLifecycleSnapshot {
179 type Error = RuntimeStoreError;
180
181 fn try_from(record: MachineLifecycleSnapshotStoreWire) -> Result<Self, Self::Error> {
182 if record.record_version != MACHINE_LIFECYCLE_STORE_RECORD_VERSION {
183 return Err(RuntimeStoreError::ReadFailed(format!(
184 "unsupported machine lifecycle store record version {}",
185 record.record_version
186 )));
187 }
188 Ok(Self::new(record.runtime_state, record.binding.into()))
189 }
190}
191
192fn decode_machine_lifecycle_store_record(
193 bytes: &[u8],
194) -> Result<MachineLifecycleSnapshot, RuntimeStoreError> {
195 let record = serde_json::from_slice::<MachineLifecycleSnapshotStoreWire>(bytes)
196 .map_err(|err| RuntimeStoreError::ReadFailed(err.to_string()))?;
197 MachineLifecycleSnapshot::try_from(record)
198}
199
200pub async fn load_runtime_state(
208 store: &dyn RuntimeStore,
209 runtime_id: &LogicalRuntimeId,
210) -> Result<Option<RuntimeState>, RuntimeStoreError> {
211 Ok(load_machine_lifecycle(store, runtime_id)
212 .await?
213 .map(|snapshot| snapshot.runtime_state()))
214}
215
216pub(crate) async fn load_machine_lifecycle(
217 store: &dyn RuntimeStore,
218 runtime_id: &LogicalRuntimeId,
219) -> Result<Option<MachineLifecycleSnapshot>, RuntimeStoreError> {
220 store
221 .load_machine_lifecycle_record(runtime_id)
222 .await?
223 .map(|bytes| decode_machine_lifecycle_store_record(&bytes))
224 .transpose()
225}
226
227#[derive(Debug, Clone, PartialEq, Eq)]
233pub struct MachineLifecycleStoreRecord {
234 snapshot: MachineLifecycleSnapshot,
235}
236
237impl MachineLifecycleStoreRecord {
238 pub(crate) fn from_snapshot(snapshot: &MachineLifecycleSnapshot) -> Self {
239 Self {
240 snapshot: snapshot.clone(),
241 }
242 }
243
244 pub fn encode(&self) -> Result<Vec<u8>, RuntimeStoreError> {
245 let wire = MachineLifecycleSnapshotStoreWire::from(&self.snapshot);
246 serde_json::to_vec(&wire).map_err(|err| RuntimeStoreError::WriteFailed(err.to_string()))
247 }
248}
249
250#[derive(Debug, Clone, PartialEq, Eq)]
256pub struct MachineLifecycleCommit {
257 snapshot: MachineLifecycleSnapshot,
258}
259
260impl MachineLifecycleCommit {
261 pub(crate) fn new_with_binding(
262 runtime_state: RuntimeState,
263 binding: MachineLifecycleBindingFacts,
264 ) -> Self {
265 Self {
266 snapshot: MachineLifecycleSnapshot::new(runtime_state, binding),
267 }
268 }
269
270 pub fn runtime_state(&self) -> RuntimeState {
272 self.snapshot.runtime_state()
273 }
274
275 pub fn snapshot(&self) -> &MachineLifecycleSnapshot {
277 &self.snapshot
278 }
279
280 pub fn store_record(&self) -> MachineLifecycleStoreRecord {
282 MachineLifecycleStoreRecord::from_snapshot(&self.snapshot)
283 }
284
285 pub(crate) fn into_snapshot(self) -> MachineLifecycleSnapshot {
286 self.snapshot
287 }
288}
289
290#[cfg_attr(not(target_arch = "wasm32"), async_trait::async_trait)]
296#[cfg_attr(target_arch = "wasm32", async_trait::async_trait(?Send))]
297pub trait RuntimeStore: Send + Sync {
298 fn auth_authority_key(&self) -> Option<String> {
301 None
302 }
303
304 fn persist_auth_oauth_flow_snapshot(
310 &self,
311 snapshot_json: &[u8],
312 ) -> Result<(), RuntimeStoreError> {
313 let _ = snapshot_json;
314 Err(RuntimeStoreError::Unsupported(
315 "persist_auth_oauth_flow_snapshot".into(),
316 ))
317 }
318
319 fn load_auth_oauth_flow_snapshot(&self) -> Result<Option<Vec<u8>>, RuntimeStoreError> {
321 Err(RuntimeStoreError::Unsupported(
322 "load_auth_oauth_flow_snapshot".into(),
323 ))
324 }
325
326 fn update_auth_oauth_flow_snapshot(
332 &self,
333 _update: &mut AuthOAuthFlowSnapshotUpdate<'_>,
334 ) -> Result<(), RuntimeStoreError> {
335 Err(RuntimeStoreError::Unsupported(
336 "update_auth_oauth_flow_snapshot".into(),
337 ))
338 }
339
340 async fn commit_session_snapshot(
345 &self,
346 runtime_id: &LogicalRuntimeId,
347 session_delta: SessionDelta,
348 ) -> Result<(), RuntimeStoreError>;
349
350 async fn commit_session_transcript_rewrite_snapshot(
356 &self,
357 runtime_id: &LogicalRuntimeId,
358 session_delta: SessionDelta,
359 commit: &meerkat_core::TranscriptRewriteCommit,
360 ) -> Result<(), RuntimeStoreError> {
361 let _ = (runtime_id, session_delta, commit);
362 Err(RuntimeStoreError::Unsupported(
363 "commit_session_transcript_rewrite_snapshot".into(),
364 ))
365 }
366
367 async fn atomic_apply(
380 &self,
381 runtime_id: &LogicalRuntimeId,
382 session_delta: Option<SessionDelta>,
383 receipt: RunBoundaryReceipt,
384 input_updates: Vec<InputStatePersistenceRecord>,
385 session_store_key: Option<meerkat_core::types::SessionId>,
386 ) -> Result<(), RuntimeStoreError>;
387
388 async fn load_input_states(
390 &self,
391 runtime_id: &LogicalRuntimeId,
392 ) -> Result<Vec<StoredInputState>, RuntimeStoreError>;
393
394 async fn load_boundary_receipt(
396 &self,
397 runtime_id: &LogicalRuntimeId,
398 run_id: &RunId,
399 sequence: u64,
400 ) -> Result<Option<RunBoundaryReceipt>, RuntimeStoreError>;
401
402 async fn load_session_snapshot(
404 &self,
405 runtime_id: &LogicalRuntimeId,
406 ) -> Result<Option<Vec<u8>>, RuntimeStoreError>;
407
408 async fn clear_session_snapshot(
414 &self,
415 runtime_id: &LogicalRuntimeId,
416 ) -> Result<(), RuntimeStoreError>;
417
418 async fn replace_session_snapshot_if_current(
425 &self,
426 runtime_id: &LogicalRuntimeId,
427 expected_current: &[u8],
428 replacement: Vec<u8>,
429 ) -> Result<bool, RuntimeStoreError>;
430
431 async fn clear_session_snapshot_if_current(
436 &self,
437 runtime_id: &LogicalRuntimeId,
438 expected_current: &[u8],
439 ) -> Result<bool, RuntimeStoreError>;
440
441 async fn persist_input_state(
443 &self,
444 runtime_id: &LogicalRuntimeId,
445 state: &InputStatePersistenceRecord,
446 ) -> Result<(), RuntimeStoreError>;
447
448 async fn load_input_state(
450 &self,
451 runtime_id: &LogicalRuntimeId,
452 input_id: &InputId,
453 ) -> Result<Option<StoredInputState>, RuntimeStoreError>;
454
455 async fn load_machine_lifecycle_record(
463 &self,
464 runtime_id: &LogicalRuntimeId,
465 ) -> Result<Option<Vec<u8>>, RuntimeStoreError>;
466
467 async fn commit_machine_lifecycle(
474 &self,
475 runtime_id: &LogicalRuntimeId,
476 commit: MachineLifecycleCommit,
477 input_states: &[InputStatePersistenceRecord],
478 ) -> Result<(), RuntimeStoreError>;
479
480 async fn persist_ops_lifecycle(
482 &self,
483 runtime_id: &LogicalRuntimeId,
484 snapshot: &crate::ops_lifecycle::PersistedOpsSnapshot,
485 ) -> Result<(), RuntimeStoreError> {
486 let _ = (runtime_id, snapshot);
487 Err(RuntimeStoreError::Unsupported(
488 "persist_ops_lifecycle".into(),
489 ))
490 }
491
492 async fn load_ops_lifecycle(
494 &self,
495 runtime_id: &LogicalRuntimeId,
496 ) -> Result<Option<crate::ops_lifecycle::PersistedOpsSnapshot>, RuntimeStoreError> {
497 let _ = runtime_id;
498 Err(RuntimeStoreError::Unsupported("load_ops_lifecycle".into()))
499 }
500
501 async fn delete_ops_lifecycle(
503 &self,
504 runtime_id: &LogicalRuntimeId,
505 ) -> Result<(), RuntimeStoreError> {
506 let _ = runtime_id;
507 Err(RuntimeStoreError::Unsupported(
508 "delete_ops_lifecycle".into(),
509 ))
510 }
511}
512
513pub use memory::InMemoryRuntimeStore;
514#[cfg(feature = "sqlite-store")]
515pub use sqlite::SqliteRuntimeStore;