shuttle-rs 2026.6.2

Local-first event log CLI for agent memory, repository context, task coordination, handoffs, messaging, mesh sync, and MCP access.
Documentation
use std::fs;
use std::path::Path;

use crate::core::{Event, EventFilter, EventStore, Result, ShuttleError};
use crate::store::SqliteEventStore;
use chrono::{DateTime, Utc};
use serde::{Deserialize, Serialize};

#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct MeshArchive {
    pub exported_at: DateTime<Utc>,
    pub event_count: usize,
    pub events: Vec<Event>,
}

#[derive(Debug, Clone, Copy, Default, PartialEq, Eq, Serialize, Deserialize)]
pub struct ImportReport {
    pub imported: usize,
    pub skipped_duplicates: usize,
}

#[derive(Debug, Clone, Copy, Default, PartialEq, Eq, Serialize, Deserialize)]
pub struct SyncReport {
    pub local_imported: usize,
    pub peer_imported: usize,
    pub skipped_duplicates: usize,
}

pub async fn export_archive(store: &SqliteEventStore) -> Result<MeshArchive> {
    let mut events = all_events(store).await?;
    events.sort_by(|left, right| {
        left.created_at
            .cmp(&right.created_at)
            .then(left.id.cmp(&right.id))
    });
    Ok(MeshArchive {
        exported_at: Utc::now(),
        event_count: events.len(),
        events,
    })
}

pub fn write_archive(path: impl AsRef<Path>, archive: &MeshArchive) -> Result<()> {
    let contents = serde_json::to_string_pretty(archive)
        .map_err(|err| ShuttleError::Serialization(err.to_string()))?;
    fs::write(path, contents).map_err(|err| ShuttleError::Store(err.to_string()))
}

pub fn read_archive(path: impl AsRef<Path>) -> Result<MeshArchive> {
    let contents = fs::read_to_string(path).map_err(|err| ShuttleError::Store(err.to_string()))?;
    let archive = serde_json::from_str(&contents)
        .map_err(|err| ShuttleError::Serialization(err.to_string()))?;
    Ok(archive)
}

pub async fn import_archive(
    store: &SqliteEventStore,
    archive: MeshArchive,
) -> Result<ImportReport> {
    import_events(store, archive.events).await
}

pub async fn import_archive_into_workspace(
    store: &SqliteEventStore,
    archive: MeshArchive,
    target_workspace_id: &str,
) -> Result<ImportReport> {
    import_events_into_workspace(store, archive.events, Some(target_workspace_id)).await
}

pub async fn sync_bidirectional(
    local: &SqliteEventStore,
    peer: &SqliteEventStore,
) -> Result<SyncReport> {
    let local_events = all_events(local).await?;
    let peer_events = all_events(peer).await?;
    let local_report = import_events(local, peer_events).await?;
    let peer_report = import_events(peer, local_events).await?;

    Ok(SyncReport {
        local_imported: local_report.imported,
        peer_imported: peer_report.imported,
        skipped_duplicates: local_report.skipped_duplicates + peer_report.skipped_duplicates,
    })
}

pub async fn sync_bidirectional_into_workspaces(
    local: &SqliteEventStore,
    local_workspace_id: &str,
    peer: &SqliteEventStore,
    peer_workspace_id: Option<&str>,
) -> Result<SyncReport> {
    let local_events = all_events(local).await?;
    let peer_events = all_events(peer).await?;
    let local_report =
        import_events_into_workspace(local, peer_events, Some(local_workspace_id)).await?;
    let peer_report = import_events_into_workspace(peer, local_events, peer_workspace_id).await?;

    Ok(SyncReport {
        local_imported: local_report.imported,
        peer_imported: peer_report.imported,
        skipped_duplicates: local_report.skipped_duplicates + peer_report.skipped_duplicates,
    })
}

async fn all_events(store: &SqliteEventStore) -> Result<Vec<Event>> {
    store
        .list(EventFilter {
            limit: Some(u32::MAX),
            ..EventFilter::default()
        })
        .await
}

async fn import_events(store: &SqliteEventStore, events: Vec<Event>) -> Result<ImportReport> {
    import_events_into_workspace(store, events, None).await
}

async fn import_events_into_workspace(
    store: &SqliteEventStore,
    events: Vec<Event>,
    target_workspace_id: Option<&str>,
) -> Result<ImportReport> {
    let mut report = ImportReport::default();
    for event in events {
        let event = if let Some(workspace_id) = target_workspace_id {
            event_for_workspace(event, workspace_id)
        } else {
            event
        };
        if store.append_if_absent(event)? {
            report.imported += 1;
        } else {
            report.skipped_duplicates += 1;
        }
    }
    Ok(report)
}

fn event_for_workspace(mut event: Event, workspace_id: &str) -> Event {
    if event.workspace_id != workspace_id {
        if let Some(metadata) = event.metadata_json.as_object_mut() {
            metadata.insert(
                "mesh_source_workspace_id".to_owned(),
                serde_json::json!(event.workspace_id),
            );
        }
        event.workspace_id = workspace_id.to_owned();
    }
    event
}

#[cfg(test)]
mod tests {
    use super::*;
    use crate::core::{EventStore, EventType};

