1use std::path::Path;
8
9use std::fs::File;
10use std::io::{BufRead, BufReader};
11
12use crate::storage::sqlite::SqliteStorage;
13use crate::sync::file::read_jsonl;
14use crate::sync::hash::content_hash;
15use crate::sync::types::{
16 CheckpointRecord, ContextItemRecord, DeletionRecord, EntityStats, ImportStats, IssueRecord,
17 MemoryRecord, MergeStrategy, PlanRecord, SessionRecord, SyncError, SyncRecord, SyncResult,
18};
19
20pub struct Importer<'a> {
25 storage: &'a mut SqliteStorage,
26 strategy: MergeStrategy,
27}
28
29impl<'a> Importer<'a> {
30 #[must_use]
32 pub fn new(storage: &'a mut SqliteStorage, strategy: MergeStrategy) -> Self {
33 Self { storage, strategy }
34 }
35
36 pub fn import(&mut self, path: &Path) -> SyncResult<ImportStats> {
45 let records = read_jsonl(path)?;
46 let mut stats = ImportStats::default();
47
48 for record in records {
49 match record {
50 SyncRecord::Session(rec) => self.import_session(rec, &mut stats.sessions)?,
51 SyncRecord::Issue(rec) => self.import_issue(rec, &mut stats.issues)?,
52 SyncRecord::ContextItem(rec) => {
53 self.import_context_item(rec, &mut stats.context_items)?;
54 }
55 SyncRecord::Memory(rec) => self.import_memory(rec, &mut stats.memories)?,
56 SyncRecord::Checkpoint(rec) => {
57 self.import_checkpoint(rec, &mut stats.checkpoints)?;
58 }
59 SyncRecord::Plan(rec) => {
60 self.import_plan(rec, &mut stats.plans)?;
61 }
62 }
63 }
64
65 Ok(stats)
66 }
67
68 pub fn import_all(&mut self, dir: &Path) -> SyncResult<ImportStats> {
78 let mut total_stats = ImportStats::default();
79
80 let files = [
82 ("sessions.jsonl", "sessions"),
83 ("issues.jsonl", "issues"),
84 ("context_items.jsonl", "context_items"),
85 ("memories.jsonl", "memories"),
86 ("checkpoints.jsonl", "checkpoints"),
87 ("plans.jsonl", "plans"),
88 ];
89
90 for (filename, _entity) in files {
91 let path = dir.join(filename);
92 if path.exists() {
93 let stats = self.import(&path)?;
94 merge_stats(&mut total_stats, &stats);
95 }
96 }
97
98 let deletions_path = dir.join("deletions.jsonl");
100 if deletions_path.exists() {
101 self.import_deletions(&deletions_path)?;
102 }
103
104 Ok(total_stats)
105 }
106
107 pub fn import_deletions(&mut self, path: &Path) -> SyncResult<usize> {
116 let file = File::open(path)?;
117 let reader = BufReader::new(file);
118 let mut deleted_count = 0;
119
120 for (line_num, line) in reader.lines().enumerate() {
121 let line = line?;
122 if line.trim().is_empty() {
123 continue;
124 }
125
126 let deletion: DeletionRecord = serde_json::from_str(&line).map_err(|e| {
127 SyncError::InvalidRecord {
128 line: line_num + 1,
129 message: e.to_string(),
130 }
131 })?;
132
133 let entity_type = deletion.entity_type.to_string();
135 let was_deleted = self
136 .storage
137 .apply_deletion(&entity_type, &deletion.entity_id)
138 .map_err(|e| SyncError::Database(e.to_string()))?;
139
140 if was_deleted {
141 deleted_count += 1;
142 }
143 }
144
145 Ok(deleted_count)
146 }
147
148 fn import_session(&mut self, rec: SessionRecord, stats: &mut EntityStats) -> SyncResult<()> {
150 let existing = self
151 .storage
152 .get_session(&rec.data.id)
153 .map_err(|e| SyncError::Database(e.to_string()))?;
154
155 match existing {
156 Some(local) => {
157 let local_hash = content_hash(&local);
159 if local_hash == rec.content_hash {
160 stats.skipped += 1;
162 return Ok(());
163 }
164
165 match self.strategy {
167 MergeStrategy::PreferNewer => {
168 if rec.data.updated_at > local.updated_at {
169 self.storage
170 .upsert_session(&rec.data)
171 .map_err(|e| SyncError::Database(e.to_string()))?;
172 stats.updated += 1;
173 } else {
174 stats.skipped += 1;
175 }
176 }
177 MergeStrategy::PreferLocal => {
178 stats.skipped += 1;
179 }
180 MergeStrategy::PreferExternal => {
181 self.storage
182 .upsert_session(&rec.data)
183 .map_err(|e| SyncError::Database(e.to_string()))?;
184 stats.updated += 1;
185 }
186 }
187 }
188 None => {
189 self.storage
191 .upsert_session(&rec.data)
192 .map_err(|e| SyncError::Database(e.to_string()))?;
193 stats.created += 1;
194 }
195 }
196
197 Ok(())
198 }
199
200 fn import_issue(&mut self, rec: IssueRecord, stats: &mut EntityStats) -> SyncResult<()> {
202 let existing = self
203 .storage
204 .get_issue(&rec.data.id, None)
205 .map_err(|e| SyncError::Database(e.to_string()))?;
206
207 match existing {
208 Some(local) => {
209 let local_hash = content_hash(&local);
210 if local_hash == rec.content_hash {
211 stats.skipped += 1;
212 return Ok(());
213 }
214
215 match self.strategy {
216 MergeStrategy::PreferNewer => {
217 if rec.data.updated_at > local.updated_at {
218 self.storage
219 .upsert_issue(&rec.data)
220 .map_err(|e| SyncError::Database(e.to_string()))?;
221 stats.updated += 1;
222 } else {
223 stats.skipped += 1;
224 }
225 }
226 MergeStrategy::PreferLocal => {
227 stats.skipped += 1;
228 }
229 MergeStrategy::PreferExternal => {
230 self.storage
231 .upsert_issue(&rec.data)
232 .map_err(|e| SyncError::Database(e.to_string()))?;
233 stats.updated += 1;
234 }
235 }
236 }
237 None => {
238 self.storage
239 .upsert_issue(&rec.data)
240 .map_err(|e| SyncError::Database(e.to_string()))?;
241 stats.created += 1;
242 }
243 }
244
245 Ok(())
246 }
247
248 fn import_context_item(
250 &mut self,
251 rec: ContextItemRecord,
252 stats: &mut EntityStats,
253 ) -> SyncResult<()> {
254 let existing = self
255 .storage
256 .get_context_item(&rec.data.id)
257 .map_err(|e| SyncError::Database(e.to_string()))?;
258
259 match existing {
260 Some(local) => {
261 let local_hash = content_hash(&local);
262 if local_hash == rec.content_hash {
263 stats.skipped += 1;
264 return Ok(());
265 }
266
267 match self.strategy {
268 MergeStrategy::PreferNewer => {
269 if rec.data.updated_at > local.updated_at {
270 self.storage
271 .upsert_context_item(&rec.data)
272 .map_err(|e| SyncError::Database(e.to_string()))?;
273 stats.updated += 1;
274 } else {
275 stats.skipped += 1;
276 }
277 }
278 MergeStrategy::PreferLocal => {
279 stats.skipped += 1;
280 }
281 MergeStrategy::PreferExternal => {
282 self.storage
283 .upsert_context_item(&rec.data)
284 .map_err(|e| SyncError::Database(e.to_string()))?;
285 stats.updated += 1;
286 }
287 }
288 }
289 None => {
290 self.storage
291 .upsert_context_item(&rec.data)
292 .map_err(|e| SyncError::Database(e.to_string()))?;
293 stats.created += 1;
294 }
295 }
296
297 Ok(())
298 }
299
300 fn import_memory(&mut self, rec: MemoryRecord, stats: &mut EntityStats) -> SyncResult<()> {
302 self.storage
305 .upsert_memory(&rec.data)
306 .map_err(|e| SyncError::Database(e.to_string()))?;
307 stats.created += 1;
308 Ok(())
309 }
310
311 fn import_checkpoint(
313 &mut self,
314 rec: CheckpointRecord,
315 stats: &mut EntityStats,
316 ) -> SyncResult<()> {
317 let existing = self
318 .storage
319 .get_checkpoint(&rec.data.id)
320 .map_err(|e| SyncError::Database(e.to_string()))?;
321
322 match existing {
323 Some(local) => {
324 let local_hash = content_hash(&local);
325 if local_hash == rec.content_hash {
326 stats.skipped += 1;
327 return Ok(());
328 }
329
330 match self.strategy {
332 MergeStrategy::PreferNewer | MergeStrategy::PreferExternal => {
333 self.storage
334 .upsert_checkpoint(&rec.data)
335 .map_err(|e| SyncError::Database(e.to_string()))?;
336 stats.updated += 1;
337 }
338 MergeStrategy::PreferLocal => {
339 stats.skipped += 1;
340 }
341 }
342 }
343 None => {
344 self.storage
345 .upsert_checkpoint(&rec.data)
346 .map_err(|e| SyncError::Database(e.to_string()))?;
347 stats.created += 1;
348 }
349 }
350
351 Ok(())
352 }
353
354 fn import_plan(&mut self, rec: PlanRecord, stats: &mut EntityStats) -> SyncResult<()> {
356 let existing = self
357 .storage
358 .get_plan(&rec.data.id)
359 .map_err(|e| SyncError::Database(e.to_string()))?;
360
361 match existing {
362 Some(local) => {
363 let local_hash = content_hash(&local);
364 if local_hash == rec.content_hash {
365 stats.skipped += 1;
366 return Ok(());
367 }
368
369 match self.strategy {
370 MergeStrategy::PreferNewer => {
371 if rec.data.updated_at > local.updated_at {
372 self.storage
373 .upsert_plan(&rec.data)
374 .map_err(|e| SyncError::Database(e.to_string()))?;
375 stats.updated += 1;
376 } else {
377 stats.skipped += 1;
378 }
379 }
380 MergeStrategy::PreferLocal => {
381 stats.skipped += 1;
382 }
383 MergeStrategy::PreferExternal => {
384 self.storage
385 .upsert_plan(&rec.data)
386 .map_err(|e| SyncError::Database(e.to_string()))?;
387 stats.updated += 1;
388 }
389 }
390 }
391 None => {
392 self.storage
393 .upsert_plan(&rec.data)
394 .map_err(|e| SyncError::Database(e.to_string()))?;
395 stats.created += 1;
396 }
397 }
398
399 Ok(())
400 }
401}
402
403fn merge_stats(total: &mut ImportStats, add: &ImportStats) {
405 total.sessions.created += add.sessions.created;
406 total.sessions.updated += add.sessions.updated;
407 total.sessions.skipped += add.sessions.skipped;
408 total.sessions.conflicts += add.sessions.conflicts;
409
410 total.issues.created += add.issues.created;
411 total.issues.updated += add.issues.updated;
412 total.issues.skipped += add.issues.skipped;
413 total.issues.conflicts += add.issues.conflicts;
414
415 total.context_items.created += add.context_items.created;
416 total.context_items.updated += add.context_items.updated;
417 total.context_items.skipped += add.context_items.skipped;
418 total.context_items.conflicts += add.context_items.conflicts;
419
420 total.memories.created += add.memories.created;
421 total.memories.updated += add.memories.updated;
422 total.memories.skipped += add.memories.skipped;
423 total.memories.conflicts += add.memories.conflicts;
424
425 total.checkpoints.created += add.checkpoints.created;
426 total.checkpoints.updated += add.checkpoints.updated;
427 total.checkpoints.skipped += add.checkpoints.skipped;
428 total.checkpoints.conflicts += add.checkpoints.conflicts;
429
430 total.plans.created += add.plans.created;
431 total.plans.updated += add.plans.updated;
432 total.plans.skipped += add.plans.skipped;
433 total.plans.conflicts += add.plans.conflicts;
434}
435
#[cfg(test)]
mod tests {
    use super::*;
    use crate::storage::sqlite::Session;
    use crate::sync::file::write_jsonl;
    use tempfile::TempDir;

