Skip to main content

meerkat_runtime/store/
mod.rs

1//! RuntimeStore — atomic persistence for runtime state.
2//!
3//! Machine-owned runtime commands durably persist [`RunBoundaryReceipt`] values
4//! atomically with their session and input-state effects.
5
6pub mod memory;
7#[cfg(feature = "sqlite-store")]
8pub mod sqlite;
9
10use meerkat_core::lifecycle::{InputId, RunBoundaryReceipt, RunId};
11
12use crate::identifiers::LogicalRuntimeId;
13use crate::input_state::{InputStatePersistenceRecord, StoredInputState};
14use crate::runtime_state::RuntimeState;
15
/// Version number stamped into every encoded machine lifecycle record.
///
/// Decoding rejects any record whose `record_version` differs from this value.
const MACHINE_LIFECYCLE_STORE_RECORD_VERSION: u16 = 1;
17
18/// Errors from RuntimeStore operations.
19#[derive(Debug, Clone, thiserror::Error)]
20#[non_exhaustive]
21pub enum RuntimeStoreError {
22    /// Write failed.
23    #[error("Store write failed: {0}")]
24    WriteFailed(String),
25    /// Read failed.
26    #[error("Store read failed: {0}")]
27    ReadFailed(String),
28    /// The explicit session-store key does not match the serialized session.
29    #[error("Session store key mismatch: expected {expected}, actual {actual}")]
30    SessionKeyMismatch {
31        expected: meerkat_core::types::SessionId,
32        actual: meerkat_core::types::SessionId,
33    },
34    /// Not found.
35    #[error("Not found: {0}")]
36    NotFound(String),
37    /// Operation is not supported by this store implementation.
38    #[error("Unsupported store operation: {0}")]
39    Unsupported(String),
40    /// Runtime snapshot CAS rejected a stale transcript rewrite.
41    #[error("Transcript revision conflict: expected {expected}, actual {actual}")]
42    TranscriptRevisionConflict { expected: String, actual: String },
43    /// Internal error.
44    #[error("Internal error: {0}")]
45    Internal(String),
46}
47
48/// Transactional updater for the runtime-owned OAuth login-flow payload snapshot.
49pub type AuthOAuthFlowSnapshotUpdate<'a> =
50    dyn FnMut(Option<&[u8]>) -> Result<Vec<u8>, RuntimeStoreError> + 'a;
51
/// Serialized session snapshot carried by boundary and snapshot-only commits.
///
/// The bytes are opaque to [`RuntimeStore`]; it persists them as-is.
#[derive(Debug, Clone)]
pub struct SessionDelta {
    /// Encoded session snapshot, never interpreted by the store.
    pub session_snapshot: Vec<u8>,
}
58
/// Runtime binding facts chosen by generated MeerkatMachine authority.
///
/// [`RuntimeStore`] implementations persist and read these facts as part of a
/// machine lifecycle snapshot. Only crate code can construct a value, so
/// compatibility callers outside the crate cannot mint replacement lifecycle
/// truth.
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub struct MachineLifecycleBindingFacts {
    agent_runtime_id: Option<String>,
    fence_token: Option<u64>,
    runtime_generation: Option<u64>,
    runtime_epoch_id: Option<String>,
}

impl MachineLifecycleBindingFacts {
    /// Assemble binding facts from their individual (optional) parts.
    pub(crate) fn new(
        agent_runtime_id: Option<String>,
        fence_token: Option<u64>,
        runtime_generation: Option<u64>,
        runtime_epoch_id: Option<String>,
    ) -> Self {
        Self {
            agent_runtime_id,
            fence_token,
            runtime_generation,
            runtime_epoch_id,
        }
    }

    /// Agent runtime identifier, borrowed, if one was recorded.
    pub fn agent_runtime_id(&self) -> Option<&str> {
        self.agent_runtime_id.as_ref().map(String::as_str)
    }

    /// Fence token, if one was recorded.
    pub fn fence_token(&self) -> Option<u64> {
        let token = self.fence_token;
        token
    }

    /// Runtime generation counter, if one was recorded.
    pub fn runtime_generation(&self) -> Option<u64> {
        let generation = self.runtime_generation;
        generation
    }