    #[test]
    fn imports_are_idempotent_and_preserve_event_ids() {
        let source_dir = tempfile::tempdir().unwrap();
        let target_dir = tempfile::tempdir().unwrap();
        let source = SqliteEventStore::open(source_dir.path().join("source.db")).unwrap();
        let target = SqliteEventStore::open(target_dir.path().join("target.db")).unwrap();
        let event = crate::message::new_message(
            "workspace".into(),
            "codex".into(),
            "session".into(),
            "claude".into(),
            "review this branch".into(),
        );
        let event_id = event.id;
        futures_executor::block_on(source.append(event)).unwrap();

        let archive = futures_executor::block_on(export_archive(&source)).unwrap();
        let first = futures_executor::block_on(import_archive(&target, archive.clone())).unwrap();
        let second = futures_executor::block_on(import_archive(&target, archive)).unwrap();
        let events = futures_executor::block_on(target.list(EventFilter {
            event_type: Some(EventType::Message),
            ..EventFilter::default()
        }))
        .unwrap();

        assert_eq!(first.imported, 1);
        assert_eq!(first.skipped_duplicates, 0);
        assert_eq!(second.imported, 0);
        assert_eq!(second.skipped_duplicates, 1);
        assert_eq!(events.len(), 1);
        assert_eq!(events[0].id, event_id);
    }

    #[test]
    fn workspace_import_keeps_ids_but_makes_events_visible_locally() {
        let source_dir = tempfile::tempdir().unwrap();
        let target_dir = tempfile::tempdir().unwrap();
        let source = SqliteEventStore::open(source_dir.path().join("source.db")).unwrap();
        let target = SqliteEventStore::open(target_dir.path().join("target.db")).unwrap();
        let event = crate::memory::new_memory(
            "source-workspace".into(),
            "codex".into(),
            "session".into(),
            "shared memory".into(),
        );
        let event_id = event.id;
        futures_executor::block_on(source.append(event)).unwrap();

        let archive = futures_executor::block_on(export_archive(&source)).unwrap();
        let report = futures_executor::block_on(import_archive_into_workspace(
            &target,
            archive,
            "target-workspace",
        ))
        .unwrap();
        let events = futures_executor::block_on(target.list(EventFilter {
            workspace_id: Some("target-workspace".into()),
            ..EventFilter::default()
        }))
        .unwrap();

        assert_eq!(report.imported, 1);
        assert_eq!(events.len(), 1);
        assert_eq!(events[0].id, event_id);
        assert_eq!(events[0].workspace_id, "target-workspace");
        assert_eq!(
            events[0].metadata_json["mesh_source_workspace_id"],
            "source-workspace"
        );
    }

    #[test]
    fn bidirectional_sync_resumes_after_each_peer_records_events() {
        let local_dir = tempfile::tempdir().unwrap();
        let peer_dir = tempfile::tempdir().unwrap();
        let local = SqliteEventStore::open(local_dir.path().join("local.db")).unwrap();
        let peer = SqliteEventStore::open(peer_dir.path().join("peer.db")).unwrap();

        let memory = crate::memory::new_memory(
            "workspace".into(),
            "codex".into(),
            "session".into(),
            "SQLite is the local store".into(),
        );
        let task = crate::task::new_task(
            "workspace".into(),
            "claude".into(),
            "session".into(),
            "finish mesh sync".into(),
        );
        futures_executor::block_on(local.append(memory)).unwrap();
        futures_executor::block_on(peer.append(task)).unwrap();

        let first = futures_executor::block_on(sync_bidirectional(&local, &peer)).unwrap();
        assert_eq!(first.local_imported, 1);
        assert_eq!(first.peer_imported, 1);

        let handoff = crate::task::new_handoff(
            "workspace".into(),
            "codex".into(),
            "session".into(),
            "claude".into(),
            "continue after reconnect".into(),
        );
        let message = crate::message::new_message(
            "workspace".into(),
            "claude".into(),
            "session".into(),
            "codex".into(),
            "synced".into(),
        );
        futures_executor::block_on(local.append(handoff)).unwrap();
        futures_executor::block_on(peer.append(message)).unwrap();

        let second = futures_executor::block_on(sync_bidirectional(&local, &peer)).unwrap();
        assert_eq!(second.local_imported, 1);
        assert_eq!(second.peer_imported, 1);
        assert!(second.skipped_duplicates >= 4);

        for store in [&local, &peer] {
            let events = futures_executor::block_on(store.list(EventFilter {
                limit: Some(u32::MAX),
                ..EventFilter::default()
            }))
            .unwrap();
            assert_eq!(events.len(), 4);
            assert!(events
                .iter()
                .any(|event| event.event_type == EventType::Memory));
            assert!(events
                .iter()
                .any(|event| event.event_type == EventType::Task));
            assert!(events
                .iter()
                .any(|event| event.event_type == EventType::Handoff));
            assert!(events
                .iter()
                .any(|event| event.event_type == EventType::Message));
        }
    }

    #[test]
    fn archives_round_trip_through_json_files() {
        let source_dir = tempfile::tempdir().unwrap();
        let file_dir = tempfile::tempdir().unwrap();
        let source = SqliteEventStore::open(source_dir.path().join("source.db")).unwrap();
        let event = crate::memory::new_memory(
            "workspace".into(),
            "codex".into(),
            "session".into(),
            "portable archive".into(),
        );
        futures_executor::block_on(source.append(event)).unwrap();

        let archive = futures_executor::block_on(export_archive(&source)).unwrap();
        let path = file_dir.path().join("mesh.json");
        write_archive(&path, &archive).unwrap();
        let read_back = read_archive(&path).unwrap();

        assert_eq!(read_back.event_count, 1);
        assert_eq!(read_back.events[0].content, "portable archive");
    }
}