// sc/sync/import.rs
1//! JSONL import functionality.
2//!
3//! This module handles importing records from JSONL files with merge support.
4//! It uses content hashing and timestamps to resolve conflicts between
5//! local and external records.
6
7use std::path::Path;
8
9use std::fs::File;
10use std::io::{BufRead, BufReader};
11
12use crate::storage::sqlite::SqliteStorage;
13use crate::sync::file::read_jsonl;
14use crate::sync::hash::content_hash;
15use crate::sync::types::{
16    CheckpointRecord, ContextItemRecord, DeletionRecord, EntityStats, ImportStats, IssueRecord,
17    MemoryRecord, MergeStrategy, SessionRecord, SyncError, SyncRecord, SyncResult,
18};
19
/// Importer for JSONL sync files.
///
/// The importer reads records from JSONL files and merges them into the
/// local database using the specified merge strategy.
pub struct Importer<'a> {
    // Mutable handle to the local database that records are merged into.
    storage: &'a mut SqliteStorage,
    // Conflict-resolution policy used when a record exists both locally and externally.
    strategy: MergeStrategy,
}
28
29impl<'a> Importer<'a> {
30    /// Create a new importer with the specified merge strategy.
31    #[must_use]
32    pub fn new(storage: &'a mut SqliteStorage, strategy: MergeStrategy) -> Self {
33        Self { storage, strategy }
34    }
35
36    /// Import records from a JSONL file.
37    ///
38    /// Each line in the file is parsed and merged into the local database.
39    /// The merge strategy determines how conflicts are resolved.
40    ///
41    /// # Errors
42    ///
43    /// Returns an error if the file cannot be read or records are invalid.
44    pub fn import(&mut self, path: &Path) -> SyncResult<ImportStats> {
45        let records = read_jsonl(path)?;
46        let mut stats = ImportStats::default();
47
48        for record in records {
49            match record {
50                SyncRecord::Session(rec) => self.import_session(rec, &mut stats.sessions)?,
51                SyncRecord::Issue(rec) => self.import_issue(rec, &mut stats.issues)?,
52                SyncRecord::ContextItem(rec) => {
53                    self.import_context_item(rec, &mut stats.context_items)?;
54                }
55                SyncRecord::Memory(rec) => self.import_memory(rec, &mut stats.memories)?,
56                SyncRecord::Checkpoint(rec) => {
57                    self.import_checkpoint(rec, &mut stats.checkpoints)?;
58                }
59            }
60        }
61
62        Ok(stats)
63    }
64
65    /// Import all JSONL files from a directory.
66    ///
67    /// Imports files in order: sessions, issues, context_items, memories, checkpoints.
68    /// Then applies deletions last (to handle records that were created then deleted).
69    /// Files that don't exist are skipped.
70    ///
71    /// # Errors
72    ///
73    /// Returns an error if any file cannot be read.
74    pub fn import_all(&mut self, dir: &Path) -> SyncResult<ImportStats> {
75        let mut total_stats = ImportStats::default();
76
77        // Import data records in dependency order
78        let files = [
79            ("sessions.jsonl", "sessions"),
80            ("issues.jsonl", "issues"),
81            ("context_items.jsonl", "context_items"),
82            ("memories.jsonl", "memories"),
83            ("checkpoints.jsonl", "checkpoints"),
84        ];
85
86        for (filename, _entity) in files {
87            let path = dir.join(filename);
88            if path.exists() {
89                let stats = self.import(&path)?;
90                merge_stats(&mut total_stats, &stats);
91            }
92        }
93
94        // Apply deletions last (after importing any records that might be deleted)
95        let deletions_path = dir.join("deletions.jsonl");
96        if deletions_path.exists() {
97            self.import_deletions(&deletions_path)?;
98        }
99
100        Ok(total_stats)
101    }
102
103    /// Import deletions from a JSONL file.
104    ///
105    /// Deletions are applied to the local database by removing the specified entities.
106    /// This ensures that records deleted on one machine are deleted on all machines.
107    ///
108    /// # Errors
109    ///
110    /// Returns an error if the file cannot be read or deletions cannot be applied.
111    pub fn import_deletions(&mut self, path: &Path) -> SyncResult<usize> {
112        let file = File::open(path)?;
113        let reader = BufReader::new(file);
114        let mut deleted_count = 0;
115
116        for (line_num, line) in reader.lines().enumerate() {
117            let line = line?;
118            if line.trim().is_empty() {
119                continue;
120            }
121
122            let deletion: DeletionRecord = serde_json::from_str(&line).map_err(|e| {
123                SyncError::InvalidRecord {
124                    line: line_num + 1,
125                    message: e.to_string(),
126                }
127            })?;
128
129            // Apply the deletion
130            let entity_type = deletion.entity_type.to_string();
131            let was_deleted = self
132                .storage
133                .apply_deletion(&entity_type, &deletion.entity_id)
134                .map_err(|e| SyncError::Database(e.to_string()))?;
135
136            if was_deleted {
137                deleted_count += 1;
138            }
139        }
140
141        Ok(deleted_count)
142    }
143
144    /// Import a session record with merge.
145    fn import_session(&mut self, rec: SessionRecord, stats: &mut EntityStats) -> SyncResult<()> {
146        let existing = self
147            .storage
148            .get_session(&rec.data.id)
149            .map_err(|e| SyncError::Database(e.to_string()))?;
150
151        match existing {
152            Some(local) => {
153                // Compare content hashes
154                let local_hash = content_hash(&local);
155                if local_hash == rec.content_hash {
156                    // No change
157                    stats.skipped += 1;
158                    return Ok(());
159                }
160
161                // Apply merge strategy
162                match self.strategy {
163                    MergeStrategy::PreferNewer => {
164                        if rec.data.updated_at > local.updated_at {
165                            self.storage
166                                .upsert_session(&rec.data)
167                                .map_err(|e| SyncError::Database(e.to_string()))?;
168                            stats.updated += 1;
169                        } else {
170                            stats.skipped += 1;
171                        }
172                    }
173                    MergeStrategy::PreferLocal => {
174                        stats.skipped += 1;
175                    }
176                    MergeStrategy::PreferExternal => {
177                        self.storage
178                            .upsert_session(&rec.data)
179                            .map_err(|e| SyncError::Database(e.to_string()))?;
180                        stats.updated += 1;
181                    }
182                }
183            }
184            None => {
185                // New record
186                self.storage
187                    .upsert_session(&rec.data)
188                    .map_err(|e| SyncError::Database(e.to_string()))?;
189                stats.created += 1;
190            }
191        }
192
193        Ok(())
194    }
195
196    /// Import an issue record with merge.
197    fn import_issue(&mut self, rec: IssueRecord, stats: &mut EntityStats) -> SyncResult<()> {
198        let existing = self
199            .storage
200            .get_issue(&rec.data.id, None)
201            .map_err(|e| SyncError::Database(e.to_string()))?;
202
203        match existing {
204            Some(local) => {
205                let local_hash = content_hash(&local);
206                if local_hash == rec.content_hash {
207                    stats.skipped += 1;
208                    return Ok(());
209                }
210
211                match self.strategy {
212                    MergeStrategy::PreferNewer => {
213                        if rec.data.updated_at > local.updated_at {
214                            self.storage
215                                .upsert_issue(&rec.data)
216                                .map_err(|e| SyncError::Database(e.to_string()))?;
217                            stats.updated += 1;
218                        } else {
219                            stats.skipped += 1;
220                        }
221                    }
222                    MergeStrategy::PreferLocal => {
223                        stats.skipped += 1;
224                    }
225                    MergeStrategy::PreferExternal => {
226                        self.storage
227                            .upsert_issue(&rec.data)
228                            .map_err(|e| SyncError::Database(e.to_string()))?;
229                        stats.updated += 1;
230                    }
231                }
232            }
233            None => {
234                self.storage
235                    .upsert_issue(&rec.data)
236                    .map_err(|e| SyncError::Database(e.to_string()))?;
237                stats.created += 1;
238            }
239        }
240
241        Ok(())
242    }
243
244    /// Import a context item record with merge.
245    fn import_context_item(
246        &mut self,
247        rec: ContextItemRecord,
248        stats: &mut EntityStats,
249    ) -> SyncResult<()> {
250        let existing = self
251            .storage
252            .get_context_item(&rec.data.id)
253            .map_err(|e| SyncError::Database(e.to_string()))?;
254
255        match existing {
256            Some(local) => {
257                let local_hash = content_hash(&local);
258                if local_hash == rec.content_hash {
259                    stats.skipped += 1;
260                    return Ok(());
261                }
262
263                match self.strategy {
264                    MergeStrategy::PreferNewer => {
265                        if rec.data.updated_at > local.updated_at {
266                            self.storage
267                                .upsert_context_item(&rec.data)
268                                .map_err(|e| SyncError::Database(e.to_string()))?;
269                            stats.updated += 1;
270                        } else {
271                            stats.skipped += 1;
272                        }
273                    }
274                    MergeStrategy::PreferLocal => {
275                        stats.skipped += 1;
276                    }
277                    MergeStrategy::PreferExternal => {
278                        self.storage
279                            .upsert_context_item(&rec.data)
280                            .map_err(|e| SyncError::Database(e.to_string()))?;
281                        stats.updated += 1;
282                    }
283                }
284            }
285            None => {
286                self.storage
287                    .upsert_context_item(&rec.data)
288                    .map_err(|e| SyncError::Database(e.to_string()))?;
289                stats.created += 1;
290            }
291        }
292
293        Ok(())
294    }
295
296    /// Import a memory record with merge.
297    fn import_memory(&mut self, rec: MemoryRecord, stats: &mut EntityStats) -> SyncResult<()> {
298        // Memory items don't have a get_by_id, so we always upsert
299        // The ON CONFLICT handles deduplication by (project_path, key)
300        self.storage
301            .upsert_memory(&rec.data)
302            .map_err(|e| SyncError::Database(e.to_string()))?;
303        stats.created += 1;
304        Ok(())
305    }
306
307    /// Import a checkpoint record with merge.
308    fn import_checkpoint(
309        &mut self,
310        rec: CheckpointRecord,
311        stats: &mut EntityStats,
312    ) -> SyncResult<()> {
313        let existing = self
314            .storage
315            .get_checkpoint(&rec.data.id)
316            .map_err(|e| SyncError::Database(e.to_string()))?;
317
318        match existing {
319            Some(local) => {
320                let local_hash = content_hash(&local);
321                if local_hash == rec.content_hash {
322                    stats.skipped += 1;
323                    return Ok(());
324                }
325
326                // Checkpoints are immutable in nature, but we allow updates
327                match self.strategy {
328                    MergeStrategy::PreferNewer | MergeStrategy::PreferExternal => {
329                        self.storage
330                            .upsert_checkpoint(&rec.data)
331                            .map_err(|e| SyncError::Database(e.to_string()))?;
332                        stats.updated += 1;
333                    }
334                    MergeStrategy::PreferLocal => {
335                        stats.skipped += 1;
336                    }
337                }
338            }
339            None => {
340                self.storage
341                    .upsert_checkpoint(&rec.data)
342                    .map_err(|e| SyncError::Database(e.to_string()))?;
343                stats.created += 1;
344            }
345        }
346
347        Ok(())
348    }
349}
350
351/// Merge import stats from one operation into accumulated stats.
352fn merge_stats(total: &mut ImportStats, add: &ImportStats) {
353    total.sessions.created += add.sessions.created;
354    total.sessions.updated += add.sessions.updated;
355    total.sessions.skipped += add.sessions.skipped;
356    total.sessions.conflicts += add.sessions.conflicts;
357
358    total.issues.created += add.issues.created;
359    total.issues.updated += add.issues.updated;
360    total.issues.skipped += add.issues.skipped;
361    total.issues.conflicts += add.issues.conflicts;
362
363    total.context_items.created += add.context_items.created;
364    total.context_items.updated += add.context_items.updated;
365    total.context_items.skipped += add.context_items.skipped;
366    total.context_items.conflicts += add.context_items.conflicts;
367
368    total.memories.created += add.memories.created;
369    total.memories.updated += add.memories.updated;
370    total.memories.skipped += add.memories.skipped;
371    total.memories.conflicts += add.memories.conflicts;
372
373    total.checkpoints.created += add.checkpoints.created;
374    total.checkpoints.updated += add.checkpoints.updated;
375    total.checkpoints.skipped += add.checkpoints.skipped;
376    total.checkpoints.conflicts += add.checkpoints.conflicts;
377}
378
#[cfg(test)]
mod tests {
    use super::*;
    use crate::storage::sqlite::Session;
    use crate::sync::file::write_jsonl;
    use tempfile::TempDir;

