Skip to main content

meerkat_mob/store/
mod.rs

1//! Mob store traits and implementations.
2
3mod in_memory;
4mod realm_profile;
5#[cfg(not(target_arch = "wasm32"))]
6mod sqlite;
7
8pub use in_memory::{
9    InMemoryMobEventStore, InMemoryMobRunStore, InMemoryMobRuntimeMetadataStore,
10    InMemoryMobSpecStore, InMemoryRealmProfileStore,
11};
12pub use realm_profile::{RealmProfileStore, StoredRealmProfile};
13#[cfg(not(target_arch = "wasm32"))]
14pub use sqlite::{
15    SqliteMobEventStore, SqliteMobRunStore, SqliteMobRuntimeMetadataStore, SqliteMobSpecStore,
16    SqliteMobStores, SqliteRealmProfileStore,
17};
18
19use crate::definition::MobDefinition;
20use crate::event::{MemberRef, MobEvent, MobEventKind, NewMobEvent};
21use crate::ids::{
22    AgentIdentity, FlowId, FrameId, Generation, LoopId, LoopInstanceId, MobId, RunId, StepId,
23};
24use crate::machines::mob_machine as mob_dsl;
25use crate::run::flow_run;
26use crate::run::{
27    FailureLedgerEntry, FrameSnapshot, LoopIterationLedgerEntry, LoopSnapshot, MobRun,
28    MobRunStatus, StepLedgerEntry,
29};
30#[cfg(target_arch = "wasm32")]
31use crate::tokio;
32use async_trait::async_trait;
33use chrono::{DateTime, Utc};
34use meerkat_contracts::wire::supervisor_bridge::{BridgeBootstrapToken, BridgeProtocolVersion};
35use serde::{Deserialize, Serialize};
36use tokio::sync::broadcast;
37
/// Receiver for append-driven structural mob events.
///
/// Handed out by [`MobEventStore::subscribe`], which only delivers events
/// appended through the store after the subscription was created.
/// NOTE(review): this is a `tokio::sync::broadcast` receiver, so a slow
/// subscriber can presumably lag and miss events — confirm callers handle it.
pub type MobEventReceiver = broadcast::Receiver<MobEvent>;
40
41pub(crate) fn terminal_event_identity(kind: &MobEventKind) -> Option<(&RunId, &FlowId)> {
42    match kind {
43        MobEventKind::FlowCompleted {
44            run_id, flow_id, ..
45        }
46        | MobEventKind::FlowFailed {
47            run_id, flow_id, ..
48        }
49        | MobEventKind::FlowCanceled { run_id, flow_id } => Some((run_id, flow_id)),
50        _ => None,
51    }
52}
53
/// Frame-aware atomic persistence operation required by the flow/frame store contract.
///
/// Each variant names one of the CAS wrappers on [`MobRunStore`]. It is carried by
/// [`MobStoreError::FrameAtomicPersistenceUnavailable`] when a backend cannot
/// provide that operation atomically. It serializes as the `snake_case` variant
/// name, which matches its `Display` output.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum FrameAtomicOperation {
    /// [`MobRunStore::cas_frame_state`].
    CasFrameState,
    /// [`MobRunStore::cas_grant_node_slot`].
    CasGrantNodeSlot,
    /// [`MobRunStore::cas_complete_step_and_record_output`].
    CasCompleteStepAndRecordOutput,
    /// [`MobRunStore::cas_start_loop`].
    CasStartLoop,
    /// [`MobRunStore::cas_loop_request_body_frame`].
    CasLoopRequestBodyFrame,
    /// [`MobRunStore::cas_grant_body_frame_start`].
    CasGrantBodyFrameStart,
    /// [`MobRunStore::cas_complete_body_frame`].
    CasCompleteBodyFrame,
    /// [`MobRunStore::cas_complete_loop`].
    CasCompleteLoop,
}
67
68impl std::fmt::Display for FrameAtomicOperation {
69    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
70        match self {
71            Self::CasFrameState => write!(f, "cas_frame_state"),
72            Self::CasGrantNodeSlot => write!(f, "cas_grant_node_slot"),
73            Self::CasCompleteStepAndRecordOutput => {
74                write!(f, "cas_complete_step_and_record_output")
75            }
76            Self::CasStartLoop => write!(f, "cas_start_loop"),
77            Self::CasLoopRequestBodyFrame => write!(f, "cas_loop_request_body_frame"),
78            Self::CasGrantBodyFrameStart => write!(f, "cas_grant_body_frame_start"),
79            Self::CasCompleteBodyFrame => write!(f, "cas_complete_body_frame"),
80            Self::CasCompleteLoop => write!(f, "cas_complete_loop"),
81        }
82    }
83}
84
/// Errors from mob storage operations.
///
/// Scoped to storage concerns only — callers convert to [`MobError`](crate::MobError)
/// at the boundary via the `From` impl.
///
/// Note: the CAS methods on the store traits report an ordinary precondition
/// mismatch as `Ok(false)` (see e.g. `MobRunStore::cas_frame_state`), not as an
/// error variant.
#[derive(Debug, thiserror::Error)]
pub enum MobStoreError {
    /// A write operation failed.
    #[error("Write failed: {0}")]
    WriteFailed(String),

