1use std::path::Path;
8
9use std::fs::File;
10use std::io::{BufRead, BufReader};
11
12use crate::storage::sqlite::SqliteStorage;
13use crate::sync::file::read_jsonl;
14use crate::sync::hash::content_hash;
15use crate::sync::types::{
16 CheckpointRecord, ContextItemRecord, DeletionRecord, EntityStats, ImportStats, IssueRecord,
17 MemoryRecord, MergeStrategy, SessionRecord, SyncError, SyncRecord, SyncResult,
18};
19
/// Imports JSONL sync records into a [`SqliteStorage`].
///
/// When an incoming record collides with a row that already exists locally,
/// the configured [`MergeStrategy`] decides which side is kept.
pub struct Importer<'a> {
    /// Storage that imported records are read from and written to.
    storage: &'a mut SqliteStorage,
    /// Policy consulted when an incoming record differs from the local copy.
    strategy: MergeStrategy,
}
28
29impl<'a> Importer<'a> {
30 #[must_use]
32 pub fn new(storage: &'a mut SqliteStorage, strategy: MergeStrategy) -> Self {
33 Self { storage, strategy }
34 }
35
36 pub fn import(&mut self, path: &Path) -> SyncResult<ImportStats> {
45 let records = read_jsonl(path)?;
46 let mut stats = ImportStats::default();
47
48 for record in records {
49 match record {
50 SyncRecord::Session(rec) => self.import_session(rec, &mut stats.sessions)?,
51 SyncRecord::Issue(rec) => self.import_issue(rec, &mut stats.issues)?,
52 SyncRecord::ContextItem(rec) => {
53 self.import_context_item(rec, &mut stats.context_items)?;
54 }
55 SyncRecord::Memory(rec) => self.import_memory(rec, &mut stats.memories)?,
56 SyncRecord::Checkpoint(rec) => {
57 self.import_checkpoint(rec, &mut stats.checkpoints)?;
58 }
59 }
60 }
61
62 Ok(stats)
63 }
64
65 pub fn import_all(&mut self, dir: &Path) -> SyncResult<ImportStats> {
75 let mut total_stats = ImportStats::default();
76
77 let files = [
79 ("sessions.jsonl", "sessions"),
80 ("issues.jsonl", "issues"),
81 ("context_items.jsonl", "context_items"),
82 ("memories.jsonl", "memories"),
83 ("checkpoints.jsonl", "checkpoints"),
84 ];
85
86 for (filename, _entity) in files {
87 let path = dir.join(filename);
88 if path.exists() {
89 let stats = self.import(&path)?;
90 merge_stats(&mut total_stats, &stats);
91 }
92 }
93
94 let deletions_path = dir.join("deletions.jsonl");
96 if deletions_path.exists() {
97 self.import_deletions(&deletions_path)?;
98 }
99
100 Ok(total_stats)
101 }
102
103 pub fn import_deletions(&mut self, path: &Path) -> SyncResult<usize> {
112 let file = File::open(path)?;
113 let reader = BufReader::new(file);
114 let mut deleted_count = 0;
115
116 for (line_num, line) in reader.lines().enumerate() {
117 let line = line?;
118 if line.trim().is_empty() {
119 continue;
120 }
121
122 let deletion: DeletionRecord = serde_json::from_str(&line).map_err(|e| {
123 SyncError::InvalidRecord {
124 line: line_num + 1,
125 message: e.to_string(),
126 }
127 })?;
128
129 let entity_type = deletion.entity_type.to_string();
131 let was_deleted = self
132 .storage
133 .apply_deletion(&entity_type, &deletion.entity_id)
134 .map_err(|e| SyncError::Database(e.to_string()))?;
135
136 if was_deleted {
137 deleted_count += 1;
138 }
139 }
140
141 Ok(deleted_count)
142 }
143
144 fn import_session(&mut self, rec: SessionRecord, stats: &mut EntityStats) -> SyncResult<()> {
146 let existing = self
147 .storage
148 .get_session(&rec.data.id)
149 .map_err(|e| SyncError::Database(e.to_string()))?;
150
151 match existing {
152 Some(local) => {
153 let local_hash = content_hash(&local);
155 if local_hash == rec.content_hash {
156 stats.skipped += 1;
158 return Ok(());
159 }
160
161 match self.strategy {
163 MergeStrategy::PreferNewer => {
164 if rec.data.updated_at > local.updated_at {
165 self.storage
166 .upsert_session(&rec.data)
167 .map_err(|e| SyncError::Database(e.to_string()))?;
168 stats.updated += 1;
169 } else {
170 stats.skipped += 1;
171 }
172 }
173 MergeStrategy::PreferLocal => {
174 stats.skipped += 1;
175 }
176 MergeStrategy::PreferExternal => {
177 self.storage
178 .upsert_session(&rec.data)
179 .map_err(|e| SyncError::Database(e.to_string()))?;
180 stats.updated += 1;
181 }
182 }
183 }
184 None => {
185 self.storage
187 .upsert_session(&rec.data)
188 .map_err(|e| SyncError::Database(e.to_string()))?;
189 stats.created += 1;
190 }
191 }
192
193 Ok(())
194 }
195
196 fn import_issue(&mut self, rec: IssueRecord, stats: &mut EntityStats) -> SyncResult<()> {
198 let existing = self
199 .storage
200 .get_issue(&rec.data.id, None)
201 .map_err(|e| SyncError::Database(e.to_string()))?;
202
203 match existing {
204 Some(local) => {
205 let local_hash = content_hash(&local);
206 if local_hash == rec.content_hash {
207 stats.skipped += 1;
208 return Ok(());
209 }
210
211 match self.strategy {
212 MergeStrategy::PreferNewer => {
213 if rec.data.updated_at > local.updated_at {
214 self.storage
215 .upsert_issue(&rec.data)
216 .map_err(|e| SyncError::Database(e.to_string()))?;
217 stats.updated += 1;
218 } else {
219 stats.skipped += 1;
220 }
221 }
222 MergeStrategy::PreferLocal => {
223 stats.skipped += 1;
224 }
225 MergeStrategy::PreferExternal => {
226 self.storage
227 .upsert_issue(&rec.data)
228 .map_err(|e| SyncError::Database(e.to_string()))?;
229 stats.updated += 1;
230 }
231 }
232 }
233 None => {
234 self.storage
235 .upsert_issue(&rec.data)
236 .map_err(|e| SyncError::Database(e.to_string()))?;
237 stats.created += 1;
238 }
239 }
240
241 Ok(())
242 }
243
244 fn import_context_item(
246 &mut self,
247 rec: ContextItemRecord,
248 stats: &mut EntityStats,
249 ) -> SyncResult<()> {
250 let existing = self
251 .storage
252 .get_context_item(&rec.data.id)
253 .map_err(|e| SyncError::Database(e.to_string()))?;
254
255 match existing {
256 Some(local) => {
257 let local_hash = content_hash(&local);
258 if local_hash == rec.content_hash {
259 stats.skipped += 1;
260 return Ok(());
261 }
262
263 match self.strategy {
264 MergeStrategy::PreferNewer => {
265 if rec.data.updated_at > local.updated_at {
266 self.storage
267 .upsert_context_item(&rec.data)
268 .map_err(|e| SyncError::Database(e.to_string()))?;
269 stats.updated += 1;
270 } else {
271 stats.skipped += 1;
272 }
273 }
274 MergeStrategy::PreferLocal => {
275 stats.skipped += 1;
276 }
277 MergeStrategy::PreferExternal => {
278 self.storage
279 .upsert_context_item(&rec.data)
280 .map_err(|e| SyncError::Database(e.to_string()))?;
281 stats.updated += 1;
282 }
283 }
284 }
285 None => {
286 self.storage
287 .upsert_context_item(&rec.data)
288 .map_err(|e| SyncError::Database(e.to_string()))?;
289 stats.created += 1;
290 }
291 }
292
293 Ok(())
294 }
295
296 fn import_memory(&mut self, rec: MemoryRecord, stats: &mut EntityStats) -> SyncResult<()> {
298 self.storage
301 .upsert_memory(&rec.data)
302 .map_err(|e| SyncError::Database(e.to_string()))?;
303 stats.created += 1;
304 Ok(())
305 }
306
307 fn import_checkpoint(
309 &mut self,
310 rec: CheckpointRecord,
311 stats: &mut EntityStats,
312 ) -> SyncResult<()> {
313 let existing = self
314 .storage
315 .get_checkpoint(&rec.data.id)
316 .map_err(|e| SyncError::Database(e.to_string()))?;
317
318 match existing {
319 Some(local) => {
320 let local_hash = content_hash(&local);
321 if local_hash == rec.content_hash {
322 stats.skipped += 1;
323 return Ok(());
324 }
325
326 match self.strategy {
328 MergeStrategy::PreferNewer | MergeStrategy::PreferExternal => {
329 self.storage
330 .upsert_checkpoint(&rec.data)
331 .map_err(|e| SyncError::Database(e.to_string()))?;
332 stats.updated += 1;
333 }
334 MergeStrategy::PreferLocal => {
335 stats.skipped += 1;
336 }
337 }
338 }
339 None => {
340 self.storage
341 .upsert_checkpoint(&rec.data)
342 .map_err(|e| SyncError::Database(e.to_string()))?;
343 stats.created += 1;
344 }
345 }
346
347 Ok(())
348 }
349}
350
351fn merge_stats(total: &mut ImportStats, add: &ImportStats) {
353 total.sessions.created += add.sessions.created;
354 total.sessions.updated += add.sessions.updated;
355 total.sessions.skipped += add.sessions.skipped;
356 total.sessions.conflicts += add.sessions.conflicts;
357
358 total.issues.created += add.issues.created;
359 total.issues.updated += add.issues.updated;
360 total.issues.skipped += add.issues.skipped;
361 total.issues.conflicts += add.issues.conflicts;
362
363 total.context_items.created += add.context_items.created;
364 total.context_items.updated += add.context_items.updated;
365 total.context_items.skipped += add.context_items.skipped;
366 total.context_items.conflicts += add.context_items.conflicts;
367
368 total.memories.created += add.memories.created;
369 total.memories.updated += add.memories.updated;
370 total.memories.skipped += add.memories.skipped;
371 total.memories.conflicts += add.memories.conflicts;
372
373 total.checkpoints.created += add.checkpoints.created;
374 total.checkpoints.updated += add.checkpoints.updated;
375 total.checkpoints.skipped += add.checkpoints.skipped;
376 total.checkpoints.conflicts += add.checkpoints.conflicts;
377}
378
#[cfg(test)]
mod tests {
    use super::*;
    use crate::storage::sqlite::Session;
    use crate::sync::file::write_jsonl;
    use tempfile::TempDir;

