1mod in_memory;
4mod realm_profile;
5#[cfg(not(target_arch = "wasm32"))]
6mod sqlite;
7
8pub use in_memory::{
9 InMemoryMobEventStore, InMemoryMobRunStore, InMemoryMobRuntimeMetadataStore,
10 InMemoryMobSpecStore, InMemoryRealmProfileStore,
11};
12pub use realm_profile::{RealmProfileStore, StoredRealmProfile};
13#[cfg(not(target_arch = "wasm32"))]
14pub use sqlite::{
15 SqliteMobEventStore, SqliteMobRunStore, SqliteMobRuntimeMetadataStore, SqliteMobSpecStore,
16 SqliteMobStores, SqliteRealmProfileStore,
17};
18
19use crate::definition::MobDefinition;
20use crate::event::{MemberRef, MobEvent, MobEventKind, NewMobEvent};
21use crate::ids::{
22 AgentIdentity, FlowId, FrameId, Generation, LoopId, LoopInstanceId, MobId, RunId, StepId,
23};
24use crate::machines::mob_machine as mob_dsl;
25use crate::run::flow_run;
26use crate::run::{
27 FailureLedgerEntry, FrameSnapshot, LoopIterationLedgerEntry, LoopSnapshot, MobRun,
28 MobRunStatus, StepLedgerEntry,
29};
30#[cfg(target_arch = "wasm32")]
31use crate::tokio;
32use async_trait::async_trait;
33use chrono::{DateTime, Utc};
34use meerkat_contracts::wire::supervisor_bridge::{BridgeBootstrapToken, BridgeProtocolVersion};
35use serde::{Deserialize, Serialize};
36use tokio::sync::broadcast;
37
/// Receiver for the `broadcast` channel of [`MobEvent`]s, as returned by
/// [`MobEventStore::subscribe`].
pub type MobEventReceiver = broadcast::Receiver<MobEvent>;
40
41pub(crate) fn terminal_event_identity(kind: &MobEventKind) -> Option<(&RunId, &FlowId)> {
42 match kind {
43 MobEventKind::FlowCompleted {
44 run_id, flow_id, ..
45 }
46 | MobEventKind::FlowFailed {
47 run_id, flow_id, ..
48 }
49 | MobEventKind::FlowCanceled { run_id, flow_id } => Some((run_id, flow_id)),
50 _ => None,
51 }
52}
53
/// Names a frame-aware atomic store operation.
///
/// Carried by [`MobStoreError::FrameAtomicPersistenceUnavailable`]. Each variant name
/// mirrors the `MobRunStore::cas_*` method of the same name, and serializes in
/// `snake_case`.
// Variant order is kept stable: serde uses the variant index for non-self-describing formats.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum FrameAtomicOperation {
    /// `MobRunStore::cas_frame_state`.
    CasFrameState,
    /// `MobRunStore::cas_grant_node_slot`.
    CasGrantNodeSlot,
    /// `MobRunStore::cas_complete_step_and_record_output`.
    CasCompleteStepAndRecordOutput,
    /// `MobRunStore::cas_start_loop`.
    CasStartLoop,
    /// `MobRunStore::cas_loop_request_body_frame`.
    CasLoopRequestBodyFrame,
    /// `MobRunStore::cas_grant_body_frame_start`.
    CasGrantBodyFrameStart,
    /// `MobRunStore::cas_complete_body_frame`.
    CasCompleteBodyFrame,
    /// `MobRunStore::cas_complete_loop`.
    CasCompleteLoop,
}
67
68impl std::fmt::Display for FrameAtomicOperation {
69 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
70 match self {
71 Self::CasFrameState => write!(f, "cas_frame_state"),
72 Self::CasGrantNodeSlot => write!(f, "cas_grant_node_slot"),
73 Self::CasCompleteStepAndRecordOutput => {
74 write!(f, "cas_complete_step_and_record_output")
75 }
76 Self::CasStartLoop => write!(f, "cas_start_loop"),
77 Self::CasLoopRequestBodyFrame => write!(f, "cas_loop_request_body_frame"),
78 Self::CasGrantBodyFrameStart => write!(f, "cas_grant_body_frame_start"),
79 Self::CasCompleteBodyFrame => write!(f, "cas_complete_body_frame"),
80 Self::CasCompleteLoop => write!(f, "cas_complete_loop"),
81 }
82 }
83}
84
/// Errors returned by the mob store traits in this module.
#[derive(Debug, thiserror::Error)]
pub enum MobStoreError {
    /// A write to the backing store failed; the payload describes the cause.
    #[error("Write failed: {0}")]
    WriteFailed(String),

