1use std::path::Path;
8
9use std::fs::File;
10use std::io::{BufRead, BufReader};
11
12use crate::storage::sqlite::SqliteStorage;
13use crate::sync::file::read_jsonl;
14use crate::sync::hash::content_hash;
15use crate::sync::types::{
16 CheckpointRecord, ContextItemRecord, DeletionRecord, EntityStats, ImportStats, IssueRecord,
17 MemoryRecord, MergeStrategy, PlanRecord, SessionRecord, SyncError, SyncRecord, SyncResult,
18 TimeEntryRecord,
19};
20
/// Imports sync records from JSONL files into local SQLite storage,
/// resolving conflicts between local and external copies with a
/// configurable [`MergeStrategy`].
pub struct Importer<'a> {
    // Destination database; borrowed mutably for the importer's lifetime.
    storage: &'a mut SqliteStorage,
    // How to resolve a record that already exists locally with a
    // different content hash.
    strategy: MergeStrategy,
}
29
30impl<'a> Importer<'a> {
31 #[must_use]
33 pub fn new(storage: &'a mut SqliteStorage, strategy: MergeStrategy) -> Self {
34 Self { storage, strategy }
35 }
36
37 pub fn import(&mut self, path: &Path) -> SyncResult<ImportStats> {
46 let records = read_jsonl(path)?;
47 let mut stats = ImportStats::default();
48
49 for record in records {
50 match record {
51 SyncRecord::Session(rec) => self.import_session(rec, &mut stats.sessions)?,
52 SyncRecord::Issue(rec) => self.import_issue(rec, &mut stats.issues)?,
53 SyncRecord::ContextItem(rec) => {
54 self.import_context_item(rec, &mut stats.context_items)?;
55 }
56 SyncRecord::Memory(rec) => self.import_memory(rec, &mut stats.memories)?,
57 SyncRecord::Checkpoint(rec) => {
58 self.import_checkpoint(rec, &mut stats.checkpoints)?;
59 }
60 SyncRecord::Plan(rec) => {
61 self.import_plan(rec, &mut stats.plans)?;
62 }
63 SyncRecord::TimeEntry(rec) => {
64 self.import_time_entry(rec, &mut stats.time_entries)?;
65 }
66 }
67 }
68
69 Ok(stats)
70 }
71
72 pub fn import_all(&mut self, dir: &Path) -> SyncResult<ImportStats> {
82 let mut total_stats = ImportStats::default();
83
84 let files = [
86 ("sessions.jsonl", "sessions"),
87 ("issues.jsonl", "issues"),
88 ("context_items.jsonl", "context_items"),
89 ("memories.jsonl", "memories"),
90 ("checkpoints.jsonl", "checkpoints"),
91 ("plans.jsonl", "plans"),
92 ("time_entries.jsonl", "time_entries"),
93 ];
94
95 for (filename, _entity) in files {
96 let path = dir.join(filename);
97 if path.exists() {
98 let stats = self.import(&path)?;
99 merge_stats(&mut total_stats, &stats);
100 }
101 }
102
103 let deletions_path = dir.join("deletions.jsonl");
105 if deletions_path.exists() {
106 self.import_deletions(&deletions_path)?;
107 }
108
109 Ok(total_stats)
110 }
111
112 pub fn import_deletions(&mut self, path: &Path) -> SyncResult<usize> {
121 let file = File::open(path)?;
122 let reader = BufReader::new(file);
123 let mut deleted_count = 0;
124
125 for (line_num, line) in reader.lines().enumerate() {
126 let line = line?;
127 if line.trim().is_empty() {
128 continue;
129 }
130
131 let deletion: DeletionRecord = serde_json::from_str(&line).map_err(|e| {
132 SyncError::InvalidRecord {
133 line: line_num + 1,
134 message: e.to_string(),
135 }
136 })?;
137
138 let entity_type = deletion.entity_type.to_string();
140 let was_deleted = self
141 .storage
142 .apply_deletion(&entity_type, &deletion.entity_id)
143 .map_err(|e| SyncError::Database(e.to_string()))?;
144
145 if was_deleted {
146 deleted_count += 1;
147 }
148 }
149
150 Ok(deleted_count)
151 }
152
153 fn import_session(&mut self, rec: SessionRecord, stats: &mut EntityStats) -> SyncResult<()> {
155 let existing = self
156 .storage
157 .get_session(&rec.data.id)
158 .map_err(|e| SyncError::Database(e.to_string()))?;
159
160 match existing {
161 Some(local) => {
162 let local_hash = content_hash(&local);
164 if local_hash == rec.content_hash {
165 stats.skipped += 1;
167 return Ok(());
168 }
169
170 match self.strategy {
172 MergeStrategy::PreferNewer => {
173 if rec.data.updated_at > local.updated_at {
174 self.storage
175 .upsert_session(&rec.data)
176 .map_err(|e| SyncError::Database(e.to_string()))?;
177 stats.updated += 1;
178 } else {
179 stats.skipped += 1;
180 }
181 }
182 MergeStrategy::PreferLocal => {
183 stats.skipped += 1;
184 }
185 MergeStrategy::PreferExternal => {
186 self.storage
187 .upsert_session(&rec.data)
188 .map_err(|e| SyncError::Database(e.to_string()))?;
189 stats.updated += 1;
190 }
191 }
192 }
193 None => {
194 self.storage
196 .upsert_session(&rec.data)
197 .map_err(|e| SyncError::Database(e.to_string()))?;
198 stats.created += 1;
199 }
200 }
201
202 Ok(())
203 }
204
205 fn import_issue(&mut self, rec: IssueRecord, stats: &mut EntityStats) -> SyncResult<()> {
207 let existing = self
208 .storage
209 .get_issue(&rec.data.id, None)
210 .map_err(|e| SyncError::Database(e.to_string()))?;
211
212 match existing {
213 Some(local) => {
214 let local_hash = content_hash(&local);
215 if local_hash == rec.content_hash {
216 stats.skipped += 1;
217 return Ok(());
218 }
219
220 match self.strategy {
221 MergeStrategy::PreferNewer => {
222 if rec.data.updated_at > local.updated_at {
223 self.storage
224 .upsert_issue(&rec.data)
225 .map_err(|e| SyncError::Database(e.to_string()))?;
226 stats.updated += 1;
227 } else {
228 stats.skipped += 1;
229 }
230 }
231 MergeStrategy::PreferLocal => {
232 stats.skipped += 1;
233 }
234 MergeStrategy::PreferExternal => {
235 self.storage
236 .upsert_issue(&rec.data)
237 .map_err(|e| SyncError::Database(e.to_string()))?;
238 stats.updated += 1;
239 }
240 }
241 }
242 None => {
243 self.storage
244 .upsert_issue(&rec.data)
245 .map_err(|e| SyncError::Database(e.to_string()))?;
246 stats.created += 1;
247 }
248 }
249
250 Ok(())
251 }
252
253 fn import_context_item(
255 &mut self,
256 rec: ContextItemRecord,
257 stats: &mut EntityStats,
258 ) -> SyncResult<()> {
259 let existing = self
260 .storage
261 .get_context_item(&rec.data.id)
262 .map_err(|e| SyncError::Database(e.to_string()))?;
263
264 match existing {
265 Some(local) => {
266 let local_hash = content_hash(&local);
267 if local_hash == rec.content_hash {
268 stats.skipped += 1;
269 return Ok(());
270 }
271
272 match self.strategy {
273 MergeStrategy::PreferNewer => {
274 if rec.data.updated_at > local.updated_at {
275 self.storage
276 .upsert_context_item(&rec.data)
277 .map_err(|e| SyncError::Database(e.to_string()))?;
278 stats.updated += 1;
279 } else {
280 stats.skipped += 1;
281 }
282 }
283 MergeStrategy::PreferLocal => {
284 stats.skipped += 1;
285 }
286 MergeStrategy::PreferExternal => {
287 self.storage
288 .upsert_context_item(&rec.data)
289 .map_err(|e| SyncError::Database(e.to_string()))?;
290 stats.updated += 1;
291 }
292 }
293 }
294 None => {
295 self.storage
296 .upsert_context_item(&rec.data)
297 .map_err(|e| SyncError::Database(e.to_string()))?;
298 stats.created += 1;
299 }
300 }
301
302 Ok(())
303 }
304
    /// Imports a memory record by unconditionally upserting it.
    ///
    /// NOTE(review): unlike the other entity handlers, this neither
    /// checks `rec.content_hash` nor consults `self.strategy`, and it
    /// always counts the record as `created` even when the upsert
    /// overwrote an existing row — presumably intentional
    /// (memories as last-write-wins), but worth confirming; there is no
    /// `get_memory` accessor visible here to do a proper merge.
    fn import_memory(&mut self, rec: MemoryRecord, stats: &mut EntityStats) -> SyncResult<()> {
        self.storage
            .upsert_memory(&rec.data)
            .map_err(|e| SyncError::Database(e.to_string()))?;
        stats.created += 1;
        Ok(())
    }
315
316 fn import_checkpoint(
318 &mut self,
319 rec: CheckpointRecord,
320 stats: &mut EntityStats,
321 ) -> SyncResult<()> {
322 let existing = self
323 .storage
324 .get_checkpoint(&rec.data.id)
325 .map_err(|e| SyncError::Database(e.to_string()))?;
326
327 match existing {
328 Some(local) => {
329 let local_hash = content_hash(&local);
330 if local_hash == rec.content_hash {
331 stats.skipped += 1;
332 return Ok(());
333 }
334
335 match self.strategy {
337 MergeStrategy::PreferNewer | MergeStrategy::PreferExternal => {
338 self.storage
339 .upsert_checkpoint(&rec.data)
340 .map_err(|e| SyncError::Database(e.to_string()))?;
341 stats.updated += 1;
342 }
343 MergeStrategy::PreferLocal => {
344 stats.skipped += 1;
345 }
346 }
347 }
348 None => {
349 self.storage
350 .upsert_checkpoint(&rec.data)
351 .map_err(|e| SyncError::Database(e.to_string()))?;
352 stats.created += 1;
353 }
354 }
355
356 Ok(())
357 }
358
359 fn import_plan(&mut self, rec: PlanRecord, stats: &mut EntityStats) -> SyncResult<()> {
361 let existing = self
362 .storage
363 .get_plan(&rec.data.id)
364 .map_err(|e| SyncError::Database(e.to_string()))?;
365
366 match existing {
367 Some(local) => {
368 let local_hash = content_hash(&local);
369 if local_hash == rec.content_hash {
370 stats.skipped += 1;
371 return Ok(());
372 }
373
374 match self.strategy {
375 MergeStrategy::PreferNewer => {
376 if rec.data.updated_at > local.updated_at {
377 self.storage
378 .upsert_plan(&rec.data)
379 .map_err(|e| SyncError::Database(e.to_string()))?;
380 stats.updated += 1;
381 } else {
382 stats.skipped += 1;
383 }
384 }
385 MergeStrategy::PreferLocal => {
386 stats.skipped += 1;
387 }
388 MergeStrategy::PreferExternal => {
389 self.storage
390 .upsert_plan(&rec.data)
391 .map_err(|e| SyncError::Database(e.to_string()))?;
392 stats.updated += 1;
393 }
394 }
395 }
396 None => {
397 self.storage
398 .upsert_plan(&rec.data)
399 .map_err(|e| SyncError::Database(e.to_string()))?;
400 stats.created += 1;
401 }
402 }
403
404 Ok(())
405 }
406
407 fn import_time_entry(
408 &mut self,
409 rec: TimeEntryRecord,
410 stats: &mut EntityStats,
411 ) -> SyncResult<()> {
412 let existing = self
413 .storage
414 .get_time_entry(&rec.data.id, None)
415 .map_err(|e| SyncError::Database(e.to_string()))?;
416
417 match existing {
418 Some(local) => {
419 let local_hash = content_hash(&local);
420 if local_hash == rec.content_hash {
421 stats.skipped += 1;
422 return Ok(());
423 }
424
425 match self.strategy {
426 MergeStrategy::PreferNewer => {
427 if rec.data.updated_at > local.updated_at {
428 self.storage
429 .upsert_time_entry(&rec.data)
430 .map_err(|e| SyncError::Database(e.to_string()))?;
431 stats.updated += 1;
432 } else {
433 stats.skipped += 1;
434 }
435 }
436 MergeStrategy::PreferLocal => {
437 stats.skipped += 1;
438 }
439 MergeStrategy::PreferExternal => {
440 self.storage
441 .upsert_time_entry(&rec.data)
442 .map_err(|e| SyncError::Database(e.to_string()))?;
443 stats.updated += 1;
444 }
445 }
446 }
447 None => {
448 self.storage
449 .upsert_time_entry(&rec.data)
450 .map_err(|e| SyncError::Database(e.to_string()))?;
451 stats.created += 1;
452 }
453 }
454
455 Ok(())
456 }
457}
458
459fn merge_stats(total: &mut ImportStats, add: &ImportStats) {
461 total.sessions.created += add.sessions.created;
462 total.sessions.updated += add.sessions.updated;
463 total.sessions.skipped += add.sessions.skipped;
464 total.sessions.conflicts += add.sessions.conflicts;
465
466 total.issues.created += add.issues.created;
467 total.issues.updated += add.issues.updated;
468 total.issues.skipped += add.issues.skipped;
469 total.issues.conflicts += add.issues.conflicts;
470
471 total.context_items.created += add.context_items.created;
472 total.context_items.updated += add.context_items.updated;
473 total.context_items.skipped += add.context_items.skipped;
474 total.context_items.conflicts += add.context_items.conflicts;
475
476 total.memories.created += add.memories.created;
477 total.memories.updated += add.memories.updated;
478 total.memories.skipped += add.memories.skipped;
479 total.memories.conflicts += add.memories.conflicts;
480
481 total.checkpoints.created += add.checkpoints.created;
482 total.checkpoints.updated += add.checkpoints.updated;
483 total.checkpoints.skipped += add.checkpoints.skipped;
484 total.checkpoints.conflicts += add.checkpoints.conflicts;
485
486 total.plans.created += add.plans.created;
487 total.plans.updated += add.plans.updated;
488 total.plans.skipped += add.plans.skipped;
489 total.plans.conflicts += add.plans.conflicts;
490
491 total.time_entries.created += add.time_entries.created;
492 total.time_entries.updated += add.time_entries.updated;
493 total.time_entries.skipped += add.time_entries.skipped;
494 total.time_entries.conflicts += add.time_entries.conflicts;
495}
496
#[cfg(test)]
mod tests {
    use super::*;
    use crate::storage::sqlite::Session;
    use crate::sync::file::write_jsonl;
    use tempfile::TempDir;