    /// Runtime epoch identifier, borrowed, if one was recorded.
    pub fn runtime_epoch_id(&self) -> Option<&str> {
        self.runtime_epoch_id.as_ref().map(String::as_str)
    }
}
104
105/// Durable read-back shape for machine-owned lifecycle state.
106#[derive(Debug, Clone, PartialEq, Eq)]
107pub struct MachineLifecycleSnapshot {
108    runtime_state: RuntimeState,
109    binding: MachineLifecycleBindingFacts,
110}
111
112impl MachineLifecycleSnapshot {
113    pub(crate) fn new(runtime_state: RuntimeState, binding: MachineLifecycleBindingFacts) -> Self {
114        Self {
115            runtime_state,
116            binding,
117        }
118    }
119
120    /// Runtime state selected by the owning MeerkatMachine transition.
121    pub fn runtime_state(&self) -> RuntimeState {
122        self.runtime_state
123    }
124
125    /// Runtime binding facts selected by the owning MeerkatMachine transition.
126    pub fn binding(&self) -> &MachineLifecycleBindingFacts {
127        &self.binding
128    }
129}
130
131#[derive(serde::Serialize, serde::Deserialize)]
132struct MachineLifecycleBindingFactsStoreWire {
133    agent_runtime_id: Option<String>,
134    fence_token: Option<u64>,
135    runtime_generation: Option<u64>,
136    runtime_epoch_id: Option<String>,
137}
138
139impl From<&MachineLifecycleBindingFacts> for MachineLifecycleBindingFactsStoreWire {
140    fn from(binding: &MachineLifecycleBindingFacts) -> Self {
141        Self {
142            agent_runtime_id: binding.agent_runtime_id().map(ToOwned::to_owned),
143            fence_token: binding.fence_token(),
144            runtime_generation: binding.runtime_generation(),
145            runtime_epoch_id: binding.runtime_epoch_id().map(ToOwned::to_owned),
146        }
147    }
148}
149
150impl From<MachineLifecycleBindingFactsStoreWire> for MachineLifecycleBindingFacts {
151    fn from(binding: MachineLifecycleBindingFactsStoreWire) -> Self {
152        Self::new(
153            binding.agent_runtime_id,
154            binding.fence_token,
155            binding.runtime_generation,
156            binding.runtime_epoch_id,
157        )
158    }
159}
160
161#[derive(serde::Serialize, serde::Deserialize)]
162struct MachineLifecycleSnapshotStoreWire {
163    record_version: u16,
164    runtime_state: RuntimeState,
165    binding: MachineLifecycleBindingFactsStoreWire,
166}
167
168impl From<&MachineLifecycleSnapshot> for MachineLifecycleSnapshotStoreWire {
169    fn from(snapshot: &MachineLifecycleSnapshot) -> Self {
170        Self {
171            record_version: MACHINE_LIFECYCLE_STORE_RECORD_VERSION,
172            runtime_state: snapshot.runtime_state(),
173            binding: snapshot.binding().into(),
174        }
175    }
176}
177
178impl TryFrom<MachineLifecycleSnapshotStoreWire> for MachineLifecycleSnapshot {
179    type Error = RuntimeStoreError;
180
181    fn try_from(record: MachineLifecycleSnapshotStoreWire) -> Result<Self, Self::Error> {
182        if record.record_version != MACHINE_LIFECYCLE_STORE_RECORD_VERSION {
183            return Err(RuntimeStoreError::ReadFailed(format!(
184                "unsupported machine lifecycle store record version {}",
185                record.record_version
186            )));
187        }
188        Ok(Self::new(record.runtime_state, record.binding.into()))
189    }
190}
191
192fn decode_machine_lifecycle_store_record(
193    bytes: &[u8],
194) -> Result<MachineLifecycleSnapshot, RuntimeStoreError> {
195    let record = serde_json::from_slice::<MachineLifecycleSnapshotStoreWire>(bytes)
196        .map_err(|err| RuntimeStoreError::ReadFailed(err.to_string()))?;
197    MachineLifecycleSnapshot::try_from(record)
198}
199
200/// Load the last persisted runtime-state projection from a generated lifecycle
201/// record.
202///
203/// This is a projection of [`MachineLifecycleCommit`] authority. Store
204/// implementations provide only opaque record bytes; the runtime crate owns the
205/// decoding and rejects compatibility rows that are not machine lifecycle
206/// records.
207pub async fn load_runtime_state(
208    store: &dyn RuntimeStore,
209    runtime_id: &LogicalRuntimeId,
210) -> Result<Option<RuntimeState>, RuntimeStoreError> {
211    Ok(load_machine_lifecycle(store, runtime_id)
212        .await?
213        .map(|snapshot| snapshot.runtime_state()))
214}
215
216pub(crate) async fn load_machine_lifecycle(
217    store: &dyn RuntimeStore,
218    runtime_id: &LogicalRuntimeId,
219) -> Result<Option<MachineLifecycleSnapshot>, RuntimeStoreError> {
220    store
221        .load_machine_lifecycle_record(runtime_id)
222        .await?
223        .map(|bytes| decode_machine_lifecycle_store_record(&bytes))
224        .transpose()
225}
226
227/// Declared durable store record for generated machine lifecycle truth.
228///
229/// Stores receive this record from [`MachineLifecycleCommit`] and may persist
230/// its encoded form. Loading must decode this exact record shape; compatibility
231/// runtime-state projections are not lifecycle authority.
232#[derive(Debug, Clone, PartialEq, Eq)]
233pub struct MachineLifecycleStoreRecord {
234    snapshot: MachineLifecycleSnapshot,
235}
236
237impl MachineLifecycleStoreRecord {
238    pub(crate) fn from_snapshot(snapshot: &MachineLifecycleSnapshot) -> Self {
239        Self {
240            snapshot: snapshot.clone(),
241        }
242    }
243
244    pub fn encode(&self) -> Result<Vec<u8>, RuntimeStoreError> {
245        let wire = MachineLifecycleSnapshotStoreWire::from(&self.snapshot);
246        serde_json::to_vec(&wire).map_err(|err| RuntimeStoreError::WriteFailed(err.to_string()))
247    }
248}
249
250/// Machine-owned lifecycle commit token.
251///
252/// This token has no public constructor. RuntimeStore implementors can persist
253/// the selected state and binding facts, but callers outside the machine/driver
254/// commit path cannot select arbitrary lifecycle truth.
255#[derive(Debug, Clone, PartialEq, Eq)]
256pub struct MachineLifecycleCommit {
257    snapshot: MachineLifecycleSnapshot,
258}
259
260impl MachineLifecycleCommit {
261    pub(crate) fn new_with_binding(
262        runtime_state: RuntimeState,
263        binding: MachineLifecycleBindingFacts,
264    ) -> Self {
265        Self {
266            snapshot: MachineLifecycleSnapshot::new(runtime_state, binding),
267        }
268    }
269
270    /// Runtime state selected by the owning MeerkatMachine transition.
271    pub fn runtime_state(&self) -> RuntimeState {
272        self.snapshot.runtime_state()
273    }
274
275    /// Durable lifecycle snapshot selected by the owning MeerkatMachine transition.
276    pub fn snapshot(&self) -> &MachineLifecycleSnapshot {
277        &self.snapshot
278    }
279
280    /// Durable record selected by the owning MeerkatMachine transition.
281    pub fn store_record(&self) -> MachineLifecycleStoreRecord {
282        MachineLifecycleStoreRecord::from_snapshot(&self.snapshot)
283    }
284
285    pub(crate) fn into_snapshot(self) -> MachineLifecycleSnapshot {
286        self.snapshot
287    }
288}
289
290/// Atomic persistence interface for runtime state.
291///
292/// Implementations:
293/// - `InMemoryRuntimeStore` — in-memory, no durability (ephemeral/testing)
294/// - `SqliteRuntimeStore` — SQLite-backed durable runtime state
295#[cfg_attr(not(target_arch = "wasm32"), async_trait::async_trait)]
296#[cfg_attr(target_arch = "wasm32", async_trait::async_trait(?Send))]
297pub trait RuntimeStore: Send + Sync {
298    /// Stable key for process-local auth/OAuth authority reuse across reopened
299    /// handles for the same durable store.
300    fn auth_authority_key(&self) -> Option<String> {
301        None
302    }
303
304    /// Persist the runtime-owned OAuth login-flow payload snapshot.
305    ///
306    /// The AuthMachine owns admission/consume semantics; this payload snapshot
307    /// carries the PKCE verifier and device-code correlation data needed to
308    /// rehydrate active flows after a persistent runtime process restart.
309    fn persist_auth_oauth_flow_snapshot(
310        &self,
311        snapshot_json: &[u8],
312    ) -> Result<(), RuntimeStoreError> {
313        let _ = snapshot_json;
314        Err(RuntimeStoreError::Unsupported(
315            "persist_auth_oauth_flow_snapshot".into(),
316        ))
317    }
318
319    /// Load the runtime-owned OAuth login-flow payload snapshot, if present.
320    fn load_auth_oauth_flow_snapshot(&self) -> Result<Option<Vec<u8>>, RuntimeStoreError> {
321        Err(RuntimeStoreError::Unsupported(
322            "load_auth_oauth_flow_snapshot".into(),
323        ))
324    }
325
326    /// Atomically update the runtime-owned OAuth login-flow payload snapshot.
327    ///
328    /// Stores that support OAuth snapshots must override this with a lock,
329    /// transaction, or compare-and-swap boundary. A load/compute/persist
330    /// fallback is not safe for admission, capacity, or consume claims.
331    fn update_auth_oauth_flow_snapshot(
332        &self,
333        _update: &mut AuthOAuthFlowSnapshotUpdate<'_>,
334    ) -> Result<(), RuntimeStoreError> {
335        Err(RuntimeStoreError::Unsupported(
336            "update_auth_oauth_flow_snapshot".into(),
337        ))
338    }
339
340    /// Atomically persist a session snapshot that is not a run boundary.
341    ///
342    /// Session-control snapshots update durable session authority without
343    /// producing a [`RunBoundaryReceipt`].
344    async fn commit_session_snapshot(
345        &self,
346        runtime_id: &LogicalRuntimeId,
347        session_delta: SessionDelta,
348    ) -> Result<(), RuntimeStoreError>;
349
350    /// Atomically persist a same-session transcript rewrite snapshot.
351    ///
352    /// Store implementations that support transcript edits must compare the
353    /// currently persisted session transcript revision with `commit.parent_revision`
354    /// inside the same lock or transaction that writes `session_delta`.
355    async fn commit_session_transcript_rewrite_snapshot(
356        &self,
357        runtime_id: &LogicalRuntimeId,
358        session_delta: SessionDelta,
359        commit: &meerkat_core::TranscriptRewriteCommit,
360    ) -> Result<(), RuntimeStoreError> {
361        let _ = (runtime_id, session_delta, commit);
362        Err(RuntimeStoreError::Unsupported(
363            "commit_session_transcript_rewrite_snapshot".into(),
364        ))
365    }
366
367    /// Atomically persist session delta + receipt + input state updates.
368    ///
369    /// All three writes MUST commit in a single atomic operation.
370    /// If any write fails, none should be visible.
371    /// Atomically persist session delta + receipt + input state updates.
372    ///
373    /// All writes MUST commit in a single atomic operation.
374    /// If `session_store_key` is `Some`, validates that the snapshot belongs
375    /// to that session and, for stores that physically share a `SessionStore`
376    /// table, writes that table in the same transaction. Runtime snapshot
377    /// authority remains keyed only by `runtime_id`; `session_store_key` must
378    /// not create a raw session UUID runtime alias.
379    async fn atomic_apply(
380        &self,
381        runtime_id: &LogicalRuntimeId,
382        session_delta: Option<SessionDelta>,
383        receipt: RunBoundaryReceipt,
384        input_updates: Vec<InputStatePersistenceRecord>,
385        session_store_key: Option<meerkat_core::types::SessionId>,
386    ) -> Result<(), RuntimeStoreError>;
387
388    /// Load all input states for a runtime.
389    async fn load_input_states(
390        &self,
391        runtime_id: &LogicalRuntimeId,
392    ) -> Result<Vec<StoredInputState>, RuntimeStoreError>;
393
394    /// Load a specific boundary receipt.
395    async fn load_boundary_receipt(
396        &self,
397        runtime_id: &LogicalRuntimeId,
398        run_id: &RunId,
399        sequence: u64,
400    ) -> Result<Option<RunBoundaryReceipt>, RuntimeStoreError>;
401
402    /// Load the latest committed session snapshot for a runtime, if any.
403    async fn load_session_snapshot(
404        &self,
405        runtime_id: &LogicalRuntimeId,
406    ) -> Result<Option<Vec<u8>>, RuntimeStoreError>;
407
408    /// Remove the latest committed session snapshot for a runtime.
409    ///
410    /// This is used only as a fail-closed quarantine path after a compatibility
411    /// projection write rejects a runtime snapshot that was already staged as
412    /// runtime authority and the service cannot restore the previous snapshot.
413    async fn clear_session_snapshot(
414        &self,
415        runtime_id: &LogicalRuntimeId,
416    ) -> Result<(), RuntimeStoreError>;
417
418    /// Replace the latest committed session snapshot only if it still matches
419    /// `expected_current`.
420    ///
421    /// Used by fail-closed recovery after a compatibility projection rejected
422    /// a runtime snapshot. Implementations must compare and write atomically so
423    /// recovery cannot overwrite a newer runtime-authoritative snapshot.
424    async fn replace_session_snapshot_if_current(
425        &self,
426        runtime_id: &LogicalRuntimeId,
427        expected_current: &[u8],
428        replacement: Vec<u8>,
429    ) -> Result<bool, RuntimeStoreError>;
430
431    /// Remove the latest committed session snapshot only if it still matches
432    /// `expected_current`.
433    ///
434    /// This is the conditional variant of the fail-closed quarantine path.
435    async fn clear_session_snapshot_if_current(
436        &self,
437        runtime_id: &LogicalRuntimeId,
438        expected_current: &[u8],
439    ) -> Result<bool, RuntimeStoreError>;
440
441    /// Persist a single input state (for durable-before-ack).
442    async fn persist_input_state(
443        &self,
444        runtime_id: &LogicalRuntimeId,
445        state: &InputStatePersistenceRecord,
446    ) -> Result<(), RuntimeStoreError>;
447
448    /// Load a single input state.
449    async fn load_input_state(
450        &self,
451        runtime_id: &LogicalRuntimeId,
452        input_id: &InputId,
453    ) -> Result<Option<StoredInputState>, RuntimeStoreError>;
454
455    /// Load the last persisted machine lifecycle record bytes, if any.
456    ///
457    /// Implementations return only the opaque bytes previously obtained from
458    /// [`MachineLifecycleCommit::store_record`]. The runtime crate decodes
459    /// these bytes through `load_runtime_state` or internal recovery helpers;
460    /// stores must not promote compatibility rows or bare runtime states into
461    /// lifecycle authority.
462    async fn load_machine_lifecycle_record(
463        &self,
464        runtime_id: &LogicalRuntimeId,
465    ) -> Result<Option<Vec<u8>>, RuntimeStoreError>;
466
467    /// Atomically commit machine-owned lifecycle state changes.
468    ///
469    /// Writes runtime state, generated runtime binding facts, and all input
470    /// state updates in a single atomic operation. `MachineLifecycleCommit` has
471    /// no public constructor, so this cannot be used by compatibility callers
472    /// to pick runtime truth.
473    async fn commit_machine_lifecycle(
474        &self,
475        runtime_id: &LogicalRuntimeId,
476        commit: MachineLifecycleCommit,
477        input_states: &[InputStatePersistenceRecord],
478    ) -> Result<(), RuntimeStoreError>;
479
480    /// Persist a snapshot of the ops lifecycle registry state.
481    async fn persist_ops_lifecycle(
482        &self,
483        runtime_id: &LogicalRuntimeId,
484        snapshot: &crate::ops_lifecycle::PersistedOpsSnapshot,
485    ) -> Result<(), RuntimeStoreError> {
486        let _ = (runtime_id, snapshot);
487        Err(RuntimeStoreError::Unsupported(
488            "persist_ops_lifecycle".into(),
489        ))
490    }
491
492    /// Load a previously persisted ops lifecycle snapshot.
493    async fn load_ops_lifecycle(
494        &self,
495        runtime_id: &LogicalRuntimeId,
496    ) -> Result<Option<crate::ops_lifecycle::PersistedOpsSnapshot>, RuntimeStoreError> {
497        let _ = runtime_id;
498        Err(RuntimeStoreError::Unsupported("load_ops_lifecycle".into()))
499    }
500
501    /// Delete a previously persisted ops lifecycle snapshot.
502    async fn delete_ops_lifecycle(
503        &self,
504        runtime_id: &LogicalRuntimeId,
505    ) -> Result<(), RuntimeStoreError> {
506        let _ = runtime_id;
507        Err(RuntimeStoreError::Unsupported(
508            "delete_ops_lifecycle".into(),
509        ))
510    }
511}
512
513pub use memory::InMemoryRuntimeStore;
514#[cfg(feature = "sqlite-store")]
515pub use sqlite::SqliteRuntimeStore;