    /// A read from the backing store failed; the payload describes the cause.
    #[error("Read failed: {0}")]
    ReadFailed(String),

    /// The requested item does not exist; the payload identifies it.
    #[error("Not found: {0}")]
    NotFound(String),

    /// A compare-and-swap precondition did not hold.
    ///
    /// NOTE(review): many `cas_*` store methods return `Ok(bool)` rather than
    /// this error; confirm which paths report conflicts through this variant.
    #[error("CAS conflict: {0}")]
    CasConflict(String),

    /// The stored spec revision for `mob_id` is `actual`, not the `expected` one.
    ///
    /// NOTE(review): presumably raised by `MobSpecStore::put_spec` / `delete_spec`,
    /// whose `revision: Option<u64>` parameter matches `expected` — confirm.
    #[error("spec revision conflict for mob {mob_id}: expected {expected:?}, actual {actual}")]
    SpecRevisionConflict {
        mob_id: crate::ids::MobId,
        expected: Option<u64>,
        actual: u64,
    },

    /// The store cannot perform the named frame-aware atomic operation.
    #[error("frame-aware atomic persistence unavailable for operation '{operation}'")]
    FrameAtomicPersistenceUnavailable { operation: FrameAtomicOperation },

    /// Encoding or decoding of stored data failed.
    #[error("Serialization error: {0}")]
    Serialization(String),

