mod in_memory;
mod realm_profile;
#[cfg(not(target_arch = "wasm32"))]
mod sqlite;
pub use in_memory::{
InMemoryMobEventStore, InMemoryMobRunStore, InMemoryMobRuntimeMetadataStore,
InMemoryMobSpecStore, InMemoryRealmProfileStore,
};
pub use realm_profile::{RealmProfileStore, StoredRealmProfile};
#[cfg(not(target_arch = "wasm32"))]
pub use sqlite::{
SqliteMobEventStore, SqliteMobRunStore, SqliteMobRuntimeMetadataStore, SqliteMobSpecStore,
SqliteMobStores, SqliteRealmProfileStore,
};
use crate::definition::MobDefinition;
use crate::event::{MemberRef, MobEvent, MobEventKind, NewMobEvent};
use crate::ids::{
AgentIdentity, FlowId, FrameId, Generation, LoopId, LoopInstanceId, MobId, RunId, StepId,
};
use crate::machines::mob_machine as mob_dsl;
use crate::run::flow_run;
use crate::run::{
FailureLedgerEntry, FrameSnapshot, LoopIterationLedgerEntry, LoopSnapshot, MobRun,
MobRunStatus, StepLedgerEntry,
};
#[cfg(target_arch = "wasm32")]
use crate::tokio;
use async_trait::async_trait;
use chrono::{DateTime, Utc};
use meerkat_contracts::wire::supervisor_bridge::{BridgeBootstrapToken, BridgeProtocolVersion};
use serde::{Deserialize, Serialize};
use tokio::sync::broadcast;
/// Receiving half of a `broadcast` channel whose items are [`MobEvent`] values.
///
/// `broadcast` is `tokio::sync::broadcast`; on `wasm32` the name `tokio` is
/// re-bound to `crate::tokio` by the imports at the top of this file.
/// This is the success type of [`MobEventStore::subscribe`].
pub type MobEventReceiver = broadcast::Receiver<MobEvent>;
/// Extracts the `(run_id, flow_id)` pair from a terminal flow event.
///
/// `FlowCompleted`, `FlowFailed` and `FlowCanceled` yield `Some` with
/// references borrowed from `kind`; every other event kind yields `None`.
pub(crate) fn terminal_event_identity(kind: &MobEventKind) -> Option<(&RunId, &FlowId)> {
    if let MobEventKind::FlowCompleted {
        run_id, flow_id, ..
    }
    | MobEventKind::FlowFailed {
        run_id, flow_id, ..
    }
    | MobEventKind::FlowCanceled { run_id, flow_id } = kind
    {
        Some((run_id, flow_id))
    } else {
        None
    }
}
/// Names the frame-aware atomic persistence operations.
///
/// Each variant shares its snake_case name with a `cas_*` method declared on
/// `MobRunStore`. Serde (through `rename_all = "snake_case"`) and the `Display`
/// impl both render that snake_case name. The value is carried by
/// `MobStoreError::FrameAtomicPersistenceUnavailable`, whose message quotes the
/// operation.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum FrameAtomicOperation {
/// Shares its name with `MobRunStore::cas_frame_state`.
CasFrameState,
/// Shares its name with `MobRunStore::cas_grant_node_slot`.
CasGrantNodeSlot,
/// Shares its name with `MobRunStore::cas_complete_step_and_record_output`.
CasCompleteStepAndRecordOutput,
/// Shares its name with `MobRunStore::cas_start_loop`.
CasStartLoop,
/// Shares its name with `MobRunStore::cas_loop_request_body_frame`.
CasLoopRequestBodyFrame,
/// Shares its name with `MobRunStore::cas_grant_body_frame_start`.
CasGrantBodyFrameStart,
/// Shares its name with `MobRunStore::cas_complete_body_frame`.
CasCompleteBodyFrame,
/// Shares its name with `MobRunStore::cas_complete_loop`.
CasCompleteLoop,
}
/// Writes the snake_case name of the operation (for example
/// `cas_frame_state`), with no padding or other formatter options applied.
impl std::fmt::Display for FrameAtomicOperation {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // Pick the static name first so there is exactly one write call.
        let name = match self {
            Self::CasFrameState => "cas_frame_state",
            Self::CasGrantNodeSlot => "cas_grant_node_slot",
            Self::CasCompleteStepAndRecordOutput => "cas_complete_step_and_record_output",
            Self::CasStartLoop => "cas_start_loop",
            Self::CasLoopRequestBodyFrame => "cas_loop_request_body_frame",
            Self::CasGrantBodyFrameStart => "cas_grant_body_frame_start",
            Self::CasCompleteBodyFrame => "cas_complete_body_frame",
            Self::CasCompleteLoop => "cas_complete_loop",
        };
        f.write_str(name)
    }
}
/// Error type returned by the store traits declared in this module
/// (`MobEventStore`, `MobRuntimeMetadataStore`, `MobRunStore`, `MobSpecStore`).
///
/// `Display` is generated by `thiserror` from the `#[error(...)]` strings.
#[derive(Debug, thiserror::Error)]
pub enum MobStoreError {
/// A write failed; the string is rendered after `"Write failed: "`.
#[error("Write failed: {0}")]
WriteFailed(String),
/// A read failed; the string is rendered after `"Read failed: "`.
#[error("Read failed: {0}")]
ReadFailed(String),
/// Something was not found; the string is rendered after `"Not found: "`.
#[error("Not found: {0}")]
NotFound(String),
/// A CAS conflict was detected; the string is rendered after `"CAS conflict: "`.
#[error("CAS conflict: {0}")]
CasConflict(String),
/// The spec revision for a mob did not match what was expected.
///
/// `expected` is rendered with `{:?}`, so `None` is a representable
/// expectation. Presumably produced by the revision checks behind
/// `MobSpecStore` — confirm against the implementations.
#[error("spec revision conflict for mob {mob_id}: expected {expected:?}, actual {actual}")]
SpecRevisionConflict {
/// Mob whose spec revision conflicted.
mob_id: crate::ids::MobId,
/// Revision that was expected, if any.
expected: Option<u64>,
/// Revision reported in the message as the actual one.
actual: u64,
},
/// Frame-aware atomic persistence is unavailable for the named operation.
#[error("frame-aware atomic persistence unavailable for operation '{operation}'")]
FrameAtomicPersistenceUnavailable { operation: FrameAtomicOperation },
/// A serialization problem; the string is rendered after `"Serialization error: "`.
#[error("Serialization error: {0}")]
Serialization(String),
/// An internal problem; the string is rendered after `"Internal error: "`.
/// The default `MobEventStore::subscribe` body returns this variant.
#[error("Internal error: {0}")]
Internal(String),
}
/// Serializable record of a supervisor authority: key material, peer id,
/// epoch and bridge protocol version, plus an optional pending rotation.
///
/// NOTE(review): `Debug` and `Serialize` are derived, so `secret_key` is
/// included verbatim in debug output and serialized forms — confirm that no
/// caller logs this record.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub struct SupervisorAuthorityRecord {
/// 32 secret bytes; `keypair()` passes them to `meerkat_comms::Keypair::from_secret`.
pub secret_key: [u8; 32],
/// Peer id string; `generate` derives it from the keypair's public key.
pub public_peer_id: String,
/// Authority epoch. `generate` starts it at `0`.
pub epoch: u64,
/// Bridge protocol version associated with this authority.
pub protocol_version: BridgeProtocolVersion,
/// Rotation that has been attempted but is still pending, if any.
/// Omitted from serialized output when `None`; defaults to `None` when the
/// field is missing on input.
#[serde(default, skip_serializing_if = "Option::is_none")]
pub pending_rotation: Option<SupervisorPendingRotationRecord>,
}
/// Serializable record of an attempted supervisor authority rotation.
///
/// Carries the same four identity fields as `SupervisorAuthorityRecord`
/// (secret key, peer id, epoch, protocol version) plus a list of peer ids.
///
/// NOTE(review): `Debug` and `Serialize` are derived, so `secret_key` appears
/// verbatim in debug output and serialized forms.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub struct SupervisorPendingRotationRecord {
/// 32 secret bytes of the attempted authority.
pub secret_key: [u8; 32],
/// Peer id string of the attempted authority.
pub public_peer_id: String,
/// Epoch of the attempted authority.
pub epoch: u64,
/// Bridge protocol version of the attempted authority.
pub protocol_version: BridgeProtocolVersion,
/// Peer ids recorded as accepted; presumably the peers that have accepted
/// the attempted authority — confirm with the rotation code. Defaults to an
/// empty list when the field is missing on input.
#[serde(default)]
pub accepted_peer_ids: Vec<String>,
}
impl SupervisorAuthorityRecord {
    /// Creates a record backed by a freshly generated `meerkat_comms::Keypair`.
    ///
    /// The record starts at epoch `0`, carries `protocol_version`, and has no
    /// pending rotation.
    pub fn generate(protocol_version: BridgeProtocolVersion) -> Self {
        let fresh = meerkat_comms::Keypair::generate();
        let secret_key = fresh.secret_bytes();
        let public_peer_id = fresh.public_key().to_peer_id().as_str();
        Self {
            secret_key,
            public_peer_id,
            epoch: 0,
            protocol_version,
            pending_rotation: None,
        }
    }

