// sc/sync/import.rs

//! JSONL import functionality.
//!
//! This module handles importing records from JSONL files with merge support.
//! It uses content hashing and timestamps to resolve conflicts between
//! local and external records.

use std::path::Path;

use std::fs::File;
use std::io::{BufRead, BufReader};

use crate::storage::sqlite::SqliteStorage;
use crate::sync::file::read_jsonl;
use crate::sync::hash::content_hash;
use crate::sync::types::{
    CheckpointRecord, ContextItemRecord, DeletionRecord, EntityStats, ImportStats, IssueRecord,
    MemoryRecord, MergeStrategy, PlanRecord, SessionRecord, SyncError, SyncRecord, SyncResult,
    TimeEntryRecord,
};
21/// Importer for JSONL sync files.
22///
23/// The importer reads records from JSONL files and merges them into the
24/// local database using the specified merge strategy.
25pub struct Importer<'a> {
26    storage: &'a mut SqliteStorage,
27    strategy: MergeStrategy,
28}
29
impl<'a> Importer<'a> {
    /// Create a new importer that merges into `storage` using `strategy`.
    #[must_use]
    pub fn new(storage: &'a mut SqliteStorage, strategy: MergeStrategy) -> Self {
        Self { strategy, storage }
    }

37    /// Import records from a JSONL file.
38    ///
39    /// Each line in the file is parsed and merged into the local database.
40    /// The merge strategy determines how conflicts are resolved.
41    ///
42    /// # Errors
43    ///
44    /// Returns an error if the file cannot be read or records are invalid.
45    pub fn import(&mut self, path: &Path) -> SyncResult<ImportStats> {
46        let records = read_jsonl(path)?;
47        let mut stats = ImportStats::default();
48
49        for record in records {
50            match record {
51                SyncRecord::Session(rec) => self.import_session(rec, &mut stats.sessions)?,
52                SyncRecord::Issue(rec) => self.import_issue(rec, &mut stats.issues)?,
53                SyncRecord::ContextItem(rec) => {
54                    self.import_context_item(rec, &mut stats.context_items)?;
55                }
56                SyncRecord::Memory(rec) => self.import_memory(rec, &mut stats.memories)?,
57                SyncRecord::Checkpoint(rec) => {
58                    self.import_checkpoint(rec, &mut stats.checkpoints)?;
59                }
60                SyncRecord::Plan(rec) => {
61                    self.import_plan(rec, &mut stats.plans)?;
62                }
63                SyncRecord::TimeEntry(rec) => {
64                    self.import_time_entry(rec, &mut stats.time_entries)?;
65                }
66            }
67        }
68
69        Ok(stats)
70    }
71
72    /// Import all JSONL files from a directory.
73    ///
74    /// Imports files in order: sessions, issues, context_items, memories, checkpoints.
75    /// Then applies deletions last (to handle records that were created then deleted).
76    /// Files that don't exist are skipped.
77    ///
78    /// # Errors
79    ///
80    /// Returns an error if any file cannot be read.
81    pub fn import_all(&mut self, dir: &Path) -> SyncResult<ImportStats> {
82        let mut total_stats = ImportStats::default();
83
84        // Import data records in dependency order
85        let files = [
86            ("sessions.jsonl", "sessions"),
87            ("issues.jsonl", "issues"),
88            ("context_items.jsonl", "context_items"),
89            ("memories.jsonl", "memories"),
90            ("checkpoints.jsonl", "checkpoints"),
91            ("plans.jsonl", "plans"),
92            ("time_entries.jsonl", "time_entries"),
93        ];
94
95        for (filename, _entity) in files {
96            let path = dir.join(filename);
97            if path.exists() {
98                let stats = self.import(&path)?;
99                merge_stats(&mut total_stats, &stats);
100            }
101        }
102
103        // Apply deletions last (after importing any records that might be deleted)
104        let deletions_path = dir.join("deletions.jsonl");
105        if deletions_path.exists() {
106            self.import_deletions(&deletions_path)?;
107        }
108
109        Ok(total_stats)
110    }
111
112    /// Import deletions from a JSONL file.
113    ///
114    /// Deletions are applied to the local database by removing the specified entities.
115    /// This ensures that records deleted on one machine are deleted on all machines.
116    ///
117    /// # Errors
118    ///
119    /// Returns an error if the file cannot be read or deletions cannot be applied.
120    pub fn import_deletions(&mut self, path: &Path) -> SyncResult<usize> {
121        let file = File::open(path)?;
122        let reader = BufReader::new(file);
123        let mut deleted_count = 0;
124
125        for (line_num, line) in reader.lines().enumerate() {
126            let line = line?;
127            if line.trim().is_empty() {
128                continue;
129            }
130
131            let deletion: DeletionRecord = serde_json::from_str(&line).map_err(|e| {
132                SyncError::InvalidRecord {
133                    line: line_num + 1,
134                    message: e.to_string(),
135                }
136            })?;
137
138            // Apply the deletion
139            let entity_type = deletion.entity_type.to_string();
140            let was_deleted = self
141                .storage
142                .apply_deletion(&entity_type, &deletion.entity_id)
143                .map_err(|e| SyncError::Database(e.to_string()))?;
144
145            if was_deleted {
146                deleted_count += 1;
147            }
148        }
149
150        Ok(deleted_count)
151    }
152
153    /// Import a session record with merge.
154    fn import_session(&mut self, rec: SessionRecord, stats: &mut EntityStats) -> SyncResult<()> {
155        let existing = self
156            .storage
157            .get_session(&rec.data.id)
158            .map_err(|e| SyncError::Database(e.to_string()))?;
159
160        match existing {
161            Some(local) => {
162                // Compare content hashes
163                let local_hash = content_hash(&local);
164                if local_hash == rec.content_hash {
165                    // No change
166                    stats.skipped += 1;
167                    return Ok(());
168                }
169
170                // Apply merge strategy
171                match self.strategy {
172                    MergeStrategy::PreferNewer => {
173                        if rec.data.updated_at > local.updated_at {
174                            self.storage
175                                .upsert_session(&rec.data)
176                                .map_err(|e| SyncError::Database(e.to_string()))?;
177                            stats.updated += 1;
178                        } else {
179                            stats.skipped += 1;
180                        }
181                    }
182                    MergeStrategy::PreferLocal => {
183                        stats.skipped += 1;
184                    }
185                    MergeStrategy::PreferExternal => {
186                        self.storage
187                            .upsert_session(&rec.data)
188                            .map_err(|e| SyncError::Database(e.to_string()))?;
189                        stats.updated += 1;
190                    }
191                }
192            }
193            None => {
194                // New record
195                self.storage
196                    .upsert_session(&rec.data)
197                    .map_err(|e| SyncError::Database(e.to_string()))?;
198                stats.created += 1;
199            }
200        }
201
202        Ok(())
203    }
204
205    /// Import an issue record with merge.
206    fn import_issue(&mut self, rec: IssueRecord, stats: &mut EntityStats) -> SyncResult<()> {
207        let existing = self
208            .storage
209            .get_issue(&rec.data.id, None)
210            .map_err(|e| SyncError::Database(e.to_string()))?;
211
212        match existing {
213            Some(local) => {
214                let local_hash = content_hash(&local);
215                if local_hash == rec.content_hash {
216                    stats.skipped += 1;
217                    return Ok(());
218                }
219
220                match self.strategy {
221                    MergeStrategy::PreferNewer => {
222                        if rec.data.updated_at > local.updated_at {
223                            self.storage
224                                .upsert_issue(&rec.data)
225                                .map_err(|e| SyncError::Database(e.to_string()))?;
226                            stats.updated += 1;
227                        } else {
228                            stats.skipped += 1;
229                        }
230                    }
231                    MergeStrategy::PreferLocal => {
232                        stats.skipped += 1;
233                    }
234                    MergeStrategy::PreferExternal => {
235                        self.storage
236                            .upsert_issue(&rec.data)
237                            .map_err(|e| SyncError::Database(e.to_string()))?;
238                        stats.updated += 1;
239                    }
240                }
241            }
242            None => {
243                self.storage
244                    .upsert_issue(&rec.data)
245                    .map_err(|e| SyncError::Database(e.to_string()))?;
246                stats.created += 1;
247            }
248        }
249
250        Ok(())
251    }
252
253    /// Import a context item record with merge.
254    fn import_context_item(
255        &mut self,
256        rec: ContextItemRecord,
257        stats: &mut EntityStats,
258    ) -> SyncResult<()> {
259        let existing = self
260            .storage
261            .get_context_item(&rec.data.id)
262            .map_err(|e| SyncError::Database(e.to_string()))?;
263
264        match existing {
265            Some(local) => {
266                let local_hash = content_hash(&local);
267                if local_hash == rec.content_hash {
268                    stats.skipped += 1;
269                    return Ok(());
270                }
271
272                match self.strategy {
273                    MergeStrategy::PreferNewer => {
274                        if rec.data.updated_at > local.updated_at {
275                            self.storage
276                                .upsert_context_item(&rec.data)
277                                .map_err(|e| SyncError::Database(e.to_string()))?;
278                            stats.updated += 1;
279                        } else {
280                            stats.skipped += 1;
281                        }
282                    }
283                    MergeStrategy::PreferLocal => {
284                        stats.skipped += 1;
285                    }
286                    MergeStrategy::PreferExternal => {
287                        self.storage
288                            .upsert_context_item(&rec.data)
289                            .map_err(|e| SyncError::Database(e.to_string()))?;
290                        stats.updated += 1;
291                    }
292                }
293            }
294            None => {
295                self.storage
296                    .upsert_context_item(&rec.data)
297                    .map_err(|e| SyncError::Database(e.to_string()))?;
298                stats.created += 1;
299            }
300        }
301
302        Ok(())
303    }
304
305    /// Import a memory record with merge.
306    fn import_memory(&mut self, rec: MemoryRecord, stats: &mut EntityStats) -> SyncResult<()> {
307        // Memory items don't have a get_by_id, so we always upsert
308        // The ON CONFLICT handles deduplication by (project_path, key)
309        self.storage
310            .upsert_memory(&rec.data)
311            .map_err(|e| SyncError::Database(e.to_string()))?;
312        stats.created += 1;
313        Ok(())
314    }
315
316    /// Import a checkpoint record with merge.
317    fn import_checkpoint(
318        &mut self,
319        rec: CheckpointRecord,
320        stats: &mut EntityStats,
321    ) -> SyncResult<()> {
322        let existing = self
323            .storage
324            .get_checkpoint(&rec.data.id)
325            .map_err(|e| SyncError::Database(e.to_string()))?;
326
327        match existing {
328            Some(local) => {
329                let local_hash = content_hash(&local);
330                if local_hash == rec.content_hash {
331                    stats.skipped += 1;
332                    return Ok(());
333                }
334
335                // Checkpoints are immutable in nature, but we allow updates
336                match self.strategy {
337                    MergeStrategy::PreferNewer | MergeStrategy::PreferExternal => {
338                        self.storage
339                            .upsert_checkpoint(&rec.data)
340                            .map_err(|e| SyncError::Database(e.to_string()))?;
341                        stats.updated += 1;
342                    }
343                    MergeStrategy::PreferLocal => {
344                        stats.skipped += 1;
345                    }
346                }
347            }
348            None => {
349                self.storage
350                    .upsert_checkpoint(&rec.data)
351                    .map_err(|e| SyncError::Database(e.to_string()))?;
352                stats.created += 1;
353            }
354        }
355
356        Ok(())
357    }
358
359    /// Import a plan record with merge.
360    fn import_plan(&mut self, rec: PlanRecord, stats: &mut EntityStats) -> SyncResult<()> {
361        let existing = self
362            .storage
363            .get_plan(&rec.data.id)
364            .map_err(|e| SyncError::Database(e.to_string()))?;
365
366        match existing {
367            Some(local) => {
368                let local_hash = content_hash(&local);
369                if local_hash == rec.content_hash {
370                    stats.skipped += 1;
371                    return Ok(());
372                }
373
374                match self.strategy {
375                    MergeStrategy::PreferNewer => {
376                        if rec.data.updated_at > local.updated_at {
377                            self.storage
378                                .upsert_plan(&rec.data)
379                                .map_err(|e| SyncError::Database(e.to_string()))?;
380                            stats.updated += 1;
381                        } else {
382                            stats.skipped += 1;
383                        }
384                    }
385                    MergeStrategy::PreferLocal => {
386                        stats.skipped += 1;
387                    }
388                    MergeStrategy::PreferExternal => {
389                        self.storage
390                            .upsert_plan(&rec.data)
391                            .map_err(|e| SyncError::Database(e.to_string()))?;
392                        stats.updated += 1;
393                    }
394                }
395            }
396            None => {
397                self.storage
398                    .upsert_plan(&rec.data)
399                    .map_err(|e| SyncError::Database(e.to_string()))?;
400                stats.created += 1;
401            }
402        }
403
404        Ok(())
405    }
406
407    fn import_time_entry(
408        &mut self,
409        rec: TimeEntryRecord,
410        stats: &mut EntityStats,
411    ) -> SyncResult<()> {
412        let existing = self
413            .storage
414            .get_time_entry(&rec.data.id, None)
415            .map_err(|e| SyncError::Database(e.to_string()))?;
416
417        match existing {
418            Some(local) => {
419                let local_hash = content_hash(&local);
420                if local_hash == rec.content_hash {
421                    stats.skipped += 1;
422                    return Ok(());
423                }
424
425                match self.strategy {
426                    MergeStrategy::PreferNewer => {
427                        if rec.data.updated_at > local.updated_at {
428                            self.storage
429                                .upsert_time_entry(&rec.data)
430                                .map_err(|e| SyncError::Database(e.to_string()))?;
431                            stats.updated += 1;
432                        } else {
433                            stats.skipped += 1;
434                        }
435                    }
436                    MergeStrategy::PreferLocal => {
437                        stats.skipped += 1;
438                    }
439                    MergeStrategy::PreferExternal => {
440                        self.storage
441                            .upsert_time_entry(&rec.data)
442                            .map_err(|e| SyncError::Database(e.to_string()))?;
443                        stats.updated += 1;
444                    }
445                }
446            }
447            None => {
448                self.storage
449                    .upsert_time_entry(&rec.data)
450                    .map_err(|e| SyncError::Database(e.to_string()))?;
451                stats.created += 1;
452            }
453        }
454
455        Ok(())
456    }
457}
458
459/// Merge import stats from one operation into accumulated stats.
460fn merge_stats(total: &mut ImportStats, add: &ImportStats) {
461    total.sessions.created += add.sessions.created;
462    total.sessions.updated += add.sessions.updated;
463    total.sessions.skipped += add.sessions.skipped;
464    total.sessions.conflicts += add.sessions.conflicts;
465
466    total.issues.created += add.issues.created;
467    total.issues.updated += add.issues.updated;
468    total.issues.skipped += add.issues.skipped;
469    total.issues.conflicts += add.issues.conflicts;
470
471    total.context_items.created += add.context_items.created;
472    total.context_items.updated += add.context_items.updated;
473    total.context_items.skipped += add.context_items.skipped;
474    total.context_items.conflicts += add.context_items.conflicts;
475
476    total.memories.created += add.memories.created;
477    total.memories.updated += add.memories.updated;
478    total.memories.skipped += add.memories.skipped;
479    total.memories.conflicts += add.memories.conflicts;
480
481    total.checkpoints.created += add.checkpoints.created;
482    total.checkpoints.updated += add.checkpoints.updated;
483    total.checkpoints.skipped += add.checkpoints.skipped;
484    total.checkpoints.conflicts += add.checkpoints.conflicts;
485
486    total.plans.created += add.plans.created;
487    total.plans.updated += add.plans.updated;
488    total.plans.skipped += add.plans.skipped;
489    total.plans.conflicts += add.plans.conflicts;
490
491    total.time_entries.created += add.time_entries.created;
492    total.time_entries.updated += add.time_entries.updated;
493    total.time_entries.skipped += add.time_entries.skipped;
494    total.time_entries.conflicts += add.time_entries.conflicts;
495}
496
#[cfg(test)]
mod tests {
    use super::*;
    use crate::storage::sqlite::Session;
    use crate::sync::file::write_jsonl;
    use tempfile::TempDir;