    /// Any other failure, e.g. the default `MobEventStore::subscribe`, which
    /// reports missing subscription support through this variant.
    #[error("Internal error: {0}")]
    Internal(String),
}
127
128#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
130pub struct SupervisorAuthorityRecord {
131 pub secret_key: [u8; 32],
133 pub public_peer_id: String,
135 pub epoch: u64,
137 pub protocol_version: BridgeProtocolVersion,
139 #[serde(default, skip_serializing_if = "Option::is_none")]
141 pub pending_rotation: Option<SupervisorPendingRotationRecord>,
142}
143
144#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
145pub struct SupervisorPendingRotationRecord {
146 pub secret_key: [u8; 32],
148 pub public_peer_id: String,
150 pub epoch: u64,
152 pub protocol_version: BridgeProtocolVersion,
154 #[serde(default)]
156 pub accepted_peer_ids: Vec<String>,
157}
158
159impl SupervisorAuthorityRecord {
160 pub fn generate(protocol_version: BridgeProtocolVersion) -> Self {
162 let keypair = meerkat_comms::Keypair::generate();
163 Self {
164 secret_key: keypair.secret_bytes(),
165 public_peer_id: keypair.public_key().to_peer_id().as_str(),
166 epoch: 0,
167 protocol_version,
168 pending_rotation: None,
169 }
170 }
171
172 pub fn keypair(&self) -> meerkat_comms::Keypair {
174 meerkat_comms::Keypair::from_secret(self.secret_key)
175 }
176
177 pub fn without_pending_rotation(&self) -> Self {
178 let mut record = self.clone();
179 record.pending_rotation = None;
180 record
181 }
182
183 pub fn apply_process_local_pending_rotation(
184 &mut self,
185 pending: SupervisorPendingRotationRecord,
186 ) -> bool {
187 if !pending.accepted_peer_ids.is_empty()
188 && pending.epoch <= self.epoch
189 && pending.public_peer_id != self.public_peer_id
190 {
191 return match self.pending_rotation.as_ref() {
192 Some(durable) if durable.same_attempted_authority(&pending) => {
193 self.pending_rotation = Some(pending);
194 true
195 }
196 None => {
197 self.pending_rotation = Some(pending);
198 true
199 }
200 Some(_) => false,
201 };
202 }
203 if self.epoch.checked_add(1) != Some(pending.epoch) {
204 return false;
205 }
206 if pending.accepted_peer_ids.is_empty() {
207 return match self.pending_rotation.as_ref() {
208 Some(durable) if durable.same_attempted_authority(&pending) => {
209 self.pending_rotation = None;
210 true
211 }
212 None => false,
213 Some(_) => false,
214 };
215 }
216 match self.pending_rotation.as_ref() {
217 Some(durable) if durable.same_attempted_authority(&pending) => {
218 self.pending_rotation = Some(pending);
219 true
220 }
221 None => {
222 self.pending_rotation = Some(pending);
223 true
224 }
225 Some(_) => false,
226 }
227 }
228}
229
230impl SupervisorPendingRotationRecord {
231 pub fn from_authority(
232 authority: &SupervisorAuthorityRecord,
233 accepted_peer_ids: Vec<String>,
234 ) -> Self {
235 Self {
236 secret_key: authority.secret_key,
237 public_peer_id: authority.public_peer_id.clone(),
238 epoch: authority.epoch,
239 protocol_version: authority.protocol_version,
240 accepted_peer_ids,
241 }
242 }
243
244 pub fn authority_record(&self) -> SupervisorAuthorityRecord {
245 SupervisorAuthorityRecord {
246 secret_key: self.secret_key,
247 public_peer_id: self.public_peer_id.clone(),
248 epoch: self.epoch,
249 protocol_version: self.protocol_version,
250 pending_rotation: None,
251 }
252 }
253
254 pub fn same_attempted_authority(&self, other: &Self) -> bool {
255 self.secret_key == other.secret_key
256 && self.public_peer_id == other.public_peer_id
257 && self.epoch == other.epoch
258 && self
259 .protocol_version
260 .same_protocol_as(other.protocol_version)
261 }
262
263 pub fn remove_accepted_peer_ids(&mut self, peer_ids: &[String]) -> bool {
264 let original_len = self.accepted_peer_ids.len();
265 self.accepted_peer_ids
266 .retain(|peer_id| !peer_ids.iter().any(|candidate| candidate == peer_id));
267 self.accepted_peer_ids.len() != original_len
268 }
269}
270
/// Status recorded on an [`ExternalBindingOverlayRecord`].
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub enum ExternalBindingOverlayStatus {
    /// The overlay was normalized.
    ///
    /// NOTE(review): presumably paired with `normalized_member_ref` being `Some` — confirm.
    Normalized,
    /// Normalization failed; `reason` describes why.
    Failed { reason: String },
}
279
/// Overlay describing an external binding for one agent generation.
///
/// Stored and deleted through `MobRuntimeMetadataStore`, whose
/// `delete_external_binding_overlay` addresses a single overlay by
/// `(mob_id, agent_identity, generation)`.
///
/// NOTE(review): the derived `Debug` prints `bootstrap_token` through
/// `BridgeBootstrapToken`'s own `Debug` impl — confirm that impl does not expose
/// secret material.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub struct ExternalBindingOverlayRecord {
    /// Agent this overlay belongs to.
    pub agent_identity: AgentIdentity,
    /// Generation of the agent binding this overlay applies to.
    pub generation: Generation,
    /// Normalized member reference, if one was produced (crate-private).
    pub(crate) normalized_member_ref: Option<MemberRef>,
    /// Bridge bootstrap token associated with the binding, if any.
    pub bootstrap_token: Option<BridgeBootstrapToken>,
    /// Normalization outcome.
    pub status: ExternalBindingOverlayStatus,
    /// Time of the last update, in UTC.
    pub updated_at: DateTime<Utc>,
}
306
/// Storage backend for a mob's event stream: append, cursor-based polling, replay
/// and optional live subscription.
///
/// Implementations must be `Send + Sync`. On `wasm32` the async methods are
/// generated with `async_trait(?Send)`, so their futures are not required to be `Send`.
#[cfg_attr(target_arch = "wasm32", async_trait(?Send))]
#[cfg_attr(not(target_arch = "wasm32"), async_trait)]
pub trait MobEventStore: Send + Sync {
    /// Appends one event and returns the stored [`MobEvent`].
    async fn append(&self, event: NewMobEvent) -> Result<MobEvent, MobStoreError>;

