1use std::fs;
2use std::path::Path;
3
4use crate::core::{Event, EventFilter, EventStore, Result, ShuttleError};
5use crate::store::SqliteEventStore;
6use chrono::{DateTime, Utc};
7use serde::{Deserialize, Serialize};
8
9#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
10pub struct MeshArchive {
11 pub exported_at: DateTime<Utc>,
12 pub event_count: usize,
13 pub events: Vec<Event>,
14}
15
16#[derive(Debug, Clone, Copy, Default, PartialEq, Eq, Serialize, Deserialize)]
17pub struct ImportReport {
18 pub imported: usize,
19 pub skipped_duplicates: usize,
20}
21
22#[derive(Debug, Clone, Copy, Default, PartialEq, Eq, Serialize, Deserialize)]
23pub struct SyncReport {
24 pub local_imported: usize,
25 pub peer_imported: usize,
26 pub skipped_duplicates: usize,
27}
28
29pub async fn export_archive(store: &SqliteEventStore) -> Result<MeshArchive> {
30 let mut events = all_events(store).await?;
31 events.sort_by(|left, right| {
32 left.created_at
33 .cmp(&right.created_at)
34 .then(left.id.cmp(&right.id))
35 });
36 Ok(MeshArchive {
37 exported_at: Utc::now(),
38 event_count: events.len(),
39 events,
40 })
41}
42
43pub fn write_archive(path: impl AsRef<Path>, archive: &MeshArchive) -> Result<()> {
44 let contents = serde_json::to_string_pretty(archive)
45 .map_err(|err| ShuttleError::Serialization(err.to_string()))?;
46 fs::write(path, contents).map_err(|err| ShuttleError::Store(err.to_string()))
47}
48
49pub fn read_archive(path: impl AsRef<Path>) -> Result<MeshArchive> {
50 let contents = fs::read_to_string(path).map_err(|err| ShuttleError::Store(err.to_string()))?;
51 let archive = serde_json::from_str(&contents)
52 .map_err(|err| ShuttleError::Serialization(err.to_string()))?;
53 Ok(archive)
54}
55
56pub async fn import_archive(
57 store: &SqliteEventStore,
58 archive: MeshArchive,
59) -> Result<ImportReport> {
60 import_events(store, archive.events).await
61}
62
63pub async fn import_archive_into_workspace(
64 store: &SqliteEventStore,
65 archive: MeshArchive,
66 target_workspace_id: &str,
67) -> Result<ImportReport> {
68 import_events_into_workspace(store, archive.events, Some(target_workspace_id)).await
69}
70
71pub async fn sync_bidirectional(
72 local: &SqliteEventStore,
73 peer: &SqliteEventStore,
74) -> Result<SyncReport> {
75 let local_events = all_events(local).await?;
76 let peer_events = all_events(peer).await?;
77 let local_report = import_events(local, peer_events).await?;
78 let peer_report = import_events(peer, local_events).await?;
79
80 Ok(SyncReport {
81 local_imported: local_report.imported,
82 peer_imported: peer_report.imported,
83 skipped_duplicates: local_report.skipped_duplicates + peer_report.skipped_duplicates,
84 })
85}
86
87pub async fn sync_bidirectional_into_workspaces(
88 local: &SqliteEventStore,
89 local_workspace_id: &str,
90 peer: &SqliteEventStore,
91 peer_workspace_id: Option<&str>,
92) -> Result<SyncReport> {
93 let local_events = all_events(local).await?;
94 let peer_events = all_events(peer).await?;
95 let local_report =
96 import_events_into_workspace(local, peer_events, Some(local_workspace_id)).await?;
97 let peer_report = import_events_into_workspace(peer, local_events, peer_workspace_id).await?;
98
99 Ok(SyncReport {
100 local_imported: local_report.imported,
101 peer_imported: peer_report.imported,
102 skipped_duplicates: local_report.skipped_duplicates + peer_report.skipped_duplicates,
103 })
104}
105
106async fn all_events(store: &SqliteEventStore) -> Result<Vec<Event>> {
107 store
108 .list(EventFilter {
109 limit: Some(u32::MAX),
110 ..EventFilter::default()
111 })
112 .await
113}
114
115async fn import_events(store: &SqliteEventStore, events: Vec<Event>) -> Result<ImportReport> {
116 import_events_into_workspace(store, events, None).await
117}
118
119async fn import_events_into_workspace(
120 store: &SqliteEventStore,
121 events: Vec<Event>,
122 target_workspace_id: Option<&str>,
123) -> Result<ImportReport> {
124 let mut report = ImportReport::default();
125 for event in events {
126 let event = if let Some(workspace_id) = target_workspace_id {
127 event_for_workspace(event, workspace_id)
128 } else {
129 event
130 };
131 if store.append_if_absent(event)? {
132 report.imported += 1;
133 } else {
134 report.skipped_duplicates += 1;
135 }
136 }
137 Ok(report)
138}
139
140fn event_for_workspace(mut event: Event, workspace_id: &str) -> Event {
141 if event.workspace_id != workspace_id {
142 if let Some(metadata) = event.metadata_json.as_object_mut() {
143 metadata.insert(
144 "mesh_source_workspace_id".to_owned(),
145 serde_json::json!(event.workspace_id),
146 );
147 }
148 event.workspace_id = workspace_id.to_owned();
149 }
150 event
151}
152
#[cfg(test)]
mod tests {
    use super::*;
    use crate::core::{EventStore, EventType};
    use futures_executor::block_on;