    // The original built the same Session literal three times (two of
    // them inline, bypassing the existing helper); fixture construction
    // is now shared.

    /// Builds a session fixture named "Test"; see [`make_named_session`].
    fn make_session(id: &str, updated_at: i64) -> Session {
        make_named_session(id, "Test", updated_at)
    }

    /// Builds a minimal active session with fixed defaults for every
    /// field the importer does not inspect.
    fn make_named_session(id: &str, name: &str, updated_at: i64) -> Session {
        Session {
            id: id.to_string(),
            name: name.to_string(),
            description: None,
            branch: None,
            channel: None,
            project_path: Some("/test".to_string()),
            status: "active".to_string(),
            ended_at: None,
            created_at: 1000,
            updated_at,
        }
    }

    /// Wraps a session in a sync record whose content hash matches it.
    fn session_record(session: &Session) -> SyncRecord {
        SyncRecord::Session(SessionRecord {
            data: session.clone(),
            content_hash: content_hash(session),
            exported_at: "2025-01-20T00:00:00Z".to_string(),
        })
    }

    /// A timestamp safely in the future, so the external record always
    /// wins a PreferNewer comparison against a freshly created row.
    fn future_millis() -> i64 {
        chrono::Utc::now().timestamp_millis() + 10000
    }

    #[test]
    fn test_import_new_session() {
        let temp_dir = TempDir::new().unwrap();
        let db_path = temp_dir.path().join("test.db");
        let mut storage = SqliteStorage::open(&db_path).unwrap();

        let session = make_session("sess_1", 1000);
        let jsonl_path = temp_dir.path().join("sessions.jsonl");
        write_jsonl(&jsonl_path, &[session_record(&session)]).unwrap();

        let mut importer = Importer::new(&mut storage, MergeStrategy::PreferNewer);
        let stats = importer.import(&jsonl_path).unwrap();

        // A record with no local counterpart is created, not updated.
        assert_eq!(stats.sessions.created, 1);
        assert_eq!(stats.sessions.updated, 0);

        let imported = storage.get_session("sess_1").unwrap();
        assert!(imported.is_some());
    }