    /// A read operation failed.
    #[error("Read failed: {0}")]
    ReadFailed(String),

    /// The requested entity was not found.
    #[error("Not found: {0}")]
    NotFound(String),

    /// A compare-and-swap precondition was not met.
    #[error("CAS conflict: {0}")]
    CasConflict(String),

    /// Spec revision compare-and-swap failed (structured variant for typed conversion).
    ///
    /// `expected` is the revision the caller asked for and `actual` the revision
    /// found in the store. NOTE(review): `expected == None` presumably means the
    /// caller expected no spec to exist yet — confirm against `MobSpecStore` impls.
    #[error("spec revision conflict for mob {mob_id}: expected {expected:?}, actual {actual}")]
    SpecRevisionConflict {
        mob_id: crate::ids::MobId,
        expected: Option<u64>,
        actual: u64,
    },

    /// The backend cannot provide the requested frame-aware atomic operation.
    ///
    /// Returned by `MobRunStore` CAS wrappers on backends that cannot commit
    /// the operation's multi-part update as one atomic unit.
    #[error("frame-aware atomic persistence unavailable for operation '{operation}'")]
    FrameAtomicPersistenceUnavailable { operation: FrameAtomicOperation },

    /// Serialization or deserialization failed.
    #[error("Serialization error: {0}")]
    Serialization(String),