    /// Appends a terminal flow event. Returns `Ok(None)` when nothing was appended.
    ///
    /// NOTE(review): "absent" presumably means no terminal event with the same
    /// `(run_id, flow_id)` (see `terminal_event_identity`) is stored yet — confirm.
    async fn append_terminal_event_if_absent(
        &self,
        event: NewMobEvent,
    ) -> Result<Option<MobEvent>, MobStoreError>;

    /// Appends several events and returns the stored events.
    ///
    /// NOTE(review): atomicity and output ordering are implementation-defined — confirm.
    async fn append_batch(&self, events: Vec<NewMobEvent>) -> Result<Vec<MobEvent>, MobStoreError>;

    /// Returns at most `limit` events positioned after `after_cursor`.
    ///
    /// NOTE(review): presumably `after_cursor` is exclusive — confirm.
    async fn poll(&self, after_cursor: u64, limit: usize) -> Result<Vec<MobEvent>, MobStoreError>;

    /// Returns every stored event.
    async fn replay_all(&self) -> Result<Vec<MobEvent>, MobStoreError>;

    /// Returns the latest cursor value known to the store.
    async fn latest_cursor(&self) -> Result<u64, MobStoreError>;

    /// Subscribes to newly appended events.
    ///
    /// The default implementation always fails with [`MobStoreError::Internal`].
    /// Stores that offer subscriptions override it.
    fn subscribe(&self) -> Result<MobEventReceiver, MobStoreError> {
        Err(MobStoreError::Internal(
            "mob event store does not support native event subscriptions".to_string(),
        ))
    }

    /// Clears the store (presumably removing all events — confirm).
    async fn clear(&self) -> Result<(), MobStoreError>;

    /// Prunes events older than the given timestamp and returns a count
    /// (presumably the number removed).
    ///
    /// The default implementation is a no-op returning `Ok(0)`.
    async fn prune(&self, _older_than: DateTime<Utc>) -> Result<u64, MobStoreError> {
        Ok(0)
    }
}
352
/// Per-mob runtime metadata: the supervisor authority record and the external
/// binding overlays.
///
/// Implementations must be `Send + Sync`. On `wasm32` the async methods are
/// generated with `async_trait(?Send)`.
#[cfg_attr(target_arch = "wasm32", async_trait(?Send))]
#[cfg_attr(not(target_arch = "wasm32"), async_trait)]
pub trait MobRuntimeMetadataStore: Send + Sync {
    /// Loads the supervisor authority stored for `mob_id`, or `None` if there is none.
    async fn load_supervisor_authority(
        &self,
        mob_id: &MobId,
    ) -> Result<Option<SupervisorAuthorityRecord>, MobStoreError>;

    /// Writes `record` as the supervisor authority for `mob_id` with no precondition.
    ///
    /// NOTE(review): presumably overwrites any existing record — confirm.
    async fn put_supervisor_authority(
        &self,
        mob_id: &MobId,
        record: &SupervisorAuthorityRecord,
    ) -> Result<(), MobStoreError>;

    /// Replaces the stored authority with `record` only if it currently equals `expected`.
    ///
    /// NOTE(review): presumably `Ok(true)` when the swap happened and `Ok(false)`
    /// when the stored record differed or was missing — confirm.
    async fn compare_and_put_supervisor_authority(
        &self,
        mob_id: &MobId,
        expected: &SupervisorAuthorityRecord,
        record: &SupervisorAuthorityRecord,
    ) -> Result<bool, MobStoreError>;

    /// Writes `record` only if no authority is stored for `mob_id` yet.
    ///
    /// NOTE(review): presumably `Ok(true)` means it was inserted — confirm.
    async fn put_supervisor_authority_if_absent(
        &self,
        mob_id: &MobId,
        record: &SupervisorAuthorityRecord,
    ) -> Result<bool, MobStoreError>;

    /// Deletes the supervisor authority stored for `mob_id`.
    async fn delete_supervisor_authority(&self, mob_id: &MobId) -> Result<(), MobStoreError>;