    /// Builds an active session fixture with the given id and `updated_at`.
    fn make_session(id: &str, updated_at: i64) -> Session {
        Session {
            id: id.to_string(),
            name: "Test".to_string(),
            description: None,
            branch: None,
            channel: None,
            project_path: Some("/test".to_string()),
            status: "active".to_string(),
            ended_at: None,
            created_at: 1000,
            updated_at,
        }
    }

    /// Builds a `sess_1` session named "External" whose `updated_at` lies ten
    /// seconds in the future, so it is newer than anything created locally.
    fn external_session() -> Session {
        Session {
            name: "External".to_string(),
            ..make_session("sess_1", chrono::Utc::now().timestamp_millis() + 10000)
        }
    }

    /// Opens a fresh database file inside `dir`.
    fn open_storage(dir: &TempDir) -> SqliteStorage {
        SqliteStorage::open(&dir.path().join("test.db")).unwrap()
    }

    /// Writes `session` as the only record of `sessions.jsonl` inside `dir`
    /// and returns the path of that file.
    fn write_session_file(dir: &TempDir, session: &Session) -> std::path::PathBuf {
        let record = SyncRecord::Session(SessionRecord {
            data: session.clone(),
            content_hash: content_hash(session),
            exported_at: "2025-01-20T00:00:00Z".to_string(),
        });
        let jsonl_path = dir.path().join("sessions.jsonl");
        write_jsonl(&jsonl_path, &[record]).unwrap();
        jsonl_path
    }