    /// Internal error.
    ///
    /// Also used by the default `MobEventStore::subscribe` when a store has no
    /// native subscription support.
    #[error("Internal error: {0}")]
    Internal(String),
}
127
128/// Persisted runtime-side supervisor authority for a mob.
129#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
130pub struct SupervisorAuthorityRecord {
131    /// Raw secret bytes for reconstructing the mob-owned supervisor keypair.
132    pub secret_key: [u8; 32],
133    /// Canonical peer id string for the corresponding public key.
134    pub public_peer_id: String,
135    /// Monotonic supervisor epoch for stale-authority rejection.
136    pub epoch: u64,
137    /// Protocol version carried on supervisor commands.
138    pub protocol_version: BridgeProtocolVersion,
139    /// Explicit pending rotation retained after a partial remote rotation.
140    #[serde(default, skip_serializing_if = "Option::is_none")]
141    pub pending_rotation: Option<SupervisorPendingRotationRecord>,
142}
143
144#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
145pub struct SupervisorPendingRotationRecord {
146    /// Raw secret bytes for reconstructing the attempted supervisor keypair.
147    pub secret_key: [u8; 32],
148    /// Canonical peer id string for the attempted supervisor public key.
149    pub public_peer_id: String,
150    /// Attempted supervisor epoch.
151    pub epoch: u64,
152    /// Protocol version carried on supervisor commands.
153    pub protocol_version: BridgeProtocolVersion,
154    /// Remote peer ids that already accepted this attempted authority.
155    #[serde(default)]
156    pub accepted_peer_ids: Vec<String>,
157}
158
159impl SupervisorAuthorityRecord {
160    /// Mint a fresh supervisor authority record.
161    pub fn generate(protocol_version: BridgeProtocolVersion) -> Self {
162        let keypair = meerkat_comms::Keypair::generate();
163        Self {
164            secret_key: keypair.secret_bytes(),
165            public_peer_id: keypair.public_key().to_peer_id().as_str(),
166            epoch: 0,
167            protocol_version,
168            pending_rotation: None,
169        }
170    }
171
172    /// Reconstruct the signing keypair for runtime use.
173    pub fn keypair(&self) -> meerkat_comms::Keypair {
174        meerkat_comms::Keypair::from_secret(self.secret_key)
175    }
176
177    pub fn without_pending_rotation(&self) -> Self {
178        let mut record = self.clone();
179        record.pending_rotation = None;
180        record
181    }
182
183    pub fn apply_process_local_pending_rotation(
184        &mut self,
185        pending: SupervisorPendingRotationRecord,
186    ) -> bool {
187        if !pending.accepted_peer_ids.is_empty()
188            && pending.epoch <= self.epoch
189            && pending.public_peer_id != self.public_peer_id
190        {
191            return match self.pending_rotation.as_ref() {
192                Some(durable) if durable.same_attempted_authority(&pending) => {
193                    self.pending_rotation = Some(pending);
194                    true
195                }
196                None => {
197                    self.pending_rotation = Some(pending);
198                    true
199                }
200                Some(_) => false,
201            };
202        }
203        if self.epoch.checked_add(1) != Some(pending.epoch) {
204            return false;
205        }
206        if pending.accepted_peer_ids.is_empty() {
207            return match self.pending_rotation.as_ref() {
208                Some(durable) if durable.same_attempted_authority(&pending) => {
209                    self.pending_rotation = None;
210                    true
211                }
212                None => false,
213                Some(_) => false,
214            };
215        }
216        match self.pending_rotation.as_ref() {
217            Some(durable) if durable.same_attempted_authority(&pending) => {
218                self.pending_rotation = Some(pending);
219                true
220            }
221            None => {
222                self.pending_rotation = Some(pending);
223                true
224            }
225            Some(_) => false,
226        }
227    }
228}
229
230impl SupervisorPendingRotationRecord {
231    pub fn from_authority(
232        authority: &SupervisorAuthorityRecord,
233        accepted_peer_ids: Vec<String>,
234    ) -> Self {
235        Self {
236            secret_key: authority.secret_key,
237            public_peer_id: authority.public_peer_id.clone(),
238            epoch: authority.epoch,
239            protocol_version: authority.protocol_version,
240            accepted_peer_ids,
241        }
242    }
243
244    pub fn authority_record(&self) -> SupervisorAuthorityRecord {
245        SupervisorAuthorityRecord {
246            secret_key: self.secret_key,
247            public_peer_id: self.public_peer_id.clone(),
248            epoch: self.epoch,
249            protocol_version: self.protocol_version,
250            pending_rotation: None,
251        }
252    }
253
254    pub fn same_attempted_authority(&self, other: &Self) -> bool {
255        self.secret_key == other.secret_key
256            && self.public_peer_id == other.public_peer_id
257            && self.epoch == other.epoch
258            && self
259                .protocol_version
260                .same_protocol_as(other.protocol_version)
261    }
262
263    pub fn remove_accepted_peer_ids(&mut self, peer_ids: &[String]) -> bool {
264        let original_len = self.accepted_peer_ids.len();
265        self.accepted_peer_ids
266            .retain(|peer_id| !peer_ids.iter().any(|candidate| candidate == peer_id));
267        self.accepted_peer_ids.len() != original_len
268    }
269}
270
/// Projection status for legacy external binding metadata.
///
/// Stored in [`ExternalBindingOverlayRecord::status`].
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub enum ExternalBindingOverlayStatus {
    /// The legacy external binding was normalized to a peer-only member ref.
    Normalized,
    /// Normalization failed and the member should surface as broken.
    ///
    /// `reason` is a human-readable explanation of the failure.
    Failed { reason: String },
}
279
/// Compatibility projection metadata for a legacy external binding.
///
/// This record never creates roster membership and is not restart authority for
/// member material, bridge binding, lifecycle status, or restore failure state.
/// Resume rebuilds those facts from `MemberSpawned`/`MemberRetired` events and
/// MobMachine-owned state; overlays remain persisted only for compatibility
/// projection, diagnostics, and cleanup of older runtimes that wrote them.
///
/// Records are keyed by `(agent_identity, generation)` within a mob (see
/// [`MobRuntimeMetadataStore::put_external_binding_overlay_if_absent`]).
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub struct ExternalBindingOverlayRecord {
    /// Stable member identity.
    pub agent_identity: AgentIdentity,
    /// Generation the overlay applies to.
    pub generation: Generation,
    /// Peer-only runtime binding when normalization succeeds.
    ///
    /// Crate-private alongside `MemberRef` itself (finding A7): the pre-0.6
    /// bridge identity is never surfaced to external callers. Serde still
    /// carries the value through the persisted overlay record.
    pub(crate) normalized_member_ref: Option<MemberRef>,
    /// Optional bootstrap proof for re-establishing supervisor control.
    pub bootstrap_token: Option<BridgeBootstrapToken>,
    /// Current normalization status.
    pub status: ExternalBindingOverlayStatus,
    /// Last update time for conflict resolution and diagnostics.
    pub updated_at: DateTime<Utc>,
}
306
/// Trait for persisting and querying mob events.
///
/// Persisted events are ordered by a numeric cursor (see [`Self::poll`] and
/// [`Self::latest_cursor`]). On wasm32 the trait is `?Send` via `async_trait`.
#[cfg_attr(target_arch = "wasm32", async_trait(?Send))]
#[cfg_attr(not(target_arch = "wasm32"), async_trait)]
pub trait MobEventStore: Send + Sync {
    /// Append a new event to the store.
    ///
    /// Returns the persisted [`MobEvent`] built from `event`.
    async fn append(&self, event: NewMobEvent) -> Result<MobEvent, MobStoreError>;