    /// Lists every external binding overlay stored for `mob_id`.
    async fn list_external_binding_overlays(
        &self,
        mob_id: &MobId,
    ) -> Result<Vec<ExternalBindingOverlayRecord>, MobStoreError>;

    /// Inserts `record` only if no overlay exists for it yet.
    ///
    /// NOTE(review): presumably keyed by `(mob_id, agent_identity, generation)`
    /// and returning `Ok(true)` on insert — confirm.
    async fn put_external_binding_overlay_if_absent(
        &self,
        mob_id: &MobId,
        record: &ExternalBindingOverlayRecord,
    ) -> Result<bool, MobStoreError>;

    /// Inserts or replaces the overlay described by `record`.
    async fn upsert_external_binding_overlay(
        &self,
        mob_id: &MobId,
        record: &ExternalBindingOverlayRecord,
    ) -> Result<(), MobStoreError>;

    /// Deletes the overlay for `agent_identity` at `generation` within `mob_id`.
    async fn delete_external_binding_overlay(
        &self,
        mob_id: &MobId,
        agent_identity: &AgentIdentity,
        generation: Generation,
    ) -> Result<(), MobStoreError>;

    /// Deletes every external binding overlay stored for `mob_id`.
    async fn delete_external_binding_overlays(&self, mob_id: &MobId) -> Result<(), MobStoreError>;
}
429
/// Persistence for flow runs: run records, flow state, step/failure ledgers, loop
/// snapshots and frame snapshots.
///
/// NOTE(review): the `cas_*` methods return `Ok(bool)`. Presumably `Ok(true)`
/// means every `expected*` value matched and all `next*` values were written
/// together, while `Ok(false)` means a mismatch with nothing written — confirm
/// against the in-memory and SQLite implementations.
///
/// Each `*_with_authority` variant takes the same arguments plus
/// `authority_inputs`, presumably recorded in the same write. The frame/loop
/// methods correspond one-to-one with the [`FrameAtomicOperation`] variants.
///
/// Implementations must be `Send + Sync`. On `wasm32` the async methods are
/// generated with `async_trait(?Send)`.
#[cfg_attr(target_arch = "wasm32", async_trait(?Send))]
#[cfg_attr(not(target_arch = "wasm32"), async_trait)]
pub trait MobRunStore: Send + Sync {
    /// Persists a new run.
    async fn create_run(&self, run: MobRun) -> Result<(), MobStoreError>;
    /// Fetches a run by id; `None` if it is unknown.
    async fn get_run(&self, run_id: &RunId) -> Result<Option<MobRun>, MobStoreError>;
    /// Lists the runs of `mob_id`, optionally restricted to `flow_id`.
    async fn list_runs(
        &self,
        mob_id: &MobId,
        flow_id: Option<&FlowId>,
    ) -> Result<Vec<MobRun>, MobStoreError>;
    /// CAS on the run status: `expected` -> `next`.
    async fn cas_run_status(
        &self,
        run_id: &RunId,
        expected: MobRunStatus,
        next: MobRunStatus,
    ) -> Result<bool, MobStoreError>;
    /// CAS on the run's flow state: `expected` -> `next`.
    async fn cas_flow_state(
        &self,
        run_id: &RunId,
        expected: &flow_run::State,
        next: &flow_run::State,
    ) -> Result<bool, MobStoreError>;
    /// [`Self::cas_flow_state`] plus `authority_inputs`.
    async fn cas_flow_state_with_authority(
        &self,
        run_id: &RunId,
        expected: &flow_run::State,
        next: &flow_run::State,
        authority_inputs: Vec<mob_dsl::MobMachineInput>,
    ) -> Result<bool, MobStoreError>;
    /// CAS on run status and flow state together.
    async fn cas_run_snapshot(
        &self,
        run_id: &RunId,
        expected_status: MobRunStatus,
        expected_flow_state: &flow_run::State,
        next_status: MobRunStatus,
        next_flow_state: &flow_run::State,
    ) -> Result<bool, MobStoreError>;
    /// [`Self::cas_run_snapshot`] plus `authority_inputs`.
    async fn cas_run_snapshot_with_authority(
        &self,
        run_id: &RunId,
        expected_status: MobRunStatus,
        expected_flow_state: &flow_run::State,
        next_status: MobRunStatus,
        next_flow_state: &flow_run::State,
        authority_inputs: Vec<mob_dsl::MobMachineInput>,
    ) -> Result<bool, MobStoreError>;
    /// Appends `entry` to the run's step ledger.
    async fn append_step_entry(
        &self,
        run_id: &RunId,
        entry: StepLedgerEntry,
    ) -> Result<(), MobStoreError>;
    /// Appends `entry` unless an equivalent entry is already present.
    ///
    /// NOTE(review): presumably `Ok(true)` when appended; confirm what counts
    /// as "equivalent".
    async fn append_step_entry_if_absent(
        &self,
        run_id: &RunId,
        entry: StepLedgerEntry,
    ) -> Result<bool, MobStoreError>;
    /// Stores `output` as the output of `step_id`.
    async fn put_step_output(
        &self,
        run_id: &RunId,
        step_id: &StepId,
        output: serde_json::Value,
    ) -> Result<(), MobStoreError>;
    /// Appends `entry` to the run's failure ledger.
    async fn append_failure_entry(
        &self,
        run_id: &RunId,
        entry: FailureLedgerEntry,
    ) -> Result<(), MobStoreError>;