    #[test]
    fn test_import_prefer_newer() {
        let temp_dir = TempDir::new().unwrap();
        let db_path = temp_dir.path().join("test.db");
        let mut storage = SqliteStorage::open(&db_path).unwrap();

        storage
            .create_session("sess_1", "Local", None, Some("/test"), None, "test")
            .unwrap();

        let newer_session = make_named_session("sess_1", "External", future_millis());
        let jsonl_path = temp_dir.path().join("sessions.jsonl");
        write_jsonl(&jsonl_path, &[session_record(&newer_session)]).unwrap();

        let mut importer = Importer::new(&mut storage, MergeStrategy::PreferNewer);
        let stats = importer.import(&jsonl_path).unwrap();

        // The external copy has a later updated_at, so it replaces the
        // local row under PreferNewer.
        assert_eq!(stats.sessions.updated, 1);

        let imported = storage.get_session("sess_1").unwrap().unwrap();
        assert_eq!(imported.name, "External");
    }

    #[test]
    fn test_import_prefer_local() {
        let temp_dir = TempDir::new().unwrap();
        let db_path = temp_dir.path().join("test.db");
        let mut storage = SqliteStorage::open(&db_path).unwrap();

        storage
            .create_session("sess_1", "Local", None, Some("/test"), None, "test")
            .unwrap();

        let external_session = make_named_session("sess_1", "External", future_millis());
        let jsonl_path = temp_dir.path().join("sessions.jsonl");
        write_jsonl(&jsonl_path, &[session_record(&external_session)]).unwrap();

        let mut importer = Importer::new(&mut storage, MergeStrategy::PreferLocal);
        let stats = importer.import(&jsonl_path).unwrap();

        // PreferLocal keeps the local row even though the external copy
        // is newer and has a different hash.
        assert_eq!(stats.sessions.skipped, 1);

        let imported = storage.get_session("sess_1").unwrap().unwrap();
        assert_eq!(imported.name, "Local");
    }
}