Skip to main content

meerkat_runtime/store/
mod.rs

1//! RuntimeStore — atomic persistence for runtime state.
2//!
3//! Machine-owned runtime commands durably persist [`RunBoundaryReceipt`] values
4//! atomically with their session and input-state effects.
5
6pub mod memory;
7#[cfg(feature = "sqlite-store")]
8pub mod sqlite;
9
10use meerkat_core::lifecycle::{InputId, RunBoundaryReceipt, RunId};
11
12use crate::identifiers::LogicalRuntimeId;
13use crate::input_state::StoredInputState;
14use crate::runtime_state::RuntimeState;
15
16/// Errors from RuntimeStore operations.
17#[derive(Debug, Clone, thiserror::Error)]
18#[non_exhaustive]
19pub enum RuntimeStoreError {
20    /// Write failed.
21    #[error("Store write failed: {0}")]
22    WriteFailed(String),
23    /// Read failed.
24    #[error("Store read failed: {0}")]
25    ReadFailed(String),
26    /// The explicit session-store key does not match the serialized session.
27    #[error("Session store key mismatch: expected {expected}, actual {actual}")]
28    SessionKeyMismatch {
29        expected: meerkat_core::types::SessionId,
30        actual: meerkat_core::types::SessionId,
31    },
32    /// Not found.
33    #[error("Not found: {0}")]
34    NotFound(String),
35    /// Operation is not supported by this store implementation.
36    #[error("Unsupported store operation: {0}")]
37    Unsupported(String),
38    /// Internal error.
39    #[error("Internal error: {0}")]
40    Internal(String),
41}
42
43/// Transactional updater for the runtime-owned OAuth login-flow payload snapshot.
44pub type AuthOAuthFlowSnapshotUpdate<'a> =
45    dyn FnMut(Option<&[u8]>) -> Result<Vec<u8>, RuntimeStoreError> + 'a;
46
/// Serialized session snapshot carried by boundary commits and by
/// snapshot-only commits.
#[derive(Debug, Clone)]
pub struct SessionDelta {
    /// Snapshot bytes; RuntimeStore treats them as opaque.
    pub session_snapshot: Vec<u8>,
}
53
54/// Machine-owned lifecycle commit token.
55///
56/// This token has no public constructor. RuntimeStore implementors can persist
57/// the selected state, but callers outside the machine/driver commit path
58/// cannot select arbitrary lifecycle truth.
59#[derive(Debug, Clone, Copy, PartialEq, Eq)]
60pub struct MachineLifecycleCommit {
61    runtime_state: RuntimeState,
62}
63
64impl MachineLifecycleCommit {
65    pub(crate) fn new(runtime_state: RuntimeState) -> Self {
66        Self { runtime_state }
67    }
68
69    /// Runtime state selected by the owning MeerkatMachine transition.
70    pub fn runtime_state(self) -> RuntimeState {
71        self.runtime_state
72    }
73}
74
75/// Atomic persistence interface for runtime state.
76///
77/// Implementations:
78/// - `InMemoryRuntimeStore` — in-memory, no durability (ephemeral/testing)
79/// - `SqliteRuntimeStore` — SQLite-backed durable runtime state
80#[cfg_attr(not(target_arch = "wasm32"), async_trait::async_trait)]
81#[cfg_attr(target_arch = "wasm32", async_trait::async_trait(?Send))]
82pub trait RuntimeStore: Send + Sync {
83    /// Stable key for process-local auth/OAuth authority reuse across reopened
84    /// handles for the same durable store.
85    fn auth_authority_key(&self) -> Option<String> {
86        None
87    }
88
89    /// Persist the runtime-owned OAuth login-flow payload snapshot.
90    ///
91    /// The AuthMachine owns admission/consume semantics; this payload snapshot
92    /// carries the PKCE verifier and device-code correlation data needed to
93    /// rehydrate active flows after a persistent runtime process restart.
94    fn persist_auth_oauth_flow_snapshot(
95        &self,
96        snapshot_json: &[u8],
97    ) -> Result<(), RuntimeStoreError> {
98        let _ = snapshot_json;
99        Err(RuntimeStoreError::Unsupported(
100            "persist_auth_oauth_flow_snapshot".into(),
101        ))
102    }
103
104    /// Load the runtime-owned OAuth login-flow payload snapshot, if present.
105    fn load_auth_oauth_flow_snapshot(&self) -> Result<Option<Vec<u8>>, RuntimeStoreError> {
106        Err(RuntimeStoreError::Unsupported(
107            "load_auth_oauth_flow_snapshot".into(),
108        ))
109    }
110
111    /// Atomically update the runtime-owned OAuth login-flow payload snapshot.
112    ///
113    /// Stores that support OAuth snapshots must override this with a lock,
114    /// transaction, or compare-and-swap boundary. A load/compute/persist
115    /// fallback is not safe for admission, capacity, or consume claims.
116    fn update_auth_oauth_flow_snapshot(
117        &self,
118        _update: &mut AuthOAuthFlowSnapshotUpdate<'_>,
119    ) -> Result<(), RuntimeStoreError> {
120        Err(RuntimeStoreError::Unsupported(
121            "update_auth_oauth_flow_snapshot".into(),
122        ))
123    }
124
125    /// Atomically persist a session snapshot that is not a run boundary.
126    ///
127    /// Session-control snapshots update durable session authority without
128    /// producing a [`RunBoundaryReceipt`].
129    async fn commit_session_snapshot(
130        &self,
131        runtime_id: &LogicalRuntimeId,
132        session_delta: SessionDelta,
133    ) -> Result<(), RuntimeStoreError>;
134
135    /// Atomically persist session delta + receipt + input state updates.
136    ///
137    /// All three writes MUST commit in a single atomic operation.
138    /// If any write fails, none should be visible.
139    /// Atomically persist session delta + receipt + input state updates.
140    ///
141    /// All writes MUST commit in a single atomic operation.
142    /// If `session_store_key` is `Some`, validates that the snapshot belongs
143    /// to that session and, for stores that physically share a `SessionStore`
144    /// table, writes that table in the same transaction. Runtime snapshot
145    /// authority remains keyed only by `runtime_id`; `session_store_key` must
146    /// not create a raw session UUID runtime alias.
147    async fn atomic_apply(
148        &self,
149        runtime_id: &LogicalRuntimeId,
150        session_delta: Option<SessionDelta>,
151        receipt: RunBoundaryReceipt,
152        input_updates: Vec<StoredInputState>,
153        session_store_key: Option<meerkat_core::types::SessionId>,
154    ) -> Result<(), RuntimeStoreError>;
155
156    /// Load all input states for a runtime.
157    async fn load_input_states(
158        &self,
159        runtime_id: &LogicalRuntimeId,
160    ) -> Result<Vec<StoredInputState>, RuntimeStoreError>;
161
162    /// Load a specific boundary receipt.
163    async fn load_boundary_receipt(
164        &self,
165        runtime_id: &LogicalRuntimeId,
166        run_id: &RunId,
167        sequence: u64,
168    ) -> Result<Option<RunBoundaryReceipt>, RuntimeStoreError>;
169
170    /// Load the latest committed session snapshot for a runtime, if any.
171    async fn load_session_snapshot(
172        &self,
173        runtime_id: &LogicalRuntimeId,
174    ) -> Result<Option<Vec<u8>>, RuntimeStoreError>;
175
176    /// Persist a single input state (for durable-before-ack).
177    async fn persist_input_state(
178        &self,
179        runtime_id: &LogicalRuntimeId,
180        state: &StoredInputState,
181    ) -> Result<(), RuntimeStoreError>;
182
183    /// Load a single input state.
184    async fn load_input_state(
185        &self,
186        runtime_id: &LogicalRuntimeId,
187        input_id: &InputId,
188    ) -> Result<Option<StoredInputState>, RuntimeStoreError>;
189
190    /// Load the last persisted runtime state, if any.
191    async fn load_runtime_state(
192        &self,
193        runtime_id: &LogicalRuntimeId,
194    ) -> Result<Option<RuntimeState>, RuntimeStoreError>;
195
196    /// Atomically commit machine-owned lifecycle state changes.
197    ///
198    /// Writes runtime state + all input state updates in a single atomic
199    /// operation. `MachineLifecycleCommit` has no public constructor, so this
200    /// cannot be used by compatibility callers to pick runtime truth.
201    async fn commit_machine_lifecycle(
202        &self,
203        runtime_id: &LogicalRuntimeId,
204        commit: MachineLifecycleCommit,
205        input_states: &[StoredInputState],
206    ) -> Result<(), RuntimeStoreError>;
207
208    /// Persist a snapshot of the ops lifecycle registry state.
209    async fn persist_ops_lifecycle(
210        &self,
211        runtime_id: &LogicalRuntimeId,
212        snapshot: &crate::ops_lifecycle::PersistedOpsSnapshot,
213    ) -> Result<(), RuntimeStoreError> {
214        let _ = (runtime_id, snapshot);
215        Err(RuntimeStoreError::Unsupported(
216            "persist_ops_lifecycle".into(),
217        ))
218    }
219
220    /// Load a previously persisted ops lifecycle snapshot.
221    async fn load_ops_lifecycle(
222        &self,
223        runtime_id: &LogicalRuntimeId,
224    ) -> Result<Option<crate::ops_lifecycle::PersistedOpsSnapshot>, RuntimeStoreError> {
225        let _ = runtime_id;
226        Err(RuntimeStoreError::Unsupported("load_ops_lifecycle".into()))
227    }
228}
229
230pub use memory::InMemoryRuntimeStore;
231#[cfg(feature = "sqlite-store")]
232pub use sqlite::SqliteRuntimeStore;