    /// Append a terminal flow event only when no terminal event for the same
    /// mob/run/flow has already been persisted.
    ///
    /// Returns `Some(event)` when it was appended. NOTE(review): `None`
    /// presumably means a terminal event already existed and nothing was
    /// written — confirm against implementations.
    async fn append_terminal_event_if_absent(
        &self,
        event: NewMobEvent,
    ) -> Result<Option<MobEvent>, MobStoreError>;

    /// Append multiple events atomically.
    ///
    /// Implementations must ensure all-or-nothing semantics: either every
    /// event is persisted or none are. No default implementation is provided
    /// to force implementors to consider atomicity.
    async fn append_batch(&self, events: Vec<NewMobEvent>) -> Result<Vec<MobEvent>, MobStoreError>;

    /// Poll for events after a given cursor, up to a limit.
    ///
    /// NOTE(review): `after_cursor` is presumably exclusive (pass the last
    /// cursor seen, or 0 to start from the beginning) — confirm.
    async fn poll(&self, after_cursor: u64, limit: usize) -> Result<Vec<MobEvent>, MobStoreError>;

    /// Replay all events from the beginning.
    async fn replay_all(&self) -> Result<Vec<MobEvent>, MobStoreError>;

    /// Return the latest persisted event cursor, or 0 when no events exist.
    async fn latest_cursor(&self) -> Result<u64, MobStoreError>;

    /// Subscribe to structural events appended through this store after this call.
    ///
    /// The default implementation has no subscription support and always
    /// returns [`MobStoreError::Internal`]; stores that can broadcast appends
    /// override it.
    fn subscribe(&self) -> Result<MobEventReceiver, MobStoreError> {
        Err(MobStoreError::Internal(
            "mob event store does not support native event subscriptions".to_string(),
        ))
    }

    /// Delete all persisted events.
    async fn clear(&self) -> Result<(), MobStoreError>;

    /// Prune events older than a timestamp. Returns count of deleted events.
    ///
    /// The default implementation deletes nothing and returns `Ok(0)`, so
    /// stores that do not override it silently retain all events.
    async fn prune(&self, _older_than: DateTime<Utc>) -> Result<u64, MobStoreError> {
        Ok(0)
    }
}
352
/// Trait for persisting authoritative runtime-side mob metadata.
///
/// Covers two kinds of per-mob records: the single [`SupervisorAuthorityRecord`]
/// and the [`ExternalBindingOverlayRecord`]s keyed by identity/generation.
#[cfg_attr(target_arch = "wasm32", async_trait(?Send))]
#[cfg_attr(not(target_arch = "wasm32"), async_trait)]
pub trait MobRuntimeMetadataStore: Send + Sync {
    /// Load the mob-owned supervisor authority record.
    ///
    /// Returns `Ok(None)` when no record is stored for `mob_id`.
    async fn load_supervisor_authority(
        &self,
        mob_id: &MobId,
    ) -> Result<Option<SupervisorAuthorityRecord>, MobStoreError>;

    /// Upsert the mob-owned supervisor authority record.
    ///
    /// Unconditional write; use [`Self::compare_and_put_supervisor_authority`]
    /// or [`Self::put_supervisor_authority_if_absent`] when racing writers matter.
    async fn put_supervisor_authority(
        &self,
        mob_id: &MobId,
        record: &SupervisorAuthorityRecord,
    ) -> Result<(), MobStoreError>;

    /// Compare-and-put the mob-owned supervisor authority record.
    ///
    /// Returns `true` when the persisted record exactly matched `expected`
    /// and was replaced by `record`, `false` when another writer changed or
    /// removed the record first.
    async fn compare_and_put_supervisor_authority(
        &self,
        mob_id: &MobId,
        expected: &SupervisorAuthorityRecord,
        record: &SupervisorAuthorityRecord,
    ) -> Result<bool, MobStoreError>;

    /// Insert the mob-owned supervisor authority record if it is missing.
    ///
    /// Returns `true` when the caller won initialization, `false` when an
    /// existing record already owned the key.
    async fn put_supervisor_authority_if_absent(
        &self,
        mob_id: &MobId,
        record: &SupervisorAuthorityRecord,
    ) -> Result<bool, MobStoreError>;

    /// Delete the mob-owned supervisor authority record.
    ///
    /// NOTE(review): whether deleting a missing record is `Ok(())` or an
    /// error is backend-defined here — confirm against implementations.
    async fn delete_supervisor_authority(&self, mob_id: &MobId) -> Result<(), MobStoreError>;

    /// List all external binding compatibility projection records for a mob.
    async fn list_external_binding_overlays(
        &self,
        mob_id: &MobId,
    ) -> Result<Vec<ExternalBindingOverlayRecord>, MobStoreError>;