    /// Builds an active session fixture with the given id and `updated_at`.
    fn make_session(id: &str, updated_at: i64) -> Session {
        Session {
            id: id.to_owned(),
            name: String::from("Test"),
            description: None,
            branch: None,
            channel: None,
            project_path: Some(String::from("/test")),
            status: String::from("active"),
            ended_at: None,
            created_at: 1000,
            updated_at,
        }
    }

    /// Variant of the fixture named "External" for id `sess_1`, stamped ten
    /// seconds (in milliseconds) past the current wall-clock time.
    fn external_session() -> Session {
        Session {
            name: "External".to_string(),
            ..make_session("sess_1", chrono::Utc::now().timestamp_millis() + 10000)
        }
    }

    /// Opens a fresh database file inside the temporary directory.
    fn open_storage(dir: &TempDir) -> SqliteStorage {
        SqliteStorage::open(&dir.path().join("test.db")).unwrap()
    }

    /// Writes `session` as the only record of `sessions.jsonl` in the
    /// temporary directory and returns the path of that file.
    fn write_session_export(dir: &TempDir, session: &Session) -> std::path::PathBuf {
        let record = SyncRecord::Session(SessionRecord {
            data: session.clone(),
            content_hash: content_hash(session),
            exported_at: "2025-01-20T00:00:00Z".to_string(),
        });
        let jsonl_path = dir.path().join("sessions.jsonl");
        write_jsonl(&jsonl_path, &[record]).unwrap();
        jsonl_path
    }