    /// Rebuilds the keypair from the stored secret bytes.
    pub fn keypair(&self) -> meerkat_comms::Keypair {
        let secret = self.secret_key;
        meerkat_comms::Keypair::from_secret(secret)
    }

    /// Returns a clone of this record whose `pending_rotation` is `None`.
    pub fn without_pending_rotation(&self) -> Self {
        Self {
            pending_rotation: None,
            ..self.clone()
        }
    }

    /// Applies `pending` to `self.pending_rotation` and reports whether the
    /// request was accepted.
    ///
    /// `pending` is considered only when at least one of these holds:
    /// * it lists at least one accepted peer, its epoch is not greater than
    ///   this record's epoch, and its peer id differs from this record's; or
    /// * its epoch is exactly `self.epoch + 1` (an overflowing `+ 1` never
    ///   matches).
    ///
    /// Otherwise nothing changes and `false` is returned. When considered:
    /// * a stored rotation that is not the same attempted authority
    ///   (see `same_attempted_authority`) blocks the request: `false`;
    /// * with a non-empty accepted-peer list, `pending` is stored (replacing a
    ///   matching stored rotation, or filling an empty slot): `true`;
    /// * with an empty accepted-peer list, a matching stored rotation is
    ///   cleared and `true` is returned; with nothing stored, `false`.
    ///
    /// NOTE(review): the first admission condition accepts rotations at or
    /// below the current epoch under a different peer id — confirm that this
    /// is the intended case.
    pub fn apply_process_local_pending_rotation(
        &mut self,
        pending: SupervisorPendingRotationRecord,
    ) -> bool {
        let lists_accepted_peers = !pending.accepted_peer_ids.is_empty();
        let at_or_below_current_under_other_peer = lists_accepted_peers
            && pending.epoch <= self.epoch
            && pending.public_peer_id != self.public_peer_id;
        let targets_next_epoch = self.epoch.checked_add(1) == Some(pending.epoch);
        if !at_or_below_current_under_other_peer && !targets_next_epoch {
            return false;
        }

        // `None`: nothing stored. `Some(true)` / `Some(false)`: the stored
        // rotation is / is not the same attempted authority as `pending`.
        let stored_matches = self
            .pending_rotation
            .as_ref()
            .map(|stored| stored.same_attempted_authority(&pending));
        if stored_matches == Some(false) {
            return false;
        }

        if lists_accepted_peers {
            self.pending_rotation = Some(pending);
            return true;
        }

        // Empty accepted-peer list: only a matching stored rotation is cleared.
        let clears_stored = stored_matches == Some(true);
        if clears_stored {
            self.pending_rotation = None;
        }
        clears_stored
    }
}
impl SupervisorPendingRotationRecord {
    /// Builds a pending-rotation record that copies the secret key, peer id,
    /// epoch and protocol version of `authority` and attaches
    /// `accepted_peer_ids`.
    pub fn from_authority(
        authority: &SupervisorAuthorityRecord,
        accepted_peer_ids: Vec<String>,
    ) -> Self {
        let SupervisorAuthorityRecord {
            secret_key,
            public_peer_id,
            epoch,
            protocol_version,
            ..
        } = authority;
        Self {
            secret_key: *secret_key,
            public_peer_id: public_peer_id.clone(),
            epoch: *epoch,
            protocol_version: *protocol_version,
            accepted_peer_ids,
        }
    }