    /// Insert a new overlay if one does not already exist for the identity/generation key.
    ///
    /// Returns `true` when the record was inserted, `false` when an existing
    /// overlay already owns the key.
    async fn put_external_binding_overlay_if_absent(
        &self,
        mob_id: &MobId,
        record: &ExternalBindingOverlayRecord,
    ) -> Result<bool, MobStoreError>;

    /// Upsert an overlay for the identity/generation key.
    ///
    /// The key is taken from `record.agent_identity` and `record.generation`.
    async fn upsert_external_binding_overlay(
        &self,
        mob_id: &MobId,
        record: &ExternalBindingOverlayRecord,
    ) -> Result<(), MobStoreError>;

    /// Delete the overlay for a specific identity/generation key.
    async fn delete_external_binding_overlay(
        &self,
        mob_id: &MobId,
        agent_identity: &AgentIdentity,
        generation: Generation,
    ) -> Result<(), MobStoreError>;

    /// Delete all overlays for the given mob.
    async fn delete_external_binding_overlays(&self, mob_id: &MobId) -> Result<(), MobStoreError>;
}
429
/// Trait for persisting and querying flow runs.
///
/// Besides plain run CRUD and ledger appends, this trait defines the
/// compare-and-swap (CAS) wrappers the flow/frame engine relies on. Each CAS
/// method has a `_with_authority` twin that additionally takes
/// `authority_inputs` (a batch of `MobMachineInput`s).
/// NOTE(review): those inputs are presumably committed in the same atomic unit
/// as the CAS they accompany — confirm against the backend implementations.
#[cfg_attr(target_arch = "wasm32", async_trait(?Send))]
#[cfg_attr(not(target_arch = "wasm32"), async_trait)]
pub trait MobRunStore: Send + Sync {
    /// Store `run` as a new run record.
    async fn create_run(&self, run: MobRun) -> Result<(), MobStoreError>;
    /// Load a run by id; `Ok(None)` when no such run is stored.
    async fn get_run(&self, run_id: &RunId) -> Result<Option<MobRun>, MobStoreError>;
    /// List the runs of `mob_id`, optionally restricted to `flow_id`.
    async fn list_runs(
        &self,
        mob_id: &MobId,
        flow_id: Option<&FlowId>,
    ) -> Result<Vec<MobRun>, MobStoreError>;
    /// CAS the run status from `expected` to `next`.
    ///
    /// NOTE(review): presumably `Ok(false)` on mismatch, like the frame CAS
    /// wrappers below — confirm.
    async fn cas_run_status(
        &self,
        run_id: &RunId,
        expected: MobRunStatus,
        next: MobRunStatus,
    ) -> Result<bool, MobStoreError>;
    /// CAS the run's flow state from `expected` to `next`.
    async fn cas_flow_state(
        &self,
        run_id: &RunId,
        expected: &flow_run::State,
        next: &flow_run::State,
    ) -> Result<bool, MobStoreError>;
    /// [`Self::cas_flow_state`] plus `authority_inputs`.
    async fn cas_flow_state_with_authority(
        &self,
        run_id: &RunId,
        expected: &flow_run::State,
        next: &flow_run::State,
        authority_inputs: Vec<mob_dsl::MobMachineInput>,
    ) -> Result<bool, MobStoreError>;
    /// CAS status and flow state together: both expectations must hold for
    /// both `next_*` values to be written.
    async fn cas_run_snapshot(
        &self,
        run_id: &RunId,
        expected_status: MobRunStatus,
        expected_flow_state: &flow_run::State,
        next_status: MobRunStatus,
        next_flow_state: &flow_run::State,
    ) -> Result<bool, MobStoreError>;
    /// [`Self::cas_run_snapshot`] plus `authority_inputs`.
    async fn cas_run_snapshot_with_authority(
        &self,
        run_id: &RunId,
        expected_status: MobRunStatus,
        expected_flow_state: &flow_run::State,
        next_status: MobRunStatus,
        next_flow_state: &flow_run::State,
        authority_inputs: Vec<mob_dsl::MobMachineInput>,
    ) -> Result<bool, MobStoreError>;
    /// Append `entry` to the run's step ledger.
    async fn append_step_entry(
        &self,
        run_id: &RunId,
        entry: StepLedgerEntry,
    ) -> Result<(), MobStoreError>;
    /// Append `entry` to the step ledger unless an equivalent entry exists.
    ///
    /// NOTE(review): presumably returns `true` when appended and `false` when
    /// skipped; the equivalence key is backend-defined — confirm.
    async fn append_step_entry_if_absent(
        &self,
        run_id: &RunId,
        entry: StepLedgerEntry,
    ) -> Result<bool, MobStoreError>;
    /// Record `output` as the output of `step_id` in the run.
    async fn put_step_output(
        &self,
        run_id: &RunId,
        step_id: &StepId,
        output: serde_json::Value,
    ) -> Result<(), MobStoreError>;
    /// Append `entry` to the run's failure ledger.
    async fn append_failure_entry(
        &self,
        run_id: &RunId,
        entry: FailureLedgerEntry,
    ) -> Result<(), MobStoreError>;