    /// Inserts or replaces the snapshot for `loop_instance_id`, optionally
    /// recording `ledger_entry` in the loop iteration ledger.
    async fn upsert_loop_snapshot(
        &self,
        run_id: &RunId,
        loop_instance_id: &LoopInstanceId,
        snapshot: LoopSnapshot,
        ledger_entry: Option<LoopIterationLedgerEntry>,
    ) -> Result<(), MobStoreError>;

    /// CAS on a single frame snapshot.
    ///
    /// NOTE(review): `expected: None` presumably requires that the frame does
    /// not exist yet — confirm.
    async fn cas_frame_state(
        &self,
        run_id: &RunId,
        frame_id: &FrameId,
        expected: Option<&FrameSnapshot>,
        next: FrameSnapshot,
    ) -> Result<bool, MobStoreError>;
    /// [`Self::cas_frame_state`] plus `authority_inputs`.
    async fn cas_frame_state_with_authority(
        &self,
        run_id: &RunId,
        frame_id: &FrameId,
        expected: Option<&FrameSnapshot>,
        next: FrameSnapshot,
        authority_inputs: Vec<mob_dsl::MobMachineInput>,
    ) -> Result<bool, MobStoreError>;

    /// CAS on the run state and one frame together.
    async fn cas_grant_node_slot(
        &self,
        run_id: &RunId,
        expected_run_state: &flow_run::State,
        next_run_state: flow_run::State,
        frame_id: &FrameId,
        expected_frame: &FrameSnapshot,
        next_frame: FrameSnapshot,
    ) -> Result<bool, MobStoreError>;
    /// [`Self::cas_grant_node_slot`] plus `authority_inputs`.
    #[allow(clippy::too_many_arguments)]
    async fn cas_grant_node_slot_with_authority(
        &self,
        run_id: &RunId,
        expected_run_state: &flow_run::State,
        next_run_state: flow_run::State,
        frame_id: &FrameId,
        expected_frame: &FrameSnapshot,
        next_frame: FrameSnapshot,
        authority_inputs: Vec<mob_dsl::MobMachineInput>,
    ) -> Result<bool, MobStoreError>;

    /// CAS on one frame while storing `step_output` under `step_output_key`.
    ///
    /// NOTE(review): `loop_context` presumably identifies the enclosing loop and
    /// its iteration — confirm.
    #[allow(clippy::too_many_arguments)]
    async fn cas_complete_step_and_record_output(
        &self,
        run_id: &RunId,
        frame_id: &FrameId,
        expected_frame: &FrameSnapshot,
        next_frame: FrameSnapshot,
        step_output_key: String,
        step_output: serde_json::Value,
        loop_context: Option<(&LoopId, u64)>,
    ) -> Result<bool, MobStoreError>;
    /// [`Self::cas_complete_step_and_record_output`] plus `authority_inputs`.
    #[allow(clippy::too_many_arguments)]
    async fn cas_complete_step_and_record_output_with_authority(
        &self,
        run_id: &RunId,
        frame_id: &FrameId,
        expected_frame: &FrameSnapshot,
        next_frame: FrameSnapshot,
        step_output_key: String,
        step_output: serde_json::Value,
        loop_context: Option<(&LoopId, u64)>,
        authority_inputs: Vec<mob_dsl::MobMachineInput>,
    ) -> Result<bool, MobStoreError>;

