// sc/sync/import.rs
//! JSONL import functionality.
//!
//! This module handles importing records from JSONL files with merge support.
//! It uses content hashing and timestamps to resolve conflicts between
//! local and external records.

use std::fs::File;
use std::io::{BufRead, BufReader};
use std::path::Path;

use crate::storage::sqlite::SqliteStorage;
use crate::sync::file::read_jsonl;
use crate::sync::hash::content_hash;
use crate::sync::types::{
    CheckpointRecord, ContextItemRecord, DeletionRecord, EntityStats, ImportStats, IssueRecord,
    MemoryRecord, MergeStrategy, PlanRecord, SessionRecord, SyncError, SyncRecord, SyncResult,
};

/// Importer for JSONL sync files.
///
/// The importer reads records from JSONL files and merges them into the
/// local database using the specified merge strategy.
pub struct Importer<'a> {
    /// Local database that external records are merged into.
    storage: &'a mut SqliteStorage,
    /// Policy for resolving conflicts when a record exists both locally
    /// and in the imported file.
    strategy: MergeStrategy,
}

29impl<'a> Importer<'a> {
30    /// Create a new importer with the specified merge strategy.
31    #[must_use]
32    pub fn new(storage: &'a mut SqliteStorage, strategy: MergeStrategy) -> Self {
33        Self { storage, strategy }
34    }
35
36    /// Import records from a JSONL file.
37    ///
38    /// Each line in the file is parsed and merged into the local database.
39    /// The merge strategy determines how conflicts are resolved.
40    ///
41    /// # Errors
42    ///
43    /// Returns an error if the file cannot be read or records are invalid.
44    pub fn import(&mut self, path: &Path) -> SyncResult<ImportStats> {
45        let records = read_jsonl(path)?;
46        let mut stats = ImportStats::default();
47
48        for record in records {
49            match record {
50                SyncRecord::Session(rec) => self.import_session(rec, &mut stats.sessions)?,
51                SyncRecord::Issue(rec) => self.import_issue(rec, &mut stats.issues)?,
52                SyncRecord::ContextItem(rec) => {
53                    self.import_context_item(rec, &mut stats.context_items)?;
54                }
55                SyncRecord::Memory(rec) => self.import_memory(rec, &mut stats.memories)?,
56                SyncRecord::Checkpoint(rec) => {
57                    self.import_checkpoint(rec, &mut stats.checkpoints)?;
58                }
59                SyncRecord::Plan(rec) => {
60                    self.import_plan(rec, &mut stats.plans)?;
61                }
62            }
63        }
64
65        Ok(stats)
66    }
67
68    /// Import all JSONL files from a directory.
69    ///
70    /// Imports files in order: sessions, issues, context_items, memories, checkpoints.
71    /// Then applies deletions last (to handle records that were created then deleted).
72    /// Files that don't exist are skipped.
73    ///
74    /// # Errors
75    ///
76    /// Returns an error if any file cannot be read.
77    pub fn import_all(&mut self, dir: &Path) -> SyncResult<ImportStats> {
78        let mut total_stats = ImportStats::default();
79
80        // Import data records in dependency order
81        let files = [
82            ("sessions.jsonl", "sessions"),
83            ("issues.jsonl", "issues"),
84            ("context_items.jsonl", "context_items"),
85            ("memories.jsonl", "memories"),
86            ("checkpoints.jsonl", "checkpoints"),
87            ("plans.jsonl", "plans"),
88        ];
89
90        for (filename, _entity) in files {
91            let path = dir.join(filename);
92            if path.exists() {
93                let stats = self.import(&path)?;
94                merge_stats(&mut total_stats, &stats);
95            }
96        }
97
98        // Apply deletions last (after importing any records that might be deleted)
99        let deletions_path = dir.join("deletions.jsonl");
100        if deletions_path.exists() {
101            self.import_deletions(&deletions_path)?;
102        }
103
104        Ok(total_stats)
105    }
106
107    /// Import deletions from a JSONL file.
108    ///
109    /// Deletions are applied to the local database by removing the specified entities.
110    /// This ensures that records deleted on one machine are deleted on all machines.
111    ///
112    /// # Errors
113    ///
114    /// Returns an error if the file cannot be read or deletions cannot be applied.
115    pub fn import_deletions(&mut self, path: &Path) -> SyncResult<usize> {
116        let file = File::open(path)?;
117        let reader = BufReader::new(file);
118        let mut deleted_count = 0;
119
120        for (line_num, line) in reader.lines().enumerate() {
121            let line = line?;
122            if line.trim().is_empty() {
123                continue;
124            }
125
126            let deletion: DeletionRecord = serde_json::from_str(&line).map_err(|e| {
127                SyncError::InvalidRecord {
128                    line: line_num + 1,
129                    message: e.to_string(),
130                }
131            })?;
132
133            // Apply the deletion
134            let entity_type = deletion.entity_type.to_string();
135            let was_deleted = self
136                .storage
137                .apply_deletion(&entity_type, &deletion.entity_id)
138                .map_err(|e| SyncError::Database(e.to_string()))?;
139
140            if was_deleted {
141                deleted_count += 1;
142            }
143        }
144
145        Ok(deleted_count)
146    }
147
148    /// Import a session record with merge.
149    fn import_session(&mut self, rec: SessionRecord, stats: &mut EntityStats) -> SyncResult<()> {
150        let existing = self
151            .storage
152            .get_session(&rec.data.id)
153            .map_err(|e| SyncError::Database(e.to_string()))?;
154
155        match existing {
156            Some(local) => {
157                // Compare content hashes
158                let local_hash = content_hash(&local);
159                if local_hash == rec.content_hash {
160                    // No change
161                    stats.skipped += 1;
162                    return Ok(());
163                }
164
165                // Apply merge strategy
166                match self.strategy {
167                    MergeStrategy::PreferNewer => {
168                        if rec.data.updated_at > local.updated_at {
169                            self.storage
170                                .upsert_session(&rec.data)
171                                .map_err(|e| SyncError::Database(e.to_string()))?;
172                            stats.updated += 1;
173                        } else {
174                            stats.skipped += 1;
175                        }
176                    }
177                    MergeStrategy::PreferLocal => {
178                        stats.skipped += 1;
179                    }
180                    MergeStrategy::PreferExternal => {
181                        self.storage
182                            .upsert_session(&rec.data)
183                            .map_err(|e| SyncError::Database(e.to_string()))?;
184                        stats.updated += 1;
185                    }
186                }
187            }
188            None => {
189                // New record
190                self.storage
191                    .upsert_session(&rec.data)
192                    .map_err(|e| SyncError::Database(e.to_string()))?;
193                stats.created += 1;
194            }
195        }
196
197        Ok(())
198    }
199
200    /// Import an issue record with merge.
201    fn import_issue(&mut self, rec: IssueRecord, stats: &mut EntityStats) -> SyncResult<()> {
202        let existing = self
203            .storage
204            .get_issue(&rec.data.id, None)
205            .map_err(|e| SyncError::Database(e.to_string()))?;
206
207        match existing {
208            Some(local) => {
209                let local_hash = content_hash(&local);
210                if local_hash == rec.content_hash {
211                    stats.skipped += 1;
212                    return Ok(());
213                }
214
215                match self.strategy {
216                    MergeStrategy::PreferNewer => {
217                        if rec.data.updated_at > local.updated_at {
218                            self.storage
219                                .upsert_issue(&rec.data)
220                                .map_err(|e| SyncError::Database(e.to_string()))?;
221                            stats.updated += 1;
222                        } else {
223                            stats.skipped += 1;
224                        }
225                    }
226                    MergeStrategy::PreferLocal => {
227                        stats.skipped += 1;
228                    }
229                    MergeStrategy::PreferExternal => {
230                        self.storage
231                            .upsert_issue(&rec.data)
232                            .map_err(|e| SyncError::Database(e.to_string()))?;
233                        stats.updated += 1;
234                    }
235                }
236            }
237            None => {
238                self.storage
239                    .upsert_issue(&rec.data)
240                    .map_err(|e| SyncError::Database(e.to_string()))?;
241                stats.created += 1;
242            }
243        }
244
245        Ok(())
246    }
247
248    /// Import a context item record with merge.
249    fn import_context_item(
250        &mut self,
251        rec: ContextItemRecord,
252        stats: &mut EntityStats,
253    ) -> SyncResult<()> {
254        let existing = self
255            .storage
256            .get_context_item(&rec.data.id)
257            .map_err(|e| SyncError::Database(e.to_string()))?;
258
259        match existing {
260            Some(local) => {
261                let local_hash = content_hash(&local);
262                if local_hash == rec.content_hash {
263                    stats.skipped += 1;
264                    return Ok(());
265                }
266
267                match self.strategy {
268                    MergeStrategy::PreferNewer => {
269                        if rec.data.updated_at > local.updated_at {
270                            self.storage
271                                .upsert_context_item(&rec.data)
272                                .map_err(|e| SyncError::Database(e.to_string()))?;
273                            stats.updated += 1;
274                        } else {
275                            stats.skipped += 1;
276                        }
277                    }
278                    MergeStrategy::PreferLocal => {
279                        stats.skipped += 1;
280                    }
281                    MergeStrategy::PreferExternal => {
282                        self.storage
283                            .upsert_context_item(&rec.data)
284                            .map_err(|e| SyncError::Database(e.to_string()))?;
285                        stats.updated += 1;
286                    }
287                }
288            }
289            None => {
290                self.storage
291                    .upsert_context_item(&rec.data)
292                    .map_err(|e| SyncError::Database(e.to_string()))?;
293                stats.created += 1;
294            }
295        }
296
297        Ok(())
298    }
299
300    /// Import a memory record with merge.
301    fn import_memory(&mut self, rec: MemoryRecord, stats: &mut EntityStats) -> SyncResult<()> {
302        // Memory items don't have a get_by_id, so we always upsert
303        // The ON CONFLICT handles deduplication by (project_path, key)
304        self.storage
305            .upsert_memory(&rec.data)
306            .map_err(|e| SyncError::Database(e.to_string()))?;
307        stats.created += 1;
308        Ok(())
309    }
310
311    /// Import a checkpoint record with merge.
312    fn import_checkpoint(
313        &mut self,
314        rec: CheckpointRecord,
315        stats: &mut EntityStats,
316    ) -> SyncResult<()> {
317        let existing = self
318            .storage
319            .get_checkpoint(&rec.data.id)
320            .map_err(|e| SyncError::Database(e.to_string()))?;
321
322        match existing {
323            Some(local) => {
324                let local_hash = content_hash(&local);
325                if local_hash == rec.content_hash {
326                    stats.skipped += 1;
327                    return Ok(());
328                }
329
330                // Checkpoints are immutable in nature, but we allow updates
331                match self.strategy {
332                    MergeStrategy::PreferNewer | MergeStrategy::PreferExternal => {
333                        self.storage
334                            .upsert_checkpoint(&rec.data)
335                            .map_err(|e| SyncError::Database(e.to_string()))?;
336                        stats.updated += 1;
337                    }
338                    MergeStrategy::PreferLocal => {
339                        stats.skipped += 1;
340                    }
341                }
342            }
343            None => {
344                self.storage
345                    .upsert_checkpoint(&rec.data)
346                    .map_err(|e| SyncError::Database(e.to_string()))?;
347                stats.created += 1;
348            }
349        }
350
351        Ok(())
352    }
353
354    /// Import a plan record with merge.
355    fn import_plan(&mut self, rec: PlanRecord, stats: &mut EntityStats) -> SyncResult<()> {
356        let existing = self
357            .storage
358            .get_plan(&rec.data.id)
359            .map_err(|e| SyncError::Database(e.to_string()))?;
360
361        match existing {
362            Some(local) => {
363                let local_hash = content_hash(&local);
364                if local_hash == rec.content_hash {
365                    stats.skipped += 1;
366                    return Ok(());
367                }
368
369                match self.strategy {
370                    MergeStrategy::PreferNewer => {
371                        if rec.data.updated_at > local.updated_at {
372                            self.storage
373                                .upsert_plan(&rec.data)
374                                .map_err(|e| SyncError::Database(e.to_string()))?;
375                            stats.updated += 1;
376                        } else {
377                            stats.skipped += 1;
378                        }
379                    }
380                    MergeStrategy::PreferLocal => {
381                        stats.skipped += 1;
382                    }
383                    MergeStrategy::PreferExternal => {
384                        self.storage
385                            .upsert_plan(&rec.data)
386                            .map_err(|e| SyncError::Database(e.to_string()))?;
387                        stats.updated += 1;
388                    }
389                }
390            }
391            None => {
392                self.storage
393                    .upsert_plan(&rec.data)
394                    .map_err(|e| SyncError::Database(e.to_string()))?;
395                stats.created += 1;
396            }
397        }
398
399        Ok(())
400    }
401}
402
403/// Merge import stats from one operation into accumulated stats.
404fn merge_stats(total: &mut ImportStats, add: &ImportStats) {
405    total.sessions.created += add.sessions.created;
406    total.sessions.updated += add.sessions.updated;
407    total.sessions.skipped += add.sessions.skipped;
408    total.sessions.conflicts += add.sessions.conflicts;
409
410    total.issues.created += add.issues.created;
411    total.issues.updated += add.issues.updated;
412    total.issues.skipped += add.issues.skipped;
413    total.issues.conflicts += add.issues.conflicts;
414
415    total.context_items.created += add.context_items.created;
416    total.context_items.updated += add.context_items.updated;
417    total.context_items.skipped += add.context_items.skipped;
418    total.context_items.conflicts += add.context_items.conflicts;
419
420    total.memories.created += add.memories.created;
421    total.memories.updated += add.memories.updated;
422    total.memories.skipped += add.memories.skipped;
423    total.memories.conflicts += add.memories.conflicts;
424
425    total.checkpoints.created += add.checkpoints.created;
426    total.checkpoints.updated += add.checkpoints.updated;
427    total.checkpoints.skipped += add.checkpoints.skipped;
428    total.checkpoints.conflicts += add.checkpoints.conflicts;
429
430    total.plans.created += add.plans.created;
431    total.plans.updated += add.plans.updated;
432    total.plans.skipped += add.plans.skipped;
433    total.plans.conflicts += add.plans.conflicts;
434}
435
#[cfg(test)]
mod tests {
    use super::*;
    use crate::storage::sqlite::Session;
    use crate::sync::file::write_jsonl;
    use tempfile::TempDir;

