mod in_memory;
mod realm_profile;
#[cfg(not(target_arch = "wasm32"))]
mod sqlite;
pub use in_memory::{
InMemoryMobEventStore, InMemoryMobRunStore, InMemoryMobSpecStore, InMemoryRealmProfileStore,
};
pub use realm_profile::{RealmProfileStore, StoredRealmProfile};
#[cfg(not(target_arch = "wasm32"))]
pub use sqlite::{
SqliteMobEventStore, SqliteMobRunStore, SqliteMobSpecStore, SqliteMobStores,
SqliteRealmProfileStore,
};
use crate::definition::MobDefinition;
use crate::event::{MobEvent, NewMobEvent};
use crate::ids::{FlowId, FrameId, LoopId, LoopInstanceId, MobId, RunId, StepId};
use crate::run::{
FailureLedgerEntry, FrameSnapshot, LoopIterationLedgerEntry, LoopSnapshot, MobRun,
MobRunStatus, StepLedgerEntry,
};
use async_trait::async_trait;
use chrono::{DateTime, Utc};
use meerkat_machine_kernels::KernelState;
/// Error type returned by every method of the store traits declared in this
/// module (`MobEventStore`, `MobRunStore`, `MobSpecStore`).
///
/// The `Display` text of each variant is produced by its `#[error(...)]`
/// attribute. Variants that carry a single `String` interpolate it as `{0}`.
#[derive(Debug, thiserror::Error)]
pub enum MobStoreError {
/// A write did not succeed; the payload is appended to the message.
#[error("Write failed: {0}")]
WriteFailed(String),
/// A read did not succeed; the payload is appended to the message.
#[error("Read failed: {0}")]
ReadFailed(String),
/// Something that was looked up does not exist; the payload is appended to
/// the message.
#[error("Not found: {0}")]
NotFound(String),
/// A compare-and-swap conflict; the payload is appended to the message.
///
/// NOTE(review): the `cas_*` methods on `MobRunStore` also return a `bool`;
/// which mismatches surface as this error versus `Ok(false)` is not visible
/// here -- confirm against the implementations.
#[error("CAS conflict: {0}")]
CasConflict(String),
/// A spec revision mismatch for a specific mob.
///
/// In the message, `mob_id` and `actual` are formatted with `Display` and
/// `expected` with `Debug` (so it renders as `Some(n)` / `None`).
#[error("spec revision conflict for mob {mob_id}: expected {expected:?}, actual {actual}")]
SpecRevisionConflict {
// Mob whose spec revision conflicted.
mob_id: crate::ids::MobId,
// Revision the caller expected.
// NOTE(review): the meaning of `None` is not visible in this file -- confirm.
expected: Option<u64>,
// Revision reported as the actual one.
actual: u64,
},
/// A serialization problem; the payload is appended to the message.
#[error("Serialization error: {0}")]
Serialization(String),
/// An internal failure; the payload is appended to the message.
#[error("Internal error: {0}")]
Internal(String),
}
/// Asynchronous store of mob events.
///
/// Accepts `NewMobEvent` values and hands back `MobEvent` values. All methods
/// report failure as `MobStoreError`.
// On `wasm32` the trait is expanded with `async_trait(?Send)`; on every other
// target the plain `async_trait` expansion is used.
#[cfg_attr(target_arch = "wasm32", async_trait(?Send))]
#[cfg_attr(not(target_arch = "wasm32"), async_trait)]
pub trait MobEventStore: Send + Sync {
/// Appends a single event and returns the resulting `MobEvent`.
async fn append(&self, event: NewMobEvent) -> Result<MobEvent, MobStoreError>;
/// Appends several events and returns the resulting `MobEvent`s.
///
/// NOTE(review): whether the batch is all-or-nothing and whether the output
/// order matches the input order is not specified here -- confirm.
async fn append_batch(&self, events: Vec<NewMobEvent>) -> Result<Vec<MobEvent>, MobStoreError>;
/// Returns events selected by `after_cursor` and `limit`.
///
/// NOTE(review): presumably events with a cursor strictly greater than
/// `after_cursor`, at most `limit` of them -- confirm against implementations.
async fn poll(&self, after_cursor: u64, limit: usize) -> Result<Vec<MobEvent>, MobStoreError>;
/// Returns the stored events as a single `Vec`.
///
/// NOTE(review): ordering is not specified in this file -- confirm.
async fn replay_all(&self) -> Result<Vec<MobEvent>, MobStoreError>;
/// Clears the store.
async fn clear(&self) -> Result<(), MobStoreError>;
/// Prunes events relative to the `_older_than` timestamp.
///
/// The default implementation ignores its argument and returns `Ok(0)`;
/// implementors may override it.
///
/// NOTE(review): the returned `u64` is presumably the number of events
/// removed -- confirm.
async fn prune(&self, _older_than: DateTime<Utc>) -> Result<u64, MobStoreError> {
Ok(0)
}
}
/// Asynchronous store of `MobRun` records and their per-run state: status,
/// flow `KernelState`, step/failure ledgers, step outputs, frame snapshots and
/// loop snapshots.
///
/// Methods prefixed `cas_` take `expected*` / `next*` pairs and return
/// `Result<bool, MobStoreError>`.
///
/// NOTE(review): presumably `Ok(true)` means the expected values matched and
/// the `next*` values were stored, and `Ok(false)` means nothing was changed.
/// Presumably each multi-part `cas_*` method applies all of its parts
/// atomically. Neither is visible in this file -- confirm against the
/// implementations.
// On `wasm32` the trait is expanded with `async_trait(?Send)`; on every other
// target the plain `async_trait` expansion is used.
#[cfg_attr(target_arch = "wasm32", async_trait(?Send))]
#[cfg_attr(not(target_arch = "wasm32"), async_trait)]
pub trait MobRunStore: Send + Sync {
/// Stores a new run.
///
/// NOTE(review): behavior when a run with the same id already exists is not
/// visible here -- confirm.
async fn create_run(&self, run: MobRun) -> Result<(), MobStoreError>;
/// Looks up a run by id; the `Option` in the return type allows for "no
/// such run" without an error.
async fn get_run(&self, run_id: &RunId) -> Result<Option<MobRun>, MobStoreError>;
/// Lists runs for `mob_id`, with an optional `flow_id` argument.
///
/// NOTE(review): presumably `Some(flow_id)` narrows the result to that flow
/// and `None` returns all runs of the mob -- confirm.
async fn list_runs(
&self,
mob_id: &MobId,
flow_id: Option<&FlowId>,
) -> Result<Vec<MobRun>, MobStoreError>;
/// Compare-and-swap on the run's `MobRunStatus`: `expected` -> `next`.
async fn cas_run_status(
&self,
run_id: &RunId,
expected: MobRunStatus,
next: MobRunStatus,
) -> Result<bool, MobStoreError>;
/// Compare-and-swap on the run's flow `KernelState`: `expected` -> `next`.
async fn cas_flow_state(
&self,
run_id: &RunId,
expected: &KernelState,
next: &KernelState,
) -> Result<bool, MobStoreError>;
/// Compare-and-swap on the run's status and flow state together: both
/// expected values are supplied alongside both next values.
async fn cas_run_snapshot(
&self,
run_id: &RunId,
expected_status: MobRunStatus,
expected_flow_state: &KernelState,
next_status: MobRunStatus,
next_flow_state: &KernelState,
) -> Result<bool, MobStoreError>;
/// Appends a `StepLedgerEntry` to the run identified by `run_id`.
async fn append_step_entry(
&self,
run_id: &RunId,
entry: StepLedgerEntry,
) -> Result<(), MobStoreError>;
/// Conditional variant of `append_step_entry` that reports a `bool`.
///
/// NOTE(review): presumably returns `true` when the entry was appended and
/// `false` when an equivalent entry already existed; what counts as
/// "equivalent" is not visible here -- confirm.
async fn append_step_entry_if_absent(
&self,
run_id: &RunId,
entry: StepLedgerEntry,
) -> Result<bool, MobStoreError>;
/// Stores the JSON `output` for step `step_id` of the run.
///
/// NOTE(review): presumably overwrites any previous output for that step --
/// confirm.
async fn put_step_output(
&self,
run_id: &RunId,
step_id: &StepId,
output: serde_json::Value,
) -> Result<(), MobStoreError>;
/// Appends a `FailureLedgerEntry` to the run identified by `run_id`.
async fn append_failure_entry(
&self,
run_id: &RunId,
entry: FailureLedgerEntry,
) -> Result<(), MobStoreError>;
/// Inserts or replaces the `LoopSnapshot` for `loop_instance_id`, with an
/// optional `LoopIterationLedgerEntry` supplied in the same call.
///
/// NOTE(review): presumably the ledger entry, when `Some`, is appended
/// together with the snapshot write -- confirm.
async fn upsert_loop_snapshot(
&self,
run_id: &RunId,
loop_instance_id: &LoopInstanceId,
snapshot: LoopSnapshot,
ledger_entry: Option<LoopIterationLedgerEntry>,
) -> Result<(), MobStoreError>;
/// Compare-and-swap on the `FrameSnapshot` stored for `frame_id`.
///
/// NOTE(review): `expected` is optional; presumably `None` means "no
/// snapshot exists yet for this frame" -- confirm.
async fn cas_frame_state(
&self,
run_id: &RunId,
frame_id: &FrameId,
expected: Option<&FrameSnapshot>,
next: FrameSnapshot,
) -> Result<bool, MobStoreError>;
/// Compare-and-swap covering the run's `KernelState` and one frame's
/// `FrameSnapshot` in a single call.
///
/// NOTE(review): presumably used when granting a node an execution slot
/// (per the method name) -- confirm against callers.
async fn cas_grant_node_slot(
&self,
run_id: &RunId,
expected_run_state: &KernelState,
next_run_state: KernelState,
frame_id: &FrameId,
expected_frame: &FrameSnapshot,
next_frame: FrameSnapshot,
) -> Result<bool, MobStoreError>;
/// Compare-and-swap on one frame's `FrameSnapshot` that also carries a step
/// output (`step_output_key` / `step_output`) to record.
///
/// NOTE(review): `loop_context` pairs a `LoopId` with a `u64`, presumably
/// an iteration number; how it affects where the output is stored is not
/// visible here -- confirm.
#[allow(clippy::too_many_arguments)]
async fn cas_complete_step_and_record_output(
&self,
run_id: &RunId,
frame_id: &FrameId,
expected_frame: &FrameSnapshot,
next_frame: FrameSnapshot,
step_output_key: String,
step_output: serde_json::Value,
loop_context: Option<(&LoopId, u64)>,
) -> Result<bool, MobStoreError>;
/// Compare-and-swap covering the run's `KernelState` and one frame's
/// `FrameSnapshot`, and supplying the `initial_loop` snapshot for
/// `loop_instance_id`.
#[allow(clippy::too_many_arguments)]
async fn cas_start_loop(
&self,
run_id: &RunId,
loop_instance_id: &LoopInstanceId,
expected_run_state: &KernelState,
next_run_state: KernelState,
frame_id: &FrameId,
expected_frame: &FrameSnapshot,
next_frame: FrameSnapshot,
initial_loop: LoopSnapshot,
) -> Result<bool, MobStoreError>;
/// Compare-and-swap covering one loop's `LoopSnapshot` and the run's
/// `KernelState`.
///
/// NOTE(review): presumably records that the loop has requested a new body
/// frame (per the method name) -- confirm against callers.
async fn cas_loop_request_body_frame(
&self,
run_id: &RunId,
loop_instance_id: &LoopInstanceId,
expected_loop: &LoopSnapshot,
next_loop: LoopSnapshot,
expected_run_state: &KernelState,
next_run_state: KernelState,
) -> Result<bool, MobStoreError>;
/// Compare-and-swap covering one loop's `LoopSnapshot` and the run's
/// `KernelState`, and supplying the `initial_frame` snapshot for `frame_id`
/// plus a `LoopIterationLedgerEntry`.
#[allow(clippy::too_many_arguments)]
async fn cas_grant_body_frame_start(
&self,
run_id: &RunId,
loop_instance_id: &LoopInstanceId,
expected_loop: &LoopSnapshot,
next_loop: LoopSnapshot,
frame_id: &FrameId,
initial_frame: FrameSnapshot,
ledger_entry: LoopIterationLedgerEntry,
expected_run_state: &KernelState,
next_run_state: KernelState,
) -> Result<bool, MobStoreError>;
/// Compare-and-swap covering one loop's `LoopSnapshot`, one frame's
/// `FrameSnapshot` and the run's `KernelState`.
///
/// NOTE(review): presumably applied when a loop body frame finishes (per
/// the method name) -- confirm against callers.
#[allow(clippy::too_many_arguments)]
async fn cas_complete_body_frame(
&self,
run_id: &RunId,
loop_instance_id: &LoopInstanceId,
expected_loop: &LoopSnapshot,
next_loop: LoopSnapshot,
frame_id: &FrameId,
expected_frame: &FrameSnapshot,
next_frame: FrameSnapshot,
expected_run_state: &KernelState,
next_run_state: KernelState,
) -> Result<bool, MobStoreError>;
/// Compare-and-swap covering one loop's `LoopSnapshot`, one frame's
/// `FrameSnapshot` and the run's `KernelState`; same parameter list as
/// `cas_complete_body_frame`.
///
/// NOTE(review): presumably applied when the whole loop finishes (per the
/// method name) -- confirm against callers.
#[allow(clippy::too_many_arguments)]
async fn cas_complete_loop(
&self,
run_id: &RunId,
loop_instance_id: &LoopInstanceId,
expected_loop: &LoopSnapshot,
next_loop: LoopSnapshot,
frame_id: &FrameId,
expected_frame: &FrameSnapshot,
next_frame: FrameSnapshot,
expected_run_state: &KernelState,
next_run_state: KernelState,
) -> Result<bool, MobStoreError>;
}
/// Asynchronous store of `MobDefinition` specs keyed by `MobId`, each paired
/// with a `u64` revision.
// On `wasm32` the trait is expanded with `async_trait(?Send)`; on every other
// target the plain `async_trait` expansion is used.
#[cfg_attr(target_arch = "wasm32", async_trait(?Send))]
#[cfg_attr(not(target_arch = "wasm32"), async_trait)]
pub trait MobSpecStore: Send + Sync {
/// Stores `definition` for `mob_id` and returns a `u64` revision.
///
/// NOTE(review): presumably `revision` is the revision the caller expects to
/// be current, a mismatch yields `MobStoreError::SpecRevisionConflict`, and
/// the returned value is the newly assigned revision. The meaning of `None`
/// is not visible here -- confirm against the implementations.
async fn put_spec(
&self,
mob_id: &MobId,
definition: &MobDefinition,
revision: Option<u64>,
) -> Result<u64, MobStoreError>;
/// Looks up the spec for `mob_id`, returning the definition together with
/// its revision; the `Option` allows for "no such spec" without an error.
async fn get_spec(&self, mob_id: &MobId)
-> Result<Option<(MobDefinition, u64)>, MobStoreError>;
/// Returns the ids of stored specs.
///
/// NOTE(review): ordering is not specified in this file -- confirm.
async fn list_specs(&self) -> Result<Vec<MobId>, MobStoreError>;
/// Deletes the spec for `mob_id`, with an optional `revision` argument.
///
/// NOTE(review): presumably `revision` is an expected-revision guard like in
/// `put_spec`, and the returned `bool` tells whether a spec was actually
/// removed -- confirm against the implementations.
async fn delete_spec(
&self,
mob_id: &MobId,
revision: Option<u64>,
) -> Result<bool, MobStoreError>;
}