    /// CAS on the run state and one frame, registering `initial_loop` as the
    /// snapshot for `loop_instance_id`.
    #[allow(clippy::too_many_arguments)]
    async fn cas_start_loop(
        &self,
        run_id: &RunId,
        loop_instance_id: &LoopInstanceId,
        expected_run_state: &flow_run::State,
        next_run_state: flow_run::State,
        frame_id: &FrameId,
        expected_frame: &FrameSnapshot,
        next_frame: FrameSnapshot,
        initial_loop: LoopSnapshot,
    ) -> Result<bool, MobStoreError>;
    /// [`Self::cas_start_loop`] plus `authority_inputs`.
    #[allow(clippy::too_many_arguments)]
    async fn cas_start_loop_with_authority(
        &self,
        run_id: &RunId,
        loop_instance_id: &LoopInstanceId,
        expected_run_state: &flow_run::State,
        next_run_state: flow_run::State,
        frame_id: &FrameId,
        expected_frame: &FrameSnapshot,
        next_frame: FrameSnapshot,
        initial_loop: LoopSnapshot,
        authority_inputs: Vec<mob_dsl::MobMachineInput>,
    ) -> Result<bool, MobStoreError>;

    /// CAS on a loop snapshot and the run state together.
    async fn cas_loop_request_body_frame(
        &self,
        run_id: &RunId,
        loop_instance_id: &LoopInstanceId,
        expected_loop: &LoopSnapshot,
        next_loop: LoopSnapshot,
        expected_run_state: &flow_run::State,
        next_run_state: flow_run::State,
    ) -> Result<bool, MobStoreError>;
    /// [`Self::cas_loop_request_body_frame`] plus `authority_inputs`.
    #[allow(clippy::too_many_arguments)]
    async fn cas_loop_request_body_frame_with_authority(
        &self,
        run_id: &RunId,
        loop_instance_id: &LoopInstanceId,
        expected_loop: &LoopSnapshot,
        next_loop: LoopSnapshot,
        expected_run_state: &flow_run::State,
        next_run_state: flow_run::State,
        authority_inputs: Vec<mob_dsl::MobMachineInput>,
    ) -> Result<bool, MobStoreError>;

    /// CAS on a loop snapshot and the run state.
    ///
    /// Also creates `frame_id` with `initial_frame` and records `ledger_entry`.
    #[allow(clippy::too_many_arguments)]
    async fn cas_grant_body_frame_start(
        &self,
        run_id: &RunId,
        loop_instance_id: &LoopInstanceId,
        expected_loop: &LoopSnapshot,
        next_loop: LoopSnapshot,
        frame_id: &FrameId,
        initial_frame: FrameSnapshot,
        ledger_entry: LoopIterationLedgerEntry,
        expected_run_state: &flow_run::State,
        next_run_state: flow_run::State,
    ) -> Result<bool, MobStoreError>;
    /// [`Self::cas_grant_body_frame_start`] plus `authority_inputs`.
    #[allow(clippy::too_many_arguments)]
    async fn cas_grant_body_frame_start_with_authority(
        &self,
        run_id: &RunId,
        loop_instance_id: &LoopInstanceId,
        expected_loop: &LoopSnapshot,
        next_loop: LoopSnapshot,
        frame_id: &FrameId,
        initial_frame: FrameSnapshot,
        ledger_entry: LoopIterationLedgerEntry,
        expected_run_state: &flow_run::State,
        next_run_state: flow_run::State,
        authority_inputs: Vec<mob_dsl::MobMachineInput>,
    ) -> Result<bool, MobStoreError>;