    /// Session fixture named "Test" with the given update timestamp.
    fn make_session(id: &str, updated_at: i64) -> Session {
        named_session("Test", id, updated_at)
    }

    /// Session fixture with an explicit name and update timestamp.
    fn named_session(name: &str, id: &str, updated_at: i64) -> Session {
        Session {
            id: id.to_string(),
            name: name.to_string(),
            description: None,
            branch: None,
            channel: None,
            project_path: Some("/test".to_string()),
            status: "active".to_string(),
            ended_at: None,
            created_at: 1000,
            updated_at,
        }
    }

    /// Wrap a session in a sync record with its content hash, as the exporter would.
    fn to_record(session: &Session) -> SyncRecord {
        SyncRecord::Session(SessionRecord {
            data: session.clone(),
            content_hash: content_hash(session),
            exported_at: "2025-01-20T00:00:00Z".to_string(),
        })
    }

    /// Open a fresh database in `temp_dir` and write `records` to sessions.jsonl,
    /// returning the storage handle and the JSONL path.
    fn setup(temp_dir: &TempDir, records: &[SyncRecord]) -> (SqliteStorage, std::path::PathBuf) {
        let storage = SqliteStorage::open(&temp_dir.path().join("test.db")).unwrap();
        let jsonl_path = temp_dir.path().join("sessions.jsonl");
        write_jsonl(&jsonl_path, records).unwrap();
        (storage, jsonl_path)
    }