    /// A session unknown to the local store is inserted and counted as created.
    #[test]
    fn test_import_new_session() {
        let temp_dir = TempDir::new().unwrap();
        let mut storage = open_storage(&temp_dir);

        let jsonl_path = write_session_export(&temp_dir, &make_session("sess_1", 1000));

        let stats = Importer::new(&mut storage, MergeStrategy::PreferNewer)
            .import(&jsonl_path)
            .unwrap();

        assert_eq!(stats.sessions.created, 1);
        assert_eq!(stats.sessions.updated, 0);

        assert!(storage.get_session("sess_1").unwrap().is_some());
    }

    /// With `PreferNewer`, an external session stamped later than the local
    /// one replaces it.
    #[test]
    fn test_import_prefer_newer() {
        let temp_dir = TempDir::new().unwrap();
        let mut storage = open_storage(&temp_dir);

        storage
            .create_session("sess_1", "Local", None, Some("/test"), None, "test")
            .unwrap();

        let jsonl_path = write_session_export(&temp_dir, &external_session());

        let stats = Importer::new(&mut storage, MergeStrategy::PreferNewer)
            .import(&jsonl_path)
            .unwrap();

        assert_eq!(stats.sessions.updated, 1);

        let stored = storage.get_session("sess_1").unwrap().unwrap();
        assert_eq!(stored.name, "External");
    }

    /// With `PreferLocal`, the local session is kept even though the external
    /// one carries a later timestamp.
    #[test]
    fn test_import_prefer_local() {
        let temp_dir = TempDir::new().unwrap();
        let mut storage = open_storage(&temp_dir);

        storage
            .create_session("sess_1", "Local", None, Some("/test"), None, "test")
            .unwrap();

        let jsonl_path = write_session_export(&temp_dir, &external_session());

        let stats = Importer::new(&mut storage, MergeStrategy::PreferLocal)
            .import(&jsonl_path)
            .unwrap();

        assert_eq!(stats.sessions.skipped, 1);

        let stored = storage.get_session("sess_1").unwrap().unwrap();
        assert_eq!(stored.name, "Local");
    }
}