    /// A session unknown to the local database is inserted and counted as created.
    #[test]
    fn test_import_new_session() {
        let temp_dir = TempDir::new().unwrap();
        let mut storage = open_storage(&temp_dir);
        let jsonl_path = write_session_file(&temp_dir, &make_session("sess_1", 1000));

        let stats = Importer::new(&mut storage, MergeStrategy::PreferNewer)
            .import(&jsonl_path)
            .unwrap();

        assert_eq!(stats.sessions.created, 1);
        assert_eq!(stats.sessions.updated, 0);

        let imported = storage.get_session("sess_1").unwrap();
        assert!(imported.is_some());
    }

    /// With `PreferNewer`, an incoming session with a later `updated_at`
    /// replaces the local one.
    #[test]
    fn test_import_prefer_newer() {
        let temp_dir = TempDir::new().unwrap();
        let mut storage = open_storage(&temp_dir);
        storage
            .create_session("sess_1", "Local", None, Some("/test"), None, "test")
            .unwrap();

        let jsonl_path = write_session_file(&temp_dir, &external_session());

        let stats = Importer::new(&mut storage, MergeStrategy::PreferNewer)
            .import(&jsonl_path)
            .unwrap();

        assert_eq!(stats.sessions.updated, 1);

        let imported = storage.get_session("sess_1").unwrap().unwrap();
        assert_eq!(imported.name, "External");
    }

    /// With `PreferLocal`, a differing incoming session is skipped even when
    /// it is newer.
    #[test]
    fn test_import_prefer_local() {
        let temp_dir = TempDir::new().unwrap();
        let mut storage = open_storage(&temp_dir);
        storage
            .create_session("sess_1", "Local", None, Some("/test"), None, "test")
            .unwrap();

        let jsonl_path = write_session_file(&temp_dir, &external_session());

        let stats = Importer::new(&mut storage, MergeStrategy::PreferLocal)
            .import(&jsonl_path)
            .unwrap();

        assert_eq!(stats.sessions.skipped, 1);

        let imported = storage.get_session("sess_1").unwrap().unwrap();
        assert_eq!(imported.name, "Local");
    }
}