    #[test]
    fn test_import_new_session() {
        let temp_dir = TempDir::new().unwrap();
        let session = make_session("sess_1", 1000);
        let (mut storage, jsonl_path) = setup(&temp_dir, &[to_record(&session)]);

        // Importing into an empty database should create the session.
        let stats = Importer::new(&mut storage, MergeStrategy::PreferNewer)
            .import(&jsonl_path)
            .unwrap();

        assert_eq!(stats.sessions.created, 1);
        assert_eq!(stats.sessions.updated, 0);
        assert!(storage.get_session("sess_1").unwrap().is_some());
    }

    #[test]
    fn test_import_prefer_newer() {
        let temp_dir = TempDir::new().unwrap();
        // External record carries a future timestamp so it is strictly newer.
        let future_ts = chrono::Utc::now().timestamp_millis() + 10000;
        let newer = named_session("External", "sess_1", future_ts);
        let (mut storage, jsonl_path) = setup(&temp_dir, &[to_record(&newer)]);

        // Local session with an older timestamp.
        storage
            .create_session("sess_1", "Local", None, Some("/test"), None, "test")
            .unwrap();

        let stats = Importer::new(&mut storage, MergeStrategy::PreferNewer)
            .import(&jsonl_path)
            .unwrap();

        // The newer external record should win.
        assert_eq!(stats.sessions.updated, 1);
        let imported = storage.get_session("sess_1").unwrap().unwrap();
        assert_eq!(imported.name, "External");
    }

    #[test]
    fn test_import_prefer_local() {
        let temp_dir = TempDir::new().unwrap();
        let future_ts = chrono::Utc::now().timestamp_millis() + 10000;
        let external = named_session("External", "sess_1", future_ts);
        let (mut storage, jsonl_path) = setup(&temp_dir, &[to_record(&external)]);

        storage
            .create_session("sess_1", "Local", None, Some("/test"), None, "test")
            .unwrap();

        // PreferLocal must keep the local row even though the external one is newer.
        let stats = Importer::new(&mut storage, MergeStrategy::PreferLocal)
            .import(&jsonl_path)
            .unwrap();

        assert_eq!(stats.sessions.skipped, 1);
        let imported = storage.get_session("sess_1").unwrap().unwrap();
        assert_eq!(imported.name, "Local");
    }
}