    /// Upsert a loop snapshot. Creates or overwrites the entry for `loop_instance_id`
    /// in `run.loops` and optionally records a `LoopIterationLedgerEntry`.
    ///
    /// Used by the sequential `FlowFrameEngine` to persist loop state so that
    /// `reconcile_run_state` can reconstruct in-progress loops after a crash.
    ///
    /// Implementations must treat `ledger_entry` as idempotent by logical
    /// iteration identity. Replaying the same `(loop_instance_id, iteration, frame_id)`
    /// on resume must not append a duplicate row.
    async fn upsert_loop_snapshot(
        &self,
        run_id: &RunId,
        loop_instance_id: &LoopInstanceId,
        snapshot: LoopSnapshot,
        ledger_entry: Option<LoopIterationLedgerEntry>,
    ) -> Result<(), MobStoreError>;

    // Phase 3: CAS wrappers for frame and loop state.
    //
    // Each wrapper takes an expected/next pair for every piece of state it
    // updates atomically, so several exceed clippy's argument-count limit;
    // `clippy::too_many_arguments` is allowed on those deliberately because the
    // argument list is the CAS contract itself.

    /// CAS wrapper 1: frame state update.
    ///
    /// If `expected` is `None`, this is an insert (frame must not yet exist).
    /// If `expected` is `Some(snapshot)`, the current frame state must match.
    /// Returns `Ok(true)` on success, `Ok(false)` on mismatch.
    ///
    /// Implementations must atomically compare and replace the frame snapshot.
    /// Backends that cannot make that guarantee must return
    /// [`MobStoreError::FrameAtomicPersistenceUnavailable`] for this operation.
    async fn cas_frame_state(
        &self,
        run_id: &RunId,
        frame_id: &FrameId,
        expected: Option<&FrameSnapshot>,
        next: FrameSnapshot,
    ) -> Result<bool, MobStoreError>;
    /// [`Self::cas_frame_state`] plus `authority_inputs`.
    async fn cas_frame_state_with_authority(
        &self,
        run_id: &RunId,
        frame_id: &FrameId,
        expected: Option<&FrameSnapshot>,
        next: FrameSnapshot,
        authority_inputs: Vec<mob_dsl::MobMachineInput>,
    ) -> Result<bool, MobStoreError>;

    /// CAS wrapper 2: grant node slot — atomically update run flow state + frame state.
    ///
    /// Implementations must commit the run-state and frame-state changes in one
    /// atomic unit. Backends that cannot make that guarantee must return
    /// [`MobStoreError::FrameAtomicPersistenceUnavailable`].
    async fn cas_grant_node_slot(
        &self,
        run_id: &RunId,
        expected_run_state: &flow_run::State,
        next_run_state: flow_run::State,
        frame_id: &FrameId,
        expected_frame: &FrameSnapshot,
        next_frame: FrameSnapshot,
    ) -> Result<bool, MobStoreError>;
    /// [`Self::cas_grant_node_slot`] plus `authority_inputs`.
    #[allow(clippy::too_many_arguments)]
    async fn cas_grant_node_slot_with_authority(
        &self,
        run_id: &RunId,
        expected_run_state: &flow_run::State,
        next_run_state: flow_run::State,
        frame_id: &FrameId,
        expected_frame: &FrameSnapshot,
        next_frame: FrameSnapshot,
        authority_inputs: Vec<mob_dsl::MobMachineInput>,
    ) -> Result<bool, MobStoreError>;

    /// CAS wrapper 3: complete step — update frame state and record step output.
    ///
    /// When `loop_context` is `None`, the output is stored in `root_step_outputs`.
    /// When `loop_context` is `Some((loop_id, iteration))`, the output is stored
    /// in `loop_iteration_outputs[loop_id][iteration]`.
    ///
    /// Implementations must commit the frame-state update and step-output write
    /// in one atomic unit. Backends that cannot make that guarantee must return
    /// [`MobStoreError::FrameAtomicPersistenceUnavailable`].
    #[allow(clippy::too_many_arguments)]
    async fn cas_complete_step_and_record_output(
        &self,
        run_id: &RunId,
        frame_id: &FrameId,
        expected_frame: &FrameSnapshot,
        next_frame: FrameSnapshot,
        step_output_key: String,
        step_output: serde_json::Value,
        loop_context: Option<(&LoopId, u64)>,
    ) -> Result<bool, MobStoreError>;
    /// [`Self::cas_complete_step_and_record_output`] plus `authority_inputs`.
    #[allow(clippy::too_many_arguments)]
    async fn cas_complete_step_and_record_output_with_authority(
        &self,
        run_id: &RunId,
        frame_id: &FrameId,
        expected_frame: &FrameSnapshot,
        next_frame: FrameSnapshot,
        step_output_key: String,
        step_output: serde_json::Value,
        loop_context: Option<(&LoopId, u64)>,
        authority_inputs: Vec<mob_dsl::MobMachineInput>,
    ) -> Result<bool, MobStoreError>;