    /// CAS on a loop snapshot, one frame and the run state together.
    #[allow(clippy::too_many_arguments)]
    async fn cas_complete_body_frame(
        &self,
        run_id: &RunId,
        loop_instance_id: &LoopInstanceId,
        expected_loop: &LoopSnapshot,
        next_loop: LoopSnapshot,
        frame_id: &FrameId,
        expected_frame: &FrameSnapshot,
        next_frame: FrameSnapshot,
        expected_run_state: &flow_run::State,
        next_run_state: flow_run::State,
    ) -> Result<bool, MobStoreError>;
    /// [`Self::cas_complete_body_frame`] plus `authority_inputs`.
    #[allow(clippy::too_many_arguments)]
    async fn cas_complete_body_frame_with_authority(
        &self,
        run_id: &RunId,
        loop_instance_id: &LoopInstanceId,
        expected_loop: &LoopSnapshot,
        next_loop: LoopSnapshot,
        frame_id: &FrameId,
        expected_frame: &FrameSnapshot,
        next_frame: FrameSnapshot,
        expected_run_state: &flow_run::State,
        next_run_state: flow_run::State,
        authority_inputs: Vec<mob_dsl::MobMachineInput>,
    ) -> Result<bool, MobStoreError>;

    /// CAS on a loop snapshot, one frame and the run state together.
    ///
    /// Same argument shape as [`Self::cas_complete_body_frame`].
    #[allow(clippy::too_many_arguments)]
    async fn cas_complete_loop(
        &self,
        run_id: &RunId,
        loop_instance_id: &LoopInstanceId,
        expected_loop: &LoopSnapshot,
        next_loop: LoopSnapshot,
        frame_id: &FrameId,
        expected_frame: &FrameSnapshot,
        next_frame: FrameSnapshot,
        expected_run_state: &flow_run::State,
        next_run_state: flow_run::State,
    ) -> Result<bool, MobStoreError>;
    /// [`Self::cas_complete_loop`] plus `authority_inputs`.
    #[allow(clippy::too_many_arguments)]
    async fn cas_complete_loop_with_authority(
        &self,
        run_id: &RunId,
        loop_instance_id: &LoopInstanceId,
        expected_loop: &LoopSnapshot,
        next_loop: LoopSnapshot,
        frame_id: &FrameId,
        expected_frame: &FrameSnapshot,
        next_frame: FrameSnapshot,
        expected_run_state: &flow_run::State,
        next_run_state: flow_run::State,
        authority_inputs: Vec<mob_dsl::MobMachineInput>,
    ) -> Result<bool, MobStoreError>;
}
758
/// Revisioned storage for mob definitions (specs).
///
/// Implementations must be `Send + Sync`. On `wasm32` the async methods are
/// generated with `async_trait(?Send)`.
#[cfg_attr(target_arch = "wasm32", async_trait(?Send))]
#[cfg_attr(not(target_arch = "wasm32"), async_trait)]
pub trait MobSpecStore: Send + Sync {
    /// Stores `definition` for `mob_id` and returns a revision number.
    ///
    /// NOTE(review): `revision` is presumably the expected current revision, and
    /// a mismatch is reported as [`MobStoreError::SpecRevisionConflict`]. Confirm
    /// what `None` means (first write vs. unconditional write).
    async fn put_spec(
        &self,
        mob_id: &MobId,
        definition: &MobDefinition,
        revision: Option<u64>,
    ) -> Result<u64, MobStoreError>;

    /// Returns the stored definition for `mob_id` together with its revision,
    /// or `None` if there is none.
    async fn get_spec(&self, mob_id: &MobId)
        -> Result<Option<(MobDefinition, u64)>, MobStoreError>;
    /// Lists the ids of all mobs with a stored spec.
    async fn list_specs(&self) -> Result<Vec<MobId>, MobStoreError>;
    /// Deletes the spec for `mob_id`, optionally guarded by `revision`.
    ///
    /// NOTE(review): presumably `Ok(true)` when a spec was removed — confirm.
    async fn delete_spec(
        &self,
        mob_id: &MobId,
        revision: Option<u64>,
    ) -> Result<bool, MobStoreError>;
}