    /// Build a minimal active session fixture with the given id and timestamp.
    fn make_session(id: &str, updated_at: i64) -> Session {
        Session {
            id: id.to_string(),
            name: "Test".to_string(),
            description: None,
            branch: None,
            channel: None,
            project_path: Some("/test".to_string()),
            status: "active".to_string(),
            ended_at: None,
            created_at: 1000,
            updated_at,
        }
    }

    /// Wrap a session in an export record with a matching content hash.
    fn session_record(session: &Session) -> SyncRecord {
        SyncRecord::Session(SessionRecord {
            data: session.clone(),
            content_hash: content_hash(session),
            exported_at: "2025-01-20T00:00:00Z".to_string(),
        })
    }

    #[test]
    fn test_import_new_session() {
        let temp_dir = TempDir::new().unwrap();
        let db_path = temp_dir.path().join("test.db");
        let mut storage = SqliteStorage::open(&db_path).unwrap();

        // Write a JSONL file containing one session record.
        let session = make_session("sess_1", 1000);
        let jsonl_path = temp_dir.path().join("sessions.jsonl");
        write_jsonl(&jsonl_path, &[session_record(&session)]).unwrap();

        // Import into an empty database: the record must be created.
        let mut importer = Importer::new(&mut storage, MergeStrategy::PreferNewer);
        let stats = importer.import(&jsonl_path).unwrap();

        assert_eq!(stats.sessions.created, 1);
        assert_eq!(stats.sessions.updated, 0);

        // Verify the session now exists locally.
        assert!(storage.get_session("sess_1").unwrap().is_some());
    }

    #[test]
    fn test_import_prefer_newer() {
        let temp_dir = TempDir::new().unwrap();
        let db_path = temp_dir.path().join("test.db");
        let mut storage = SqliteStorage::open(&db_path).unwrap();

        // Local session created now (older than the external copy below).
        storage
            .create_session("sess_1", "Local", None, Some("/test"), None, "test")
            .unwrap();

        // External copy with a future timestamp so it always wins PreferNewer.
        let newer_session = Session {
            name: "External".to_string(),
            updated_at: chrono::Utc::now().timestamp_millis() + 10000,
            ..make_session("sess_1", 0)
        };
        let jsonl_path = temp_dir.path().join("sessions.jsonl");
        write_jsonl(&jsonl_path, &[session_record(&newer_session)]).unwrap();

        let mut importer = Importer::new(&mut storage, MergeStrategy::PreferNewer);
        let stats = importer.import(&jsonl_path).unwrap();

        assert_eq!(stats.sessions.updated, 1);

        // The external name should have replaced the local one.
        let imported = storage.get_session("sess_1").unwrap().unwrap();
        assert_eq!(imported.name, "External");
    }

    #[test]
    fn test_import_prefer_local() {
        let temp_dir = TempDir::new().unwrap();
        let db_path = temp_dir.path().join("test.db");
        let mut storage = SqliteStorage::open(&db_path).unwrap();

        // Local session that must survive the import untouched.
        storage
            .create_session("sess_1", "Local", None, Some("/test"), None, "test")
            .unwrap();

        // Conflicting external copy, even with a newer timestamp.
        let external_session = Session {
            name: "External".to_string(),
            updated_at: chrono::Utc::now().timestamp_millis() + 10000,
            ..make_session("sess_1", 0)
        };
        let jsonl_path = temp_dir.path().join("sessions.jsonl");
        write_jsonl(&jsonl_path, &[session_record(&external_session)]).unwrap();

        // PreferLocal keeps the local record regardless of timestamps.
        let mut importer = Importer::new(&mut storage, MergeStrategy::PreferLocal);
        let stats = importer.import(&jsonl_path).unwrap();

        assert_eq!(stats.sessions.skipped, 1);

        // Verify the name was NOT updated.
        let imported = storage.get_session("sess_1").unwrap().unwrap();
        assert_eq!(imported.name, "Local");
    }
}
632}