    /// CAS wrapper 4: start loop — register loop + update run state + parent frame.
    ///
    /// Implementations must register the loop and commit the run/frame updates
    /// in one atomic unit. Backends that cannot make that guarantee must return
    /// [`MobStoreError::FrameAtomicPersistenceUnavailable`].
    #[allow(clippy::too_many_arguments)]
    async fn cas_start_loop(
        &self,
        run_id: &RunId,
        loop_instance_id: &LoopInstanceId,
        expected_run_state: &flow_run::State,
        next_run_state: flow_run::State,
        frame_id: &FrameId,
        expected_frame: &FrameSnapshot,
        next_frame: FrameSnapshot,
        initial_loop: LoopSnapshot,
    ) -> Result<bool, MobStoreError>;
    /// [`Self::cas_start_loop`] plus `authority_inputs`.
    #[allow(clippy::too_many_arguments)]
    async fn cas_start_loop_with_authority(
        &self,
        run_id: &RunId,
        loop_instance_id: &LoopInstanceId,
        expected_run_state: &flow_run::State,
        next_run_state: flow_run::State,
        frame_id: &FrameId,
        expected_frame: &FrameSnapshot,
        next_frame: FrameSnapshot,
        initial_loop: LoopSnapshot,
        authority_inputs: Vec<mob_dsl::MobMachineInput>,
    ) -> Result<bool, MobStoreError>;

    /// CAS wrapper 5: register pending body frame — loop transition + run state update.
    ///
    /// Implementations must commit the loop transition and run-state update in
    /// one atomic unit. Backends that cannot make that guarantee must return
    /// [`MobStoreError::FrameAtomicPersistenceUnavailable`].
    async fn cas_loop_request_body_frame(
        &self,
        run_id: &RunId,
        loop_instance_id: &LoopInstanceId,
        expected_loop: &LoopSnapshot,
        next_loop: LoopSnapshot,
        expected_run_state: &flow_run::State,
        next_run_state: flow_run::State,
    ) -> Result<bool, MobStoreError>;
    /// [`Self::cas_loop_request_body_frame`] plus `authority_inputs`.
    #[allow(clippy::too_many_arguments)]
    async fn cas_loop_request_body_frame_with_authority(
        &self,
        run_id: &RunId,
        loop_instance_id: &LoopInstanceId,
        expected_loop: &LoopSnapshot,
        next_loop: LoopSnapshot,
        expected_run_state: &flow_run::State,
        next_run_state: flow_run::State,
        authority_inputs: Vec<mob_dsl::MobMachineInput>,
    ) -> Result<bool, MobStoreError>;

    /// CAS wrapper 6: body frame start — loop transition + register new frame + run state update.
    ///
    /// Implementations must commit the loop transition, frame registration, and
    /// run-state update in one atomic unit. Backends that cannot make that
    /// guarantee must return [`MobStoreError::FrameAtomicPersistenceUnavailable`].
    #[allow(clippy::too_many_arguments)]
    async fn cas_grant_body_frame_start(
        &self,
        run_id: &RunId,
        loop_instance_id: &LoopInstanceId,
        expected_loop: &LoopSnapshot,
        next_loop: LoopSnapshot,
        frame_id: &FrameId,
        initial_frame: FrameSnapshot,
        ledger_entry: LoopIterationLedgerEntry,
        expected_run_state: &flow_run::State,
        next_run_state: flow_run::State,
    ) -> Result<bool, MobStoreError>;
    /// [`Self::cas_grant_body_frame_start`] plus `authority_inputs`.
    #[allow(clippy::too_many_arguments)]
    async fn cas_grant_body_frame_start_with_authority(
        &self,
        run_id: &RunId,
        loop_instance_id: &LoopInstanceId,
        expected_loop: &LoopSnapshot,
        next_loop: LoopSnapshot,
        frame_id: &FrameId,
        initial_frame: FrameSnapshot,
        ledger_entry: LoopIterationLedgerEntry,
        expected_run_state: &flow_run::State,
        next_run_state: flow_run::State,
        authority_inputs: Vec<mob_dsl::MobMachineInput>,
    ) -> Result<bool, MobStoreError>;