    /// Opens a fresh SQLite store called `file_name` inside its own temporary
    /// directory. The directory guard is returned alongside the store so the
    /// caller can keep it alive for as long as the store is in use.
    fn temp_store(file_name: &str) -> (tempfile::TempDir, SqliteEventStore) {
        let dir = tempfile::tempdir().unwrap();
        let store = SqliteEventStore::open(dir.path().join(file_name)).unwrap();
        (dir, store)
    }

    #[test]
    fn imports_are_idempotent_and_preserve_event_ids() {
        let (_source_dir, source) = temp_store("source.db");
        let (_target_dir, target) = temp_store("target.db");

        let event = crate::message::new_message(
            "workspace".into(),
            "codex".into(),
            "session".into(),
            "claude".into(),
            "review this branch".into(),
        );
        let event_id = event.id;
        block_on(source.append(event)).unwrap();

        // Import the same archive twice: the second pass must be a no-op.
        let archive = block_on(export_archive(&source)).unwrap();
        let first = block_on(import_archive(&target, archive.clone())).unwrap();
        let second = block_on(import_archive(&target, archive)).unwrap();

        let messages_only = EventFilter {
            event_type: Some(EventType::Message),
            ..EventFilter::default()
        };
        let events = block_on(target.list(messages_only)).unwrap();

        assert_eq!(
            first,
            ImportReport {
                imported: 1,
                skipped_duplicates: 0,
            }
        );
        assert_eq!(
            second,
            ImportReport {
                imported: 0,
                skipped_duplicates: 1,
            }
        );
        assert_eq!(events.len(), 1);
        assert_eq!(events[0].id, event_id);
    }

    #[test]
    fn workspace_import_keeps_ids_but_makes_events_visible_locally() {
        let (_source_dir, source) = temp_store("source.db");
        let (_target_dir, target) = temp_store("target.db");

        let event = crate::memory::new_memory(
            "source-workspace".into(),
            "codex".into(),
            "session".into(),
            "shared memory".into(),
        );
        let event_id = event.id;
        block_on(source.append(event)).unwrap();

        let archive = block_on(export_archive(&source)).unwrap();
        let report =
            block_on(import_archive_into_workspace(&target, archive, "target-workspace")).unwrap();

        let in_target_workspace = EventFilter {
            workspace_id: Some("target-workspace".into()),
            ..EventFilter::default()
        };
        let events = block_on(target.list(in_target_workspace)).unwrap();

        assert_eq!(report.imported, 1);
        assert_eq!(events.len(), 1);

        let imported = &events[0];
        assert_eq!(imported.id, event_id);
        assert_eq!(imported.workspace_id, "target-workspace");
        assert_eq!(
            imported.metadata_json["mesh_source_workspace_id"],
            "source-workspace"
        );
    }

    #[test]
    fn bidirectional_sync_resumes_after_each_peer_records_events() {
        let (_local_dir, local) = temp_store("local.db");
        let (_peer_dir, peer) = temp_store("peer.db");

        // Round one: each side holds one event the other has never seen.
        let memory = crate::memory::new_memory(
            "workspace".into(),
            "codex".into(),
            "session".into(),
            "SQLite is the local store".into(),
        );
        let task = crate::task::new_task(
            "workspace".into(),
            "claude".into(),
            "session".into(),
            "finish mesh sync".into(),
        );
        block_on(local.append(memory)).unwrap();
        block_on(peer.append(task)).unwrap();

        let first = block_on(sync_bidirectional(&local, &peer)).unwrap();
        assert_eq!((first.local_imported, first.peer_imported), (1, 1));

        // Round two: both sides record something new after the first sync.
        let handoff = crate::task::new_handoff(
            "workspace".into(),
            "codex".into(),
            "session".into(),
            "claude".into(),
            "continue after reconnect".into(),
        );
        let message = crate::message::new_message(
            "workspace".into(),
            "claude".into(),
            "session".into(),
            "codex".into(),
            "synced".into(),
        );
        block_on(local.append(handoff)).unwrap();
        block_on(peer.append(message)).unwrap();

        let second = block_on(sync_bidirectional(&local, &peer)).unwrap();
        assert_eq!((second.local_imported, second.peer_imported), (1, 1));
        assert!(second.skipped_duplicates >= 4);

        // Both stores must now hold one event of each kind.
        for store in [&local, &peer] {
            let everything = EventFilter {
                limit: Some(u32::MAX),
                ..EventFilter::default()
            };
            let events = block_on(store.list(everything)).unwrap();
            assert_eq!(events.len(), 4);

            for expected in [
                EventType::Memory,
                EventType::Task,
                EventType::Handoff,
                EventType::Message,
            ] {
                assert!(events.iter().any(|event| event.event_type == expected));
            }
        }
    }

    #[test]
    fn archives_round_trip_through_json_files() {
        let (_source_dir, source) = temp_store("source.db");
        let file_dir = tempfile::tempdir().unwrap();

        let event = crate::memory::new_memory(
            "workspace".into(),
            "codex".into(),
            "session".into(),
            "portable archive".into(),
        );
        block_on(source.append(event)).unwrap();

        let archive = block_on(export_archive(&source)).unwrap();
        let archive_path = file_dir.path().join("mesh.json");
        write_archive(&archive_path, &archive).unwrap();
        let read_back = read_archive(&archive_path).unwrap();

        assert_eq!(read_back.event_count, 1);
        assert_eq!(read_back.events[0].content, "portable archive");
    }
}