    /// Converts this pending rotation into an authority record with the same
    /// secret key, peer id, epoch and protocol version and no pending rotation.
    /// The accepted-peer list is not carried over.
    pub fn authority_record(&self) -> SupervisorAuthorityRecord {
        let public_peer_id = self.public_peer_id.clone();
        SupervisorAuthorityRecord {
            secret_key: self.secret_key,
            public_peer_id,
            epoch: self.epoch,
            protocol_version: self.protocol_version,
            pending_rotation: None,
        }
    }

    /// Returns `true` when both records have equal secret key, peer id and
    /// epoch, and `same_protocol_as` holds for their protocol versions.
    /// The accepted-peer lists are not compared.
    pub fn same_attempted_authority(&self, other: &Self) -> bool {
        let same_identity = (&self.secret_key, &self.public_peer_id, self.epoch)
            == (&other.secret_key, &other.public_peer_id, other.epoch);
        same_identity
            && self
                .protocol_version
                .same_protocol_as(other.protocol_version)
    }

    /// Removes every accepted peer id that also appears in `peer_ids`.
    ///
    /// Returns `true` when at least one entry was removed.
    pub fn remove_accepted_peer_ids(&mut self, peer_ids: &[String]) -> bool {
        let before = self.accepted_peer_ids.len();
        self.accepted_peer_ids
            .retain(|existing| !peer_ids.contains(existing));
        before != self.accepted_peer_ids.len()
    }
}
/// Status stored on an `ExternalBindingOverlayRecord`.
///
/// NOTE(review): the exact lifecycle meaning of each state is defined by the
/// callers and is not visible in this file.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub enum ExternalBindingOverlayStatus {
/// The overlay is in the normalized state.
Normalized,
/// The overlay is in the failed state; `reason` carries a textual explanation.
Failed { reason: String },
}
/// Serializable overlay record for an external binding, stored per mob through
/// the `*_external_binding_overlay*` methods of `MobRuntimeMetadataStore`.
///
/// `MobRuntimeMetadataStore::delete_external_binding_overlay` addresses a
/// record by `agent_identity` and `generation`, so those two fields presumably
/// form the record's key within a mob — confirm against the implementations.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub struct ExternalBindingOverlayRecord {
/// Agent identity this overlay belongs to.
pub agent_identity: AgentIdentity,
/// Generation this overlay belongs to.
pub generation: Generation,
/// Optional member reference; crate-private, unlike the other fields.
pub(crate) normalized_member_ref: Option<MemberRef>,
/// Optional bridge bootstrap token.
pub bootstrap_token: Option<BridgeBootstrapToken>,
/// Current status of the overlay.
pub status: ExternalBindingOverlayStatus,
/// UTC timestamp; presumably the time of the last update — confirm with writers.
pub updated_at: DateTime<Utc>,
}
/// Storage interface for mob events.
///
/// Declared through `async_trait`; on `wasm32` the `?Send` form of the macro is
/// selected. Every method reports failures as `MobStoreError`.
#[cfg_attr(target_arch = "wasm32", async_trait(?Send))]
#[cfg_attr(not(target_arch = "wasm32"), async_trait)]
pub trait MobEventStore: Send + Sync {
/// Appends `event` and, on success, returns the resulting `MobEvent`.
async fn append(&self, event: NewMobEvent) -> Result<MobEvent, MobStoreError>;
/// Conditional append for terminal events.
///
/// Judging by the name and the `Option` return, `Ok(None)` presumably means
/// an equivalent terminal event was already present and nothing was
/// appended — TODO confirm what "absent" is keyed on in the implementations.
async fn append_terminal_event_if_absent(
&self,
event: NewMobEvent,
) -> Result<Option<MobEvent>, MobStoreError>;
/// Appends several events and returns the resulting `MobEvent` values.
/// NOTE(review): atomicity of the batch is not specified here.
async fn append_batch(&self, events: Vec<NewMobEvent>) -> Result<Vec<MobEvent>, MobStoreError>;
/// Reads events using a cursor and a limit; presumably returns at most
/// `limit` events positioned after `after_cursor` — TODO confirm ordering
/// and inclusivity.
async fn poll(&self, after_cursor: u64, limit: usize) -> Result<Vec<MobEvent>, MobStoreError>;
/// Returns stored events as a `Vec`; presumably all of them, in order —
/// confirm against the implementations.
async fn replay_all(&self) -> Result<Vec<MobEvent>, MobStoreError>;
/// Returns a `u64` cursor; presumably that of the newest stored event —
/// confirm the value reported for an empty store.
async fn latest_cursor(&self) -> Result<u64, MobStoreError>;
/// Subscribes to events through a `MobEventReceiver`.
///
/// The default implementation always fails with `MobStoreError::Internal`,
/// stating that native event subscriptions are not supported.
fn subscribe(&self) -> Result<MobEventReceiver, MobStoreError> {
Err(MobStoreError::Internal(
"mob event store does not support native event subscriptions".to_string(),
))
}
/// Clears the store; presumably removes every stored event — confirm.
async fn clear(&self) -> Result<(), MobStoreError>;
/// Prunes events relative to a timestamp.
///
/// The default implementation ignores `_older_than`, does nothing, and
/// returns `Ok(0)`. The `u64` is presumably the number of events removed.
async fn prune(&self, _older_than: DateTime<Utc>) -> Result<u64, MobStoreError> {
Ok(0)
}
}
/// Storage interface for per-mob runtime metadata: the supervisor authority
/// record and the external binding overlay records.
///
/// Declared through `async_trait`; on `wasm32` the `?Send` form of the macro is
/// selected. Every method is addressed by `mob_id` and reports failures as
/// `MobStoreError`. For the methods returning `Result<bool, _>`, the `bool`
/// presumably reports whether the write was applied — TODO confirm against
/// the implementations.
#[cfg_attr(target_arch = "wasm32", async_trait(?Send))]
#[cfg_attr(not(target_arch = "wasm32"), async_trait)]
pub trait MobRuntimeMetadataStore: Send + Sync {
/// Loads the supervisor authority record for `mob_id`; `Ok(None)`
/// presumably means no record is stored.
async fn load_supervisor_authority(
&self,
mob_id: &MobId,
) -> Result<Option<SupervisorAuthorityRecord>, MobStoreError>;
/// Stores `record` as the supervisor authority for `mob_id`.
async fn put_supervisor_authority(
&self,
mob_id: &MobId,
record: &SupervisorAuthorityRecord,
) -> Result<(), MobStoreError>;
/// Conditional put taking the `expected` current record and the
/// replacement `record`; presumably writes only when the stored record
/// equals `expected`.
async fn compare_and_put_supervisor_authority(
&self,
mob_id: &MobId,
expected: &SupervisorAuthorityRecord,
record: &SupervisorAuthorityRecord,
) -> Result<bool, MobStoreError>;
/// Stores `record` only if no supervisor authority is present for `mob_id`.
async fn put_supervisor_authority_if_absent(
&self,
mob_id: &MobId,
record: &SupervisorAuthorityRecord,
) -> Result<bool, MobStoreError>;
/// Deletes the supervisor authority record for `mob_id`.
async fn delete_supervisor_authority(&self, mob_id: &MobId) -> Result<(), MobStoreError>;
/// Lists the external binding overlay records stored for `mob_id`.
async fn list_external_binding_overlays(
&self,
mob_id: &MobId,
) -> Result<Vec<ExternalBindingOverlayRecord>, MobStoreError>;
/// Stores `record` only if no matching overlay is present.
/// NOTE(review): the key used for "absent" is not visible here; presumably
/// `agent_identity` plus `generation`.
async fn put_external_binding_overlay_if_absent(
&self,
mob_id: &MobId,
record: &ExternalBindingOverlayRecord,
) -> Result<bool, MobStoreError>;
/// Inserts or updates the overlay `record` for `mob_id`.
async fn upsert_external_binding_overlay(
&self,
mob_id: &MobId,
record: &ExternalBindingOverlayRecord,
) -> Result<(), MobStoreError>;
/// Deletes the overlay addressed by `agent_identity` and `generation`.
async fn delete_external_binding_overlay(
&self,
mob_id: &MobId,
agent_identity: &AgentIdentity,
generation: Generation,
) -> Result<(), MobStoreError>;
/// Deletes the overlays of `mob_id`; presumably all of them — confirm.
async fn delete_external_binding_overlays(&self, mob_id: &MobId) -> Result<(), MobStoreError>;
}
/// Storage interface for `MobRun` records and their flow, step, loop and
/// frame state.
///
/// Declared through `async_trait`; on `wasm32` the `?Send` form of the macro is
/// selected. Every method reports failures as `MobStoreError`.
///
/// Most methods are named `cas_*` and take `expected*` / `next*` argument
/// pairs, returning `Result<bool, MobStoreError>`. The trait does not spell out
/// the meaning of the `bool`; presumably `true` means every expected value
/// matched and the update was applied — TODO confirm against the
/// implementations.
///
/// Each `*_with_authority` method has the same parameters as its base method
/// plus `authority_inputs`, a list of `mob_dsl::MobMachineInput` values.
/// NOTE(review): how `authority_inputs` is applied or persisted is decided by
/// the implementation and is not visible here.
#[cfg_attr(target_arch = "wasm32", async_trait(?Send))]
#[cfg_attr(not(target_arch = "wasm32"), async_trait)]
pub trait MobRunStore: Send + Sync {
/// Creates a run record from `run`.
async fn create_run(&self, run: MobRun) -> Result<(), MobStoreError>;
/// Looks up a run by id; `Ok(None)` presumably means the id is unknown.
async fn get_run(&self, run_id: &RunId) -> Result<Option<MobRun>, MobStoreError>;
/// Lists runs for `mob_id`; `flow_id` is an optional additional argument,
/// presumably a filter.
async fn list_runs(
&self,
mob_id: &MobId,
flow_id: Option<&FlowId>,
) -> Result<Vec<MobRun>, MobStoreError>;
/// CAS on the run status: `expected` to `next`.
async fn cas_run_status(
&self,
run_id: &RunId,
expected: MobRunStatus,
next: MobRunStatus,
) -> Result<bool, MobStoreError>;
/// CAS on the run's flow state: `expected` to `next`.
async fn cas_flow_state(
&self,
run_id: &RunId,
expected: &flow_run::State,
next: &flow_run::State,
) -> Result<bool, MobStoreError>;
/// `cas_flow_state` with additional `authority_inputs`.
async fn cas_flow_state_with_authority(
&self,
run_id: &RunId,
expected: &flow_run::State,
next: &flow_run::State,
authority_inputs: Vec<mob_dsl::MobMachineInput>,
) -> Result<bool, MobStoreError>;
/// CAS on run status and flow state together: both expected values are
/// supplied along with both next values.
async fn cas_run_snapshot(
&self,
run_id: &RunId,
expected_status: MobRunStatus,
expected_flow_state: &flow_run::State,
next_status: MobRunStatus,
next_flow_state: &flow_run::State,
) -> Result<bool, MobStoreError>;
/// `cas_run_snapshot` with additional `authority_inputs`.
async fn cas_run_snapshot_with_authority(
&self,
run_id: &RunId,
expected_status: MobRunStatus,
expected_flow_state: &flow_run::State,
next_status: MobRunStatus,
next_flow_state: &flow_run::State,
authority_inputs: Vec<mob_dsl::MobMachineInput>,
) -> Result<bool, MobStoreError>;
/// Appends a step ledger entry to the run.
async fn append_step_entry(
&self,
run_id: &RunId,
entry: StepLedgerEntry,
) -> Result<(), MobStoreError>;
/// Appends a step ledger entry only if it is absent; the `bool` presumably
/// reports whether the entry was appended. NOTE(review): the key used for
/// "absent" is not visible here.
async fn append_step_entry_if_absent(
&self,
run_id: &RunId,
entry: StepLedgerEntry,
) -> Result<bool, MobStoreError>;
/// Stores the JSON `output` of step `step_id` for the run.
async fn put_step_output(
&self,
run_id: &RunId,
step_id: &StepId,
output: serde_json::Value,
) -> Result<(), MobStoreError>;
/// Appends a failure ledger entry to the run.
async fn append_failure_entry(
&self,
run_id: &RunId,
entry: FailureLedgerEntry,
) -> Result<(), MobStoreError>;
/// Inserts or updates the snapshot of loop `loop_instance_id`, with an
/// optional loop-iteration ledger entry supplied alongside it.
async fn upsert_loop_snapshot(
&self,
run_id: &RunId,
loop_instance_id: &LoopInstanceId,
snapshot: LoopSnapshot,
ledger_entry: Option<LoopIterationLedgerEntry>,
) -> Result<(), MobStoreError>;
/// CAS on the snapshot of frame `frame_id`: `expected` to `next`.
/// `expected` is optional; `None` presumably means "no snapshot stored
/// yet" — confirm.
async fn cas_frame_state(
&self,
run_id: &RunId,
frame_id: &FrameId,
expected: Option<&FrameSnapshot>,
next: FrameSnapshot,
) -> Result<bool, MobStoreError>;
/// `cas_frame_state` with additional `authority_inputs`.
async fn cas_frame_state_with_authority(
&self,
run_id: &RunId,
frame_id: &FrameId,
expected: Option<&FrameSnapshot>,
next: FrameSnapshot,
authority_inputs: Vec<mob_dsl::MobMachineInput>,
) -> Result<bool, MobStoreError>;
/// CAS covering the run's flow state and one frame snapshot
/// (expected and next values for each).
async fn cas_grant_node_slot(
&self,
run_id: &RunId,
expected_run_state: &flow_run::State,
next_run_state: flow_run::State,
frame_id: &FrameId,
expected_frame: &FrameSnapshot,
next_frame: FrameSnapshot,
) -> Result<bool, MobStoreError>;
/// `cas_grant_node_slot` with additional `authority_inputs`.
#[allow(clippy::too_many_arguments)]
async fn cas_grant_node_slot_with_authority(
&self,
run_id: &RunId,
expected_run_state: &flow_run::State,
next_run_state: flow_run::State,
frame_id: &FrameId,
expected_frame: &FrameSnapshot,
next_frame: FrameSnapshot,
authority_inputs: Vec<mob_dsl::MobMachineInput>,
) -> Result<bool, MobStoreError>;
/// CAS on one frame snapshot that also receives a step output
/// (`step_output_key`, `step_output`) and an optional `loop_context` made
/// of a loop id and a `u64`. NOTE(review): the meaning of that `u64`
/// (presumably an iteration number) is not visible here.
#[allow(clippy::too_many_arguments)]
async fn cas_complete_step_and_record_output(
&self,
run_id: &RunId,
frame_id: &FrameId,
expected_frame: &FrameSnapshot,
next_frame: FrameSnapshot,
step_output_key: String,
step_output: serde_json::Value,
loop_context: Option<(&LoopId, u64)>,
) -> Result<bool, MobStoreError>;
/// `cas_complete_step_and_record_output` with additional `authority_inputs`.
#[allow(clippy::too_many_arguments)]
async fn cas_complete_step_and_record_output_with_authority(
&self,
run_id: &RunId,
frame_id: &FrameId,
expected_frame: &FrameSnapshot,
next_frame: FrameSnapshot,
step_output_key: String,
step_output: serde_json::Value,
loop_context: Option<(&LoopId, u64)>,
authority_inputs: Vec<mob_dsl::MobMachineInput>,
) -> Result<bool, MobStoreError>;
/// CAS covering the run's flow state and one frame snapshot, supplied
/// together with the initial snapshot of loop `loop_instance_id`.
#[allow(clippy::too_many_arguments)]
async fn cas_start_loop(
&self,
run_id: &RunId,
loop_instance_id: &LoopInstanceId,
expected_run_state: &flow_run::State,
next_run_state: flow_run::State,
frame_id: &FrameId,
expected_frame: &FrameSnapshot,
next_frame: FrameSnapshot,
initial_loop: LoopSnapshot,
) -> Result<bool, MobStoreError>;
/// `cas_start_loop` with additional `authority_inputs`.
#[allow(clippy::too_many_arguments)]
async fn cas_start_loop_with_authority(
&self,
run_id: &RunId,
loop_instance_id: &LoopInstanceId,
expected_run_state: &flow_run::State,
next_run_state: flow_run::State,
frame_id: &FrameId,
expected_frame: &FrameSnapshot,
next_frame: FrameSnapshot,
initial_loop: LoopSnapshot,
authority_inputs: Vec<mob_dsl::MobMachineInput>,
) -> Result<bool, MobStoreError>;
/// CAS covering one loop snapshot and the run's flow state
/// (expected and next values for each).
async fn cas_loop_request_body_frame(
&self,
run_id: &RunId,
loop_instance_id: &LoopInstanceId,
expected_loop: &LoopSnapshot,
next_loop: LoopSnapshot,
expected_run_state: &flow_run::State,
next_run_state: flow_run::State,
) -> Result<bool, MobStoreError>;
/// `cas_loop_request_body_frame` with additional `authority_inputs`.
#[allow(clippy::too_many_arguments)]
async fn cas_loop_request_body_frame_with_authority(
&self,
run_id: &RunId,
loop_instance_id: &LoopInstanceId,
expected_loop: &LoopSnapshot,
next_loop: LoopSnapshot,
expected_run_state: &flow_run::State,
next_run_state: flow_run::State,
authority_inputs: Vec<mob_dsl::MobMachineInput>,
) -> Result<bool, MobStoreError>;
/// CAS covering one loop snapshot and the run's flow state, supplied
/// together with the initial snapshot of frame `frame_id` and a
/// loop-iteration ledger entry.
#[allow(clippy::too_many_arguments)]
async fn cas_grant_body_frame_start(
&self,
run_id: &RunId,
loop_instance_id: &LoopInstanceId,
expected_loop: &LoopSnapshot,
next_loop: LoopSnapshot,
frame_id: &FrameId,
initial_frame: FrameSnapshot,
ledger_entry: LoopIterationLedgerEntry,
expected_run_state: &flow_run::State,
next_run_state: flow_run::State,
) -> Result<bool, MobStoreError>;
/// `cas_grant_body_frame_start` with additional `authority_inputs`.
#[allow(clippy::too_many_arguments)]
async fn cas_grant_body_frame_start_with_authority(
&self,
run_id: &RunId,
loop_instance_id: &LoopInstanceId,
expected_loop: &LoopSnapshot,
next_loop: LoopSnapshot,
frame_id: &FrameId,
initial_frame: FrameSnapshot,
ledger_entry: LoopIterationLedgerEntry,
expected_run_state: &flow_run::State,
next_run_state: flow_run::State,
authority_inputs: Vec<mob_dsl::MobMachineInput>,
) -> Result<bool, MobStoreError>;
/// CAS covering one loop snapshot, one frame snapshot and the run's flow
/// state (expected and next values for each).
#[allow(clippy::too_many_arguments)]
async fn cas_complete_body_frame(
&self,
run_id: &RunId,
loop_instance_id: &LoopInstanceId,
expected_loop: &LoopSnapshot,
next_loop: LoopSnapshot,
frame_id: &FrameId,
expected_frame: &FrameSnapshot,
next_frame: FrameSnapshot,
expected_run_state: &flow_run::State,
next_run_state: flow_run::State,
) -> Result<bool, MobStoreError>;
/// `cas_complete_body_frame` with additional `authority_inputs`.
#[allow(clippy::too_many_arguments)]
async fn cas_complete_body_frame_with_authority(
&self,
run_id: &RunId,
loop_instance_id: &LoopInstanceId,
expected_loop: &LoopSnapshot,
next_loop: LoopSnapshot,
frame_id: &FrameId,
expected_frame: &FrameSnapshot,
next_frame: FrameSnapshot,
expected_run_state: &flow_run::State,
next_run_state: flow_run::State,
authority_inputs: Vec<mob_dsl::MobMachineInput>,
) -> Result<bool, MobStoreError>;
/// CAS covering one loop snapshot, one frame snapshot and the run's flow
/// state (expected and next values for each); same parameter list as
/// `cas_complete_body_frame`.
#[allow(clippy::too_many_arguments)]
async fn cas_complete_loop(
&self,
run_id: &RunId,
loop_instance_id: &LoopInstanceId,
expected_loop: &LoopSnapshot,
next_loop: LoopSnapshot,
frame_id: &FrameId,
expected_frame: &FrameSnapshot,
next_frame: FrameSnapshot,
expected_run_state: &flow_run::State,
next_run_state: flow_run::State,
) -> Result<bool, MobStoreError>;
/// `cas_complete_loop` with additional `authority_inputs`.
#[allow(clippy::too_many_arguments)]
async fn cas_complete_loop_with_authority(
&self,
run_id: &RunId,
loop_instance_id: &LoopInstanceId,
expected_loop: &LoopSnapshot,
next_loop: LoopSnapshot,
frame_id: &FrameId,
expected_frame: &FrameSnapshot,
next_frame: FrameSnapshot,
expected_run_state: &flow_run::State,
next_run_state: flow_run::State,
authority_inputs: Vec<mob_dsl::MobMachineInput>,
) -> Result<bool, MobStoreError>;
}
/// Storage interface for mob specs (`MobDefinition` values) with `u64`
/// revisions.
///
/// Declared through `async_trait`; on `wasm32` the `?Send` form of the macro is
/// selected. Every method reports failures as `MobStoreError`.
/// NOTE(review): the optional `revision` arguments presumably act as the
/// expected current revision, with a mismatch surfacing as
/// `MobStoreError::SpecRevisionConflict` — TODO confirm against the
/// implementations.
#[cfg_attr(target_arch = "wasm32", async_trait(?Send))]
#[cfg_attr(not(target_arch = "wasm32"), async_trait)]
pub trait MobSpecStore: Send + Sync {
/// Stores `definition` for `mob_id` and returns a `u64`, presumably the
/// revision assigned to the stored spec.
async fn put_spec(
&self,
mob_id: &MobId,
definition: &MobDefinition,
revision: Option<u64>,
) -> Result<u64, MobStoreError>;
/// Loads the spec for `mob_id` together with a `u64` (presumably its
/// revision); `Ok(None)` presumably means no spec is stored.
async fn get_spec(&self, mob_id: &MobId)
-> Result<Option<(MobDefinition, u64)>, MobStoreError>;
/// Lists the ids of the mobs that have a stored spec.
async fn list_specs(&self) -> Result<Vec<MobId>, MobStoreError>;
/// Deletes the spec for `mob_id`; the `bool` presumably reports whether a
/// spec was deleted.
async fn delete_spec(
&self,
mob_id: &MobId,
revision: Option<u64>,
) -> Result<bool, MobStoreError>;
}