Skip to main content

meerkat_mob/store/
mod.rs

1//! Mob store traits and implementations.
2
3mod in_memory;
4mod realm_profile;
5#[cfg(not(target_arch = "wasm32"))]
6mod sqlite;
7
8pub use in_memory::{
9    InMemoryMobEventStore, InMemoryMobRunStore, InMemoryMobSpecStore, InMemoryRealmProfileStore,
10};
11pub use realm_profile::{RealmProfileStore, StoredRealmProfile};
12#[cfg(not(target_arch = "wasm32"))]
13pub use sqlite::{
14    SqliteMobEventStore, SqliteMobRunStore, SqliteMobSpecStore, SqliteMobStores,
15    SqliteRealmProfileStore,
16};
17
18use crate::definition::MobDefinition;
19use crate::event::{MobEvent, NewMobEvent};
20use crate::ids::{FlowId, FrameId, LoopId, LoopInstanceId, MobId, RunId, StepId};
21use crate::run::{
22    FailureLedgerEntry, FrameSnapshot, LoopIterationLedgerEntry, LoopSnapshot, MobRun,
23    MobRunStatus, StepLedgerEntry,
24};
25use async_trait::async_trait;
26use chrono::{DateTime, Utc};
27use meerkat_machine_kernels::KernelState;
28
29/// Errors from mob storage operations.
30///
31/// Scoped to storage concerns only — callers convert to [`MobError`](crate::MobError)
32/// at the boundary via the `From` impl.
33#[derive(Debug, thiserror::Error)]
34pub enum MobStoreError {
35    /// A write operation failed.
36    #[error("Write failed: {0}")]
37    WriteFailed(String),
38
39    /// A read operation failed.
40    #[error("Read failed: {0}")]
41    ReadFailed(String),
42
43    /// The requested entity was not found.
44    #[error("Not found: {0}")]
45    NotFound(String),
46
47    /// A compare-and-swap precondition was not met.
48    #[error("CAS conflict: {0}")]
49    CasConflict(String),
50
51    /// Spec revision compare-and-swap failed (structured variant for typed conversion).
52    #[error("spec revision conflict for mob {mob_id}: expected {expected:?}, actual {actual}")]
53    SpecRevisionConflict {
54        mob_id: crate::ids::MobId,
55        expected: Option<u64>,
56        actual: u64,
57    },
58
59    /// Serialization or deserialization failed.
60    #[error("Serialization error: {0}")]
61    Serialization(String),
62
63    /// Internal error.
64    #[error("Internal error: {0}")]
65    Internal(String),
66}
67
68/// Trait for persisting and querying mob events.
69#[cfg_attr(target_arch = "wasm32", async_trait(?Send))]
70#[cfg_attr(not(target_arch = "wasm32"), async_trait)]
71pub trait MobEventStore: Send + Sync {
72    /// Append a new event to the store.
73    async fn append(&self, event: NewMobEvent) -> Result<MobEvent, MobStoreError>;
74
75    /// Append multiple events atomically.
76    ///
77    /// Implementations must ensure all-or-nothing semantics: either every
78    /// event is persisted or none are. No default implementation is provided
79    /// to force implementors to consider atomicity.
80    async fn append_batch(&self, events: Vec<NewMobEvent>) -> Result<Vec<MobEvent>, MobStoreError>;
81
82    /// Poll for events after a given cursor, up to a limit.
83    async fn poll(&self, after_cursor: u64, limit: usize) -> Result<Vec<MobEvent>, MobStoreError>;
84
85    /// Replay all events from the beginning.
86    async fn replay_all(&self) -> Result<Vec<MobEvent>, MobStoreError>;
87
88    /// Delete all persisted events.
89    async fn clear(&self) -> Result<(), MobStoreError>;
90
91    /// Prune events older than a timestamp. Returns count of deleted events.
92    async fn prune(&self, _older_than: DateTime<Utc>) -> Result<u64, MobStoreError> {
93        Ok(0)
94    }
95}
96
97/// Trait for persisting and querying flow runs.
98#[cfg_attr(target_arch = "wasm32", async_trait(?Send))]
99#[cfg_attr(not(target_arch = "wasm32"), async_trait)]
100pub trait MobRunStore: Send + Sync {
101    async fn create_run(&self, run: MobRun) -> Result<(), MobStoreError>;
102    async fn get_run(&self, run_id: &RunId) -> Result<Option<MobRun>, MobStoreError>;
103    async fn list_runs(
104        &self,
105        mob_id: &MobId,
106        flow_id: Option<&FlowId>,
107    ) -> Result<Vec<MobRun>, MobStoreError>;
108    async fn cas_run_status(
109        &self,
110        run_id: &RunId,
111        expected: MobRunStatus,
112        next: MobRunStatus,
113    ) -> Result<bool, MobStoreError>;
114    async fn cas_flow_state(
115        &self,
116        run_id: &RunId,
117        expected: &KernelState,
118        next: &KernelState,
119    ) -> Result<bool, MobStoreError>;
120    async fn cas_run_snapshot(
121        &self,
122        run_id: &RunId,
123        expected_status: MobRunStatus,
124        expected_flow_state: &KernelState,
125        next_status: MobRunStatus,
126        next_flow_state: &KernelState,
127    ) -> Result<bool, MobStoreError>;
128    async fn append_step_entry(
129        &self,
130        run_id: &RunId,
131        entry: StepLedgerEntry,
132    ) -> Result<(), MobStoreError>;
133    async fn append_step_entry_if_absent(
134        &self,
135        run_id: &RunId,
136        entry: StepLedgerEntry,
137    ) -> Result<bool, MobStoreError>;
138    async fn put_step_output(
139        &self,
140        run_id: &RunId,
141        step_id: &StepId,
142        output: serde_json::Value,
143    ) -> Result<(), MobStoreError>;
144    async fn append_failure_entry(
145        &self,
146        run_id: &RunId,
147        entry: FailureLedgerEntry,
148    ) -> Result<(), MobStoreError>;
149
150    /// Upsert a loop snapshot. Creates or overwrites the entry for `loop_instance_id`
151    /// in `run.loops` and optionally records a `LoopIterationLedgerEntry`.
152    ///
153    /// Used by the sequential `FlowFrameEngine` to persist loop state so that
154    /// `reconcile_run_state` can reconstruct in-progress loops after a crash.
155    ///
156    /// Implementations must treat `ledger_entry` as idempotent by logical
157    /// iteration identity. Replaying the same `(loop_instance_id, iteration, frame_id)`
158    /// on resume must not append a duplicate row.
159    async fn upsert_loop_snapshot(
160        &self,
161        run_id: &RunId,
162        loop_instance_id: &LoopInstanceId,
163        snapshot: LoopSnapshot,
164        ledger_entry: Option<LoopIterationLedgerEntry>,
165    ) -> Result<(), MobStoreError>;
166
167    // Phase 3: CAS wrappers for frame and loop state.
168
169    /// CAS wrapper 1: frame state update.
170    ///
171    /// If `expected` is `None`, this is an insert (frame must not yet exist).
172    /// If `expected` is `Some(snapshot)`, the current frame state must match.
173    /// Returns `Ok(true)` on success, `Ok(false)` on mismatch.
174    ///
175    /// # Frame support
176    async fn cas_frame_state(
177        &self,
178        run_id: &RunId,
179        frame_id: &FrameId,
180        expected: Option<&FrameSnapshot>,
181        next: FrameSnapshot,
182    ) -> Result<bool, MobStoreError>;
183
184    /// CAS wrapper 2: grant node slot — atomically update run flow state + frame state.
185    ///
186    /// # Frame support
187    /// Backends that do not support frame-aware atomic persistence may return
188    /// `Err(MobError::NotYetImplemented(...))`.
189    async fn cas_grant_node_slot(
190        &self,
191        run_id: &RunId,
192        expected_run_state: &KernelState,
193        next_run_state: KernelState,
194        frame_id: &FrameId,
195        expected_frame: &FrameSnapshot,
196        next_frame: FrameSnapshot,
197    ) -> Result<bool, MobStoreError>;
198
199    /// CAS wrapper 3: complete step — update frame state and record step output.
200    ///
201    /// When `loop_context` is `None`, the output is stored in `root_step_outputs`.
202    /// When `loop_context` is `Some((loop_id, iteration))`, the output is stored
203    /// in `loop_iteration_outputs[loop_id][iteration]`.
204    ///
205    /// # Frame support
206    /// Backends that do not support frame-aware atomic persistence may return
207    /// `Err(MobError::NotYetImplemented(...))`.
208    #[allow(clippy::too_many_arguments)]
209    async fn cas_complete_step_and_record_output(
210        &self,
211        run_id: &RunId,
212        frame_id: &FrameId,
213        expected_frame: &FrameSnapshot,
214        next_frame: FrameSnapshot,
215        step_output_key: String,
216        step_output: serde_json::Value,
217        loop_context: Option<(&LoopId, u64)>,
218    ) -> Result<bool, MobStoreError>;
219
220    /// CAS wrapper 4: start loop — register loop + update run state + parent frame.
221    ///
222    /// # Frame support
223    /// Backends that do not support frame-aware atomic persistence may return
224    /// `Err(MobError::NotYetImplemented(...))`.
225    #[allow(clippy::too_many_arguments)]
226    async fn cas_start_loop(
227        &self,
228        run_id: &RunId,
229        loop_instance_id: &LoopInstanceId,
230        expected_run_state: &KernelState,
231        next_run_state: KernelState,
232        frame_id: &FrameId,
233        expected_frame: &FrameSnapshot,
234        next_frame: FrameSnapshot,
235        initial_loop: LoopSnapshot,
236    ) -> Result<bool, MobStoreError>;
237
238    /// CAS wrapper 5: register pending body frame — loop transition + run state update.
239    ///
240    /// # Frame support
241    /// Backends that do not support frame-aware atomic persistence may return
242    /// `Err(MobError::NotYetImplemented(...))`.
243    async fn cas_loop_request_body_frame(
244        &self,
245        run_id: &RunId,
246        loop_instance_id: &LoopInstanceId,
247        expected_loop: &LoopSnapshot,
248        next_loop: LoopSnapshot,
249        expected_run_state: &KernelState,
250        next_run_state: KernelState,
251    ) -> Result<bool, MobStoreError>;
252
253    /// CAS wrapper 6: body frame start — loop transition + register new frame + run state update.
254    ///
255    /// # Frame support
256    /// Backends that do not support frame-aware atomic persistence may return
257    /// `Err(MobError::NotYetImplemented(...))`.
258    #[allow(clippy::too_many_arguments)]
259    async fn cas_grant_body_frame_start(
260        &self,
261        run_id: &RunId,
262        loop_instance_id: &LoopInstanceId,
263        expected_loop: &LoopSnapshot,
264        next_loop: LoopSnapshot,
265        frame_id: &FrameId,
266        initial_frame: FrameSnapshot,
267        ledger_entry: LoopIterationLedgerEntry,
268        expected_run_state: &KernelState,
269        next_run_state: KernelState,
270    ) -> Result<bool, MobStoreError>;
271
272    /// CAS wrapper 7: body frame completion — terminalize frame + loop state update + run state.
273    ///
274    /// # Frame support
275    /// Backends that do not support frame-aware atomic persistence may return
276    /// `Err(MobError::NotYetImplemented(...))`.
277    #[allow(clippy::too_many_arguments)]
278    async fn cas_complete_body_frame(
279        &self,
280        run_id: &RunId,
281        loop_instance_id: &LoopInstanceId,
282        expected_loop: &LoopSnapshot,
283        next_loop: LoopSnapshot,
284        frame_id: &FrameId,
285        expected_frame: &FrameSnapshot,
286        next_frame: FrameSnapshot,
287        expected_run_state: &KernelState,
288        next_run_state: KernelState,
289    ) -> Result<bool, MobStoreError>;
290
291    /// CAS wrapper 8: loop completion — loop state + run state + parent frame update.
292    ///
293    /// # Frame support
294    /// Backends that do not support frame-aware atomic persistence may return
295    /// `Err(MobError::NotYetImplemented(...))`.
296    #[allow(clippy::too_many_arguments)]
297    async fn cas_complete_loop(
298        &self,
299        run_id: &RunId,
300        loop_instance_id: &LoopInstanceId,
301        expected_loop: &LoopSnapshot,
302        next_loop: LoopSnapshot,
303        frame_id: &FrameId,
304        expected_frame: &FrameSnapshot,
305        next_frame: FrameSnapshot,
306        expected_run_state: &KernelState,
307        next_run_state: KernelState,
308    ) -> Result<bool, MobStoreError>;
309}
310
311/// Trait for persisting and querying mob specs.
312#[cfg_attr(target_arch = "wasm32", async_trait(?Send))]
313#[cfg_attr(not(target_arch = "wasm32"), async_trait)]
314pub trait MobSpecStore: Send + Sync {
315    /// Put a spec. Returns new revision.
316    async fn put_spec(
317        &self,
318        mob_id: &MobId,
319        definition: &MobDefinition,
320        revision: Option<u64>,
321    ) -> Result<u64, MobStoreError>;
322
323    async fn get_spec(&self, mob_id: &MobId)
324    -> Result<Option<(MobDefinition, u64)>, MobStoreError>;
325    async fn list_specs(&self) -> Result<Vec<MobId>, MobStoreError>;
326    async fn delete_spec(
327        &self,
328        mob_id: &MobId,
329        revision: Option<u64>,
330    ) -> Result<bool, MobStoreError>;
331}