Skip to main content

shuttle_rs/
mesh.rs

1use std::fs;
2use std::path::Path;
3
4use crate::core::{Event, EventFilter, EventStore, Result, ShuttleError};
5use crate::store::SqliteEventStore;
6use chrono::{DateTime, Utc};
7use serde::{Deserialize, Serialize};
8
9#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
10pub struct MeshArchive {
11    pub exported_at: DateTime<Utc>,
12    pub event_count: usize,
13    pub events: Vec<Event>,
14}
15
16#[derive(Debug, Clone, Copy, Default, PartialEq, Eq, Serialize, Deserialize)]
17pub struct ImportReport {
18    pub imported: usize,
19    pub skipped_duplicates: usize,
20}
21
22#[derive(Debug, Clone, Copy, Default, PartialEq, Eq, Serialize, Deserialize)]
23pub struct SyncReport {
24    pub local_imported: usize,
25    pub peer_imported: usize,
26    pub skipped_duplicates: usize,
27}
28
29pub async fn export_archive(store: &SqliteEventStore) -> Result<MeshArchive> {
30    let mut events = all_events(store).await?;
31    events.sort_by(|left, right| {
32        left.created_at
33            .cmp(&right.created_at)
34            .then(left.id.cmp(&right.id))
35    });
36    Ok(MeshArchive {
37        exported_at: Utc::now(),
38        event_count: events.len(),
39        events,
40    })
41}
42
43pub fn write_archive(path: impl AsRef<Path>, archive: &MeshArchive) -> Result<()> {
44    let contents = serde_json::to_string_pretty(archive)
45        .map_err(|err| ShuttleError::Serialization(err.to_string()))?;
46    fs::write(path, contents).map_err(|err| ShuttleError::Store(err.to_string()))
47}
48
49pub fn read_archive(path: impl AsRef<Path>) -> Result<MeshArchive> {
50    let contents = fs::read_to_string(path).map_err(|err| ShuttleError::Store(err.to_string()))?;
51    let archive = serde_json::from_str(&contents)
52        .map_err(|err| ShuttleError::Serialization(err.to_string()))?;
53    Ok(archive)
54}
55
56pub async fn import_archive(
57    store: &SqliteEventStore,
58    archive: MeshArchive,
59) -> Result<ImportReport> {
60    import_events(store, archive.events).await
61}
62
63pub async fn import_archive_into_workspace(
64    store: &SqliteEventStore,
65    archive: MeshArchive,
66    target_workspace_id: &str,
67) -> Result<ImportReport> {
68    import_events_into_workspace(store, archive.events, Some(target_workspace_id)).await
69}
70
71pub async fn sync_bidirectional(
72    local: &SqliteEventStore,
73    peer: &SqliteEventStore,
74) -> Result<SyncReport> {
75    let local_events = all_events(local).await?;
76    let peer_events = all_events(peer).await?;
77    let local_report = import_events(local, peer_events).await?;
78    let peer_report = import_events(peer, local_events).await?;
79
80    Ok(SyncReport {
81        local_imported: local_report.imported,
82        peer_imported: peer_report.imported,
83        skipped_duplicates: local_report.skipped_duplicates + peer_report.skipped_duplicates,
84    })
85}
86
87pub async fn sync_bidirectional_into_workspaces(
88    local: &SqliteEventStore,
89    local_workspace_id: &str,
90    peer: &SqliteEventStore,
91    peer_workspace_id: Option<&str>,
92) -> Result<SyncReport> {
93    let local_events = all_events(local).await?;
94    let peer_events = all_events(peer).await?;
95    let local_report =
96        import_events_into_workspace(local, peer_events, Some(local_workspace_id)).await?;
97    let peer_report = import_events_into_workspace(peer, local_events, peer_workspace_id).await?;
98
99    Ok(SyncReport {
100        local_imported: local_report.imported,
101        peer_imported: peer_report.imported,
102        skipped_duplicates: local_report.skipped_duplicates + peer_report.skipped_duplicates,
103    })
104}
105
106async fn all_events(store: &SqliteEventStore) -> Result<Vec<Event>> {
107    store
108        .list(EventFilter {
109            limit: Some(u32::MAX),
110            ..EventFilter::default()
111        })
112        .await
113}
114
115async fn import_events(store: &SqliteEventStore, events: Vec<Event>) -> Result<ImportReport> {
116    import_events_into_workspace(store, events, None).await
117}
118
119async fn import_events_into_workspace(
120    store: &SqliteEventStore,
121    events: Vec<Event>,
122    target_workspace_id: Option<&str>,
123) -> Result<ImportReport> {
124    let mut report = ImportReport::default();
125    for event in events {
126        let event = if let Some(workspace_id) = target_workspace_id {
127            event_for_workspace(event, workspace_id)
128        } else {
129            event
130        };
131        if store.append_if_absent(event)? {
132            report.imported += 1;
133        } else {
134            report.skipped_duplicates += 1;
135        }
136    }
137    Ok(report)
138}
139
140fn event_for_workspace(mut event: Event, workspace_id: &str) -> Event {
141    if event.workspace_id != workspace_id {
142        if let Some(metadata) = event.metadata_json.as_object_mut() {
143            metadata.insert(
144                "mesh_source_workspace_id".to_owned(),
145                serde_json::json!(event.workspace_id),
146            );
147        }
148        event.workspace_id = workspace_id.to_owned();
149    }
150    event
151}
152
153#[cfg(test)]
154mod tests {
155    use super::*;
156    use crate::core::{EventStore, EventType};
157
158    #[test]
159    fn imports_are_idempotent_and_preserve_event_ids() {
160        let source_dir = tempfile::tempdir().unwrap();
161        let target_dir = tempfile::tempdir().unwrap();
162        let source = SqliteEventStore::open(source_dir.path().join("source.db")).unwrap();
163        let target = SqliteEventStore::open(target_dir.path().join("target.db")).unwrap();
164        let event = crate::message::new_message(
165            "workspace".into(),
166            "codex".into(),
167            "session".into(),
168            "claude".into(),
169            "review this branch".into(),
170        );
171        let event_id = event.id;
172        futures_executor::block_on(source.append(event)).unwrap();
173
174        let archive = futures_executor::block_on(export_archive(&source)).unwrap();
175        let first = futures_executor::block_on(import_archive(&target, archive.clone())).unwrap();
176        let second = futures_executor::block_on(import_archive(&target, archive)).unwrap();
177        let events = futures_executor::block_on(target.list(EventFilter {
178            event_type: Some(EventType::Message),
179            ..EventFilter::default()
180        }))
181        .unwrap();
182
183        assert_eq!(first.imported, 1);
184        assert_eq!(first.skipped_duplicates, 0);
185        assert_eq!(second.imported, 0);
186        assert_eq!(second.skipped_duplicates, 1);
187        assert_eq!(events.len(), 1);
188        assert_eq!(events[0].id, event_id);
189    }
190
191    #[test]
192    fn workspace_import_keeps_ids_but_makes_events_visible_locally() {
193        let source_dir = tempfile::tempdir().unwrap();
194        let target_dir = tempfile::tempdir().unwrap();
195        let source = SqliteEventStore::open(source_dir.path().join("source.db")).unwrap();
196        let target = SqliteEventStore::open(target_dir.path().join("target.db")).unwrap();
197        let event = crate::memory::new_memory(
198            "source-workspace".into(),
199            "codex".into(),
200            "session".into(),
201            "shared memory".into(),
202        );
203        let event_id = event.id;
204        futures_executor::block_on(source.append(event)).unwrap();
205
206        let archive = futures_executor::block_on(export_archive(&source)).unwrap();
207        let report = futures_executor::block_on(import_archive_into_workspace(
208            &target,
209            archive,
210            "target-workspace",
211        ))
212        .unwrap();
213        let events = futures_executor::block_on(target.list(EventFilter {
214            workspace_id: Some("target-workspace".into()),
215            ..EventFilter::default()
216        }))
217        .unwrap();
218
219        assert_eq!(report.imported, 1);
220        assert_eq!(events.len(), 1);
221        assert_eq!(events[0].id, event_id);
222        assert_eq!(events[0].workspace_id, "target-workspace");
223        assert_eq!(
224            events[0].metadata_json["mesh_source_workspace_id"],
225            "source-workspace"
226        );
227    }
228
229    #[test]
230    fn bidirectional_sync_resumes_after_each_peer_records_events() {
231        let local_dir = tempfile::tempdir().unwrap();
232        let peer_dir = tempfile::tempdir().unwrap();
233        let local = SqliteEventStore::open(local_dir.path().join("local.db")).unwrap();
234        let peer = SqliteEventStore::open(peer_dir.path().join("peer.db")).unwrap();
235
236        let memory = crate::memory::new_memory(
237            "workspace".into(),
238            "codex".into(),
239            "session".into(),
240            "SQLite is the local store".into(),
241        );
242        let task = crate::task::new_task(
243            "workspace".into(),
244            "claude".into(),
245            "session".into(),
246            "finish mesh sync".into(),
247        );
248        futures_executor::block_on(local.append(memory)).unwrap();
249        futures_executor::block_on(peer.append(task)).unwrap();
250
251        let first = futures_executor::block_on(sync_bidirectional(&local, &peer)).unwrap();
252        assert_eq!(first.local_imported, 1);
253        assert_eq!(first.peer_imported, 1);
254
255        let handoff = crate::task::new_handoff(
256            "workspace".into(),
257            "codex".into(),
258            "session".into(),
259            "claude".into(),
260            "continue after reconnect".into(),
261        );
262        let message = crate::message::new_message(
263            "workspace".into(),
264            "claude".into(),
265            "session".into(),
266            "codex".into(),
267            "synced".into(),
268        );
269        futures_executor::block_on(local.append(handoff)).unwrap();
270        futures_executor::block_on(peer.append(message)).unwrap();
271
272        let second = futures_executor::block_on(sync_bidirectional(&local, &peer)).unwrap();
273        assert_eq!(second.local_imported, 1);
274        assert_eq!(second.peer_imported, 1);
275        assert!(second.skipped_duplicates >= 4);
276
277        for store in [&local, &peer] {
278            let events = futures_executor::block_on(store.list(EventFilter {
279                limit: Some(u32::MAX),
280                ..EventFilter::default()
281            }))
282            .unwrap();
283            assert_eq!(events.len(), 4);
284            assert!(events
285                .iter()
286                .any(|event| event.event_type == EventType::Memory));
287            assert!(events
288                .iter()
289                .any(|event| event.event_type == EventType::Task));
290            assert!(events
291                .iter()
292                .any(|event| event.event_type == EventType::Handoff));
293            assert!(events
294                .iter()
295                .any(|event| event.event_type == EventType::Message));
296        }
297    }
298
299    #[test]
300    fn archives_round_trip_through_json_files() {
301        let source_dir = tempfile::tempdir().unwrap();
302        let file_dir = tempfile::tempdir().unwrap();
303        let source = SqliteEventStore::open(source_dir.path().join("source.db")).unwrap();
304        let event = crate::memory::new_memory(
305            "workspace".into(),
306            "codex".into(),
307            "session".into(),
308            "portable archive".into(),
309        );
310        futures_executor::block_on(source.append(event)).unwrap();
311
312        let archive = futures_executor::block_on(export_archive(&source)).unwrap();
313        let path = file_dir.path().join("mesh.json");
314        write_archive(&path, &archive).unwrap();
315        let read_back = read_archive(&path).unwrap();
316
317        assert_eq!(read_back.event_count, 1);
318        assert_eq!(read_back.events[0].content, "portable archive");
319    }
320}