    /// CAS wrapper 7: body frame completion — terminalize frame + loop state update + run state.
    ///
    /// Implementations must commit the frame terminalization, loop update, and
    /// run-state update in one atomic unit. Backends that cannot make that
    /// guarantee must return [`MobStoreError::FrameAtomicPersistenceUnavailable`].
    #[allow(clippy::too_many_arguments)]
    async fn cas_complete_body_frame(
        &self,
        run_id: &RunId,
        loop_instance_id: &LoopInstanceId,
        expected_loop: &LoopSnapshot,
        next_loop: LoopSnapshot,
        frame_id: &FrameId,
        expected_frame: &FrameSnapshot,
        next_frame: FrameSnapshot,
        expected_run_state: &flow_run::State,
        next_run_state: flow_run::State,
    ) -> Result<bool, MobStoreError>;
    /// [`Self::cas_complete_body_frame`] plus `authority_inputs`.
    #[allow(clippy::too_many_arguments)]
    async fn cas_complete_body_frame_with_authority(
        &self,
        run_id: &RunId,
        loop_instance_id: &LoopInstanceId,
        expected_loop: &LoopSnapshot,
        next_loop: LoopSnapshot,
        frame_id: &FrameId,
        expected_frame: &FrameSnapshot,
        next_frame: FrameSnapshot,
        expected_run_state: &flow_run::State,
        next_run_state: flow_run::State,
        authority_inputs: Vec<mob_dsl::MobMachineInput>,
    ) -> Result<bool, MobStoreError>;

    /// CAS wrapper 8: loop completion — loop state + run state + parent frame update.
    ///
    /// Implementations must commit the loop state, run state, and parent-frame
    /// update in one atomic unit. Backends that cannot make that guarantee must
    /// return [`MobStoreError::FrameAtomicPersistenceUnavailable`].
    #[allow(clippy::too_many_arguments)]
    async fn cas_complete_loop(
        &self,
        run_id: &RunId,
        loop_instance_id: &LoopInstanceId,
        expected_loop: &LoopSnapshot,
        next_loop: LoopSnapshot,
        frame_id: &FrameId,
        expected_frame: &FrameSnapshot,
        next_frame: FrameSnapshot,
        expected_run_state: &flow_run::State,
        next_run_state: flow_run::State,
    ) -> Result<bool, MobStoreError>;
    /// [`Self::cas_complete_loop`] plus `authority_inputs`.
    #[allow(clippy::too_many_arguments)]
    async fn cas_complete_loop_with_authority(
        &self,
        run_id: &RunId,
        loop_instance_id: &LoopInstanceId,
        expected_loop: &LoopSnapshot,
        next_loop: LoopSnapshot,
        frame_id: &FrameId,
        expected_frame: &FrameSnapshot,
        next_frame: FrameSnapshot,
        expected_run_state: &flow_run::State,
        next_run_state: flow_run::State,
        authority_inputs: Vec<mob_dsl::MobMachineInput>,
    ) -> Result<bool, MobStoreError>;
}
758
/// Trait for persisting and querying mob specs.
#[cfg_attr(target_arch = "wasm32", async_trait(?Send))]
#[cfg_attr(not(target_arch = "wasm32"), async_trait)]
pub trait MobSpecStore: Send + Sync {
    /// Put a spec. Returns new revision.
    ///
    /// NOTE(review): `revision` is presumably the expected current revision
    /// for a compare-and-swap (cf. [`MobStoreError::SpecRevisionConflict`],
    /// whose `expected` is also `Option<u64>`); confirm what `None` means.
    async fn put_spec(
        &self,
        mob_id: &MobId,
        definition: &MobDefinition,
        revision: Option<u64>,
    ) -> Result<u64, MobStoreError>;

    /// Load the spec for `mob_id`, or `Ok(None)` when none is stored.
    ///
    /// NOTE(review): the `u64` is presumably the spec's current revision, the
    /// same value `put_spec` returns — confirm.
    async fn get_spec(&self, mob_id: &MobId)
    -> Result<Option<(MobDefinition, u64)>, MobStoreError>;
    /// List the ids of all mobs that have a stored spec.
    async fn list_specs(&self) -> Result<Vec<MobId>, MobStoreError>;
    /// Delete the spec for `mob_id`.
    ///
    /// NOTE(review): `revision` presumably guards the delete the same way it
    /// guards `put_spec`, and the `bool` presumably reports whether a spec was
    /// removed — confirm against implementations.
    async fn delete_spec(
        &self,
        mob_id: &MobId,
        revision: Option<u64>,
    ) -> Result<bool, MobStoreError>;
}