    /// Build a `Session` fixture with the given id, name, and `updated_at`.
    fn session_fixture(id: &str, name: &str, updated_at: i64) -> Session {
        Session {
            id: id.to_string(),
            name: name.to_string(),
            description: None,
            branch: None,
            channel: None,
            project_path: Some("/test".to_string()),
            status: "active".to_string(),
            ended_at: None,
            created_at: 1000,
            updated_at,
        }
    }

    /// Wrap `session` in a `SyncRecord` and write it to `sessions.jsonl`
    /// under `dir`, returning the path of the written file.
    fn write_session_record(dir: &std::path::Path, session: &Session) -> std::path::PathBuf {
        let record = SyncRecord::Session(SessionRecord {
            data: session.clone(),
            content_hash: content_hash(session),
            exported_at: "2025-01-20T00:00:00Z".to_string(),
        });
        let path = dir.join("sessions.jsonl");
        write_jsonl(&path, &[record]).unwrap();
        path
    }

    #[test]
    fn test_import_new_session() {
        let temp_dir = TempDir::new().unwrap();
        let mut storage = SqliteStorage::open(&temp_dir.path().join("test.db")).unwrap();

        // A record with no local counterpart should be created.
        let session = session_fixture("sess_1", "Test", 1000);
        let jsonl_path = write_session_record(temp_dir.path(), &session);

        let mut importer = Importer::new(&mut storage, MergeStrategy::PreferNewer);
        let stats = importer.import(&jsonl_path).unwrap();

        assert_eq!(stats.sessions.created, 1);
        assert_eq!(stats.sessions.updated, 0);

        // The session is now queryable locally.
        assert!(storage.get_session("sess_1").unwrap().is_some());
    }

    #[test]
    fn test_import_prefer_newer() {
        let temp_dir = TempDir::new().unwrap();
        let mut storage = SqliteStorage::open(&temp_dir.path().join("test.db")).unwrap();

        // Local session first; the external record carries a future
        // timestamp, so PreferNewer must take the external copy.
        storage
            .create_session("sess_1", "Local", None, Some("/test"), None, "test")
            .unwrap();

        let newer = session_fixture(
            "sess_1",
            "External",
            chrono::Utc::now().timestamp_millis() + 10000,
        );
        let jsonl_path = write_session_record(temp_dir.path(), &newer);

        let mut importer = Importer::new(&mut storage, MergeStrategy::PreferNewer);
        let stats = importer.import(&jsonl_path).unwrap();

        assert_eq!(stats.sessions.updated, 1);

        // The external name replaced the local one.
        let imported = storage.get_session("sess_1").unwrap().unwrap();
        assert_eq!(imported.name, "External");
    }

    #[test]
    fn test_import_prefer_local() {
        let temp_dir = TempDir::new().unwrap();
        let mut storage = SqliteStorage::open(&temp_dir.path().join("test.db")).unwrap();

        // Local session exists; even a newer external record must be
        // skipped under PreferLocal.
        storage
            .create_session("sess_1", "Local", None, Some("/test"), None, "test")
            .unwrap();

        let external = session_fixture(
            "sess_1",
            "External",
            chrono::Utc::now().timestamp_millis() + 10000,
        );
        let jsonl_path = write_session_record(temp_dir.path(), &external);

        let mut importer = Importer::new(&mut storage, MergeStrategy::PreferLocal);
        let stats = importer.import(&jsonl_path).unwrap();

        assert_eq!(stats.sessions.skipped, 1);

        // The local name is untouched.
        let imported = storage.get_session("sess_1").unwrap().unwrap();
        assert_eq!(imported.name, "Local");
    }
}