meerkat-mob 0.5.2

Multi-agent orchestration runtime for Meerkat
Documentation
//! MobStorage bundle.
//!
//! Groups the event store with a session store for a mob's isolated storage.

#[cfg(not(target_arch = "wasm32"))]
use crate::store::SqliteMobStores;
use crate::store::{
    InMemoryMobEventStore, InMemoryMobRunStore, InMemoryMobSpecStore, InMemoryRealmProfileStore,
    MobEventStore, MobRunStore, MobSpecStore, RealmProfileStore,
};
#[cfg(not(target_arch = "wasm32"))]
use std::path::Path;
use std::sync::Arc;

/// Storage bundle for a mob.
///
/// Contains both the mob event store (structural state) and session
/// store (for meerkat sessions). Each mob has its own isolated storage.
pub struct MobStorage {
    /// Event store for mob structural events.
    pub events: Arc<dyn MobEventStore>,
    /// Flow run persistence store.
    pub runs: Arc<dyn MobRunStore>,
    /// Flow spec persistence store.
    pub specs: Arc<dyn MobSpecStore>,
    /// Realm-scoped reusable profile store.
    pub realm_profiles: Option<Arc<dyn RealmProfileStore>>,
}

impl MobStorage {
    /// Create a storage bundle with in-memory stores (for tests and ephemeral mobs).
    pub fn in_memory() -> Self {
        let (runs, specs) = Self::in_memory_flow_stores();
        Self {
            events: Arc::new(InMemoryMobEventStore::new()),
            runs,
            specs,
            realm_profiles: Some(Arc::new(InMemoryRealmProfileStore::new())),
        }
    }

    /// Create in-memory run/spec stores for flow persistence.
    pub fn in_memory_flow_stores() -> (Arc<dyn MobRunStore>, Arc<dyn MobSpecStore>) {
        (
            Arc::new(InMemoryMobRunStore::new()),
            Arc::new(InMemoryMobSpecStore::new()),
        )
    }

    /// Build a full storage bundle from a custom event store and in-memory flow stores.
    pub fn with_events(events: Arc<dyn MobEventStore>) -> Self {
        let (runs, specs) = Self::in_memory_flow_stores();
        Self {
            events,
            runs,
            specs,
            realm_profiles: Some(Arc::new(InMemoryRealmProfileStore::new())),
        }
    }

    /// Build a storage bundle from custom store implementations.
    pub fn custom(
        events: Arc<dyn MobEventStore>,
        runs: Arc<dyn MobRunStore>,
        specs: Arc<dyn MobSpecStore>,
    ) -> Self {
        Self {
            events,
            runs,
            specs,
            realm_profiles: None,
        }
    }

    /// Create a storage bundle backed by a single SQLite database file.
    ///
    /// Uses WAL mode — no exclusive file lock is held, so the same path
    /// can be reopened after drop within the same process.
    #[cfg(not(target_arch = "wasm32"))]
    pub fn persistent(path: impl AsRef<Path>) -> Result<Self, crate::MobError> {
        let stores = SqliteMobStores::open(path)?;
        Ok(Self {
            events: Arc::new(stores.event_store()),
            runs: Arc::new(stores.run_store()),
            specs: Arc::new(stores.spec_store()),
            realm_profiles: Some(Arc::new(stores.realm_profile_store())),
        })
    }
}

impl std::fmt::Debug for MobStorage {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("MobStorage")
            .field("events", &"<dyn MobEventStore>")
            .field("runs", &"<dyn MobRunStore>")
            .field("specs", &"<dyn MobSpecStore>")
            .field(
                "realm_profiles",
                &self
                    .realm_profiles
                    .as_ref()
                    .map(|_| "<dyn RealmProfileStore>"),
            )
            .finish()
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use crate::event::{MobEventKind, NewMobEvent};
    use crate::ids::{MobId, RunId};
    use crate::run::{MobRun, MobRunStatus};
    use chrono::Utc;

    #[tokio::test]
    async fn test_in_memory_storage_creates_working_stores() {
        let storage = MobStorage::in_memory();

        // Event store works
        let event = NewMobEvent {
            mob_id: MobId::from("test"),
            timestamp: None,
            kind: MobEventKind::MobCompleted,
        };
        let stored = storage.events.append(event).await.unwrap();
        assert_eq!(stored.cursor, 1);

        let all = storage.events.replay_all().await.unwrap();
        assert_eq!(all.len(), 1);
    }

    #[tokio::test]
    async fn test_in_memory_flow_stores_create_working_run_and_spec_stores() {
        let (runs, specs) = MobStorage::in_memory_flow_stores();
        let run = MobRun {
            run_id: RunId::new(),
            mob_id: MobId::from("mob"),
            flow_id: crate::FlowId::from("flow"),
            status: MobRunStatus::Pending,
            flow_state: MobRun::flow_state_for_steps([crate::ids::StepId::from("step-1")]).unwrap(),
            activation_params: serde_json::json!({}),
            created_at: Utc::now(),
            completed_at: None,
            step_ledger: Vec::new(),
            failure_ledger: Vec::new(),
            frames: std::collections::BTreeMap::new(),
            loops: std::collections::BTreeMap::new(),
            loop_iteration_ledger: Vec::new(),
            schema_version: 4,
            root_step_outputs: indexmap::IndexMap::new(),
            loop_iteration_outputs: std::collections::BTreeMap::new(),
        };
        runs.create_run(run.clone()).await.unwrap();
        assert!(runs.get_run(&run.run_id).await.unwrap().is_some());

        let definition = crate::definition::MobDefinition::from_toml(
            r#"
[mob]
id = "mob"
[profiles.worker]
model = "test"
"#,
        )
        .unwrap();
        let revision = specs
            .put_spec(&MobId::from("mob"), &definition, None)
            .await
            .unwrap();
        assert_eq!(revision, 1);
    }

    #[tokio::test]
    async fn test_persistent_storage_uses_shared_database_for_all_stores() {
        let dir = tempfile::tempdir().unwrap();
        let db_path = dir.path().join("mob.db");
        let storage = MobStorage::persistent(&db_path).unwrap();

        let event = NewMobEvent {
            mob_id: MobId::from("mob"),
            timestamp: None,
            kind: MobEventKind::MobCompleted,
        };
        storage.events.append(event).await.unwrap();

        let run = MobRun {
            run_id: RunId::new(),
            mob_id: MobId::from("mob"),
            flow_id: crate::FlowId::from("flow"),
            status: MobRunStatus::Pending,
            flow_state: MobRun::flow_state_for_steps([crate::ids::StepId::from("step-1")]).unwrap(),
            activation_params: serde_json::json!({}),
            created_at: Utc::now(),
            completed_at: None,
            step_ledger: Vec::new(),
            failure_ledger: Vec::new(),
            frames: std::collections::BTreeMap::new(),
            loops: std::collections::BTreeMap::new(),
            loop_iteration_ledger: Vec::new(),
            schema_version: 4,
            root_step_outputs: indexmap::IndexMap::new(),
            loop_iteration_outputs: std::collections::BTreeMap::new(),
        };
        storage.runs.create_run(run.clone()).await.unwrap();
        assert!(storage.runs.get_run(&run.run_id).await.unwrap().is_some());

        let definition = crate::definition::MobDefinition::from_toml(
            r#"
[mob]
id = "mob"
[profiles.worker]
model = "test"
"#,
        )
        .unwrap();
        let revision = storage
            .specs
            .put_spec(&MobId::from("mob"), &definition, None)
            .await
            .unwrap();
        assert_eq!(revision, 1);
    }
}