second_brain_sync/
import.rs1use std::io::BufRead;
2
3use anyhow::Result;
4use uuid::Uuid;
5
6use second_brain_core::kuzu_store::KuzuStore;
7use second_brain_core::schema::{Conversation, Entity, Memory, Relation, SyncNodeType, SyncOp};
8use second_brain_core::store::Store;
9
10use crate::export::SyncRecord;
11
/// Counters summarising the outcome of one [`import_changes`] run.
///
/// Each non-blank input line increments exactly one of these counters.
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub struct ImportStats {
    /// Records whose application reported that a new node was imported.
    pub created: u64,
    /// Records whose application reported that an existing node was overwritten.
    pub updated: u64,
    /// Records whose application reported that an existing node was removed.
    pub deleted: u64,
    /// Records that were applied without error but changed nothing.
    pub skipped: u64,
    /// Lines that failed to parse, plus records whose application returned an error.
    pub errors: u64,
}
19
20pub fn import_changes<R: BufRead>(store: &KuzuStore, reader: R) -> Result<(ImportStats, u64)> {
21 let mut stats = ImportStats {
22 created: 0,
23 updated: 0,
24 deleted: 0,
25 skipped: 0,
26 errors: 0,
27 };
28 let mut max_seq = 0u64;
29
30 for line in reader.lines() {
31 let line = line?;
32 if line.trim().is_empty() {
33 continue;
34 }
35
36 let record: SyncRecord = match serde_json::from_str(&line) {
37 Ok(r) => r,
38 Err(e) => {
39 tracing::warn!("skipping malformed sync record: {e}");
40 stats.errors += 1;
41 continue;
42 }
43 };
44
45 if record.seq > max_seq {
46 max_seq = record.seq;
47 }
48
49 match apply_record(store, &record) {
50 Ok(action) => match action {
51 ApplyAction::Created => stats.created += 1,
52 ApplyAction::Updated => stats.updated += 1,
53 ApplyAction::Deleted => stats.deleted += 1,
54 ApplyAction::Skipped => stats.skipped += 1,
55 },
56 Err(e) => {
57 tracing::warn!("error applying sync record seq={}: {e}", record.seq);
58 stats.errors += 1;
59 }
60 }
61 }
62
63 Ok((stats, max_seq))
64}
65
/// Outcome of applying a single sync record to the local store.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum ApplyAction {
    /// A node that was not present locally was imported.
    Created,
    /// An existing node was overwritten with the incoming version.
    Updated,
    /// An existing node was removed.
    Deleted,
    /// The record required no change or is not handled.
    Skipped,
}
72
73fn apply_record(store: &KuzuStore, record: &SyncRecord) -> Result<ApplyAction> {
74 match (&record.op, &record.node_type) {
75 (SyncOp::Create, SyncNodeType::Conversation) => {
76 let conv: Conversation = serde_json::from_value(
77 record
78 .data
79 .clone()
80 .ok_or_else(|| anyhow::anyhow!("missing data for conversation create"))?,
81 )?;
82 if store.get_conversation(conv.id)?.is_some() {
83 return Ok(ApplyAction::Skipped);
84 }
85 store.import_conversation(&conv)?;
86 Ok(ApplyAction::Created)
87 }
88 (SyncOp::Create, SyncNodeType::Memory) => {
89 let mem: Memory = serde_json::from_value(
90 record
91 .data
92 .clone()
93 .ok_or_else(|| anyhow::anyhow!("missing data for memory create"))?,
94 )?;
95 if store.get_memory(mem.id)?.is_some() {
96 return Ok(ApplyAction::Skipped);
97 }
98 store.import_memory(&mem)?;
99 Ok(ApplyAction::Created)
100 }
101 (SyncOp::Create, SyncNodeType::Entity) => {
102 let entity: Entity = serde_json::from_value(
103 record
104 .data
105 .clone()
106 .ok_or_else(|| anyhow::anyhow!("missing data for entity create"))?,
107 )?;
108 if store.get_entity(entity.id)?.is_some() {
109 return Ok(ApplyAction::Skipped);
110 }
111 store.import_entity(&entity)?;
112 Ok(ApplyAction::Created)
113 }
114 (SyncOp::Create, SyncNodeType::Relation) => {
115 let rel: Relation = serde_json::from_value(
116 record
117 .data
118 .clone()
119 .ok_or_else(|| anyhow::anyhow!("missing data for relation create"))?,
120 )?;
121 match store.import_relation(&rel) {
122 Ok(()) => Ok(ApplyAction::Created),
123 Err(_) => Ok(ApplyAction::Skipped),
124 }
125 }
126 (SyncOp::Update, SyncNodeType::Memory) => {
127 let incoming: Memory = serde_json::from_value(
128 record
129 .data
130 .clone()
131 .ok_or_else(|| anyhow::anyhow!("missing data for memory update"))?,
132 )?;
133 match store.get_memory(incoming.id)? {
134 Some(existing) => {
135 if incoming.last_accessed > existing.last_accessed
136 || (incoming.last_accessed == existing.last_accessed
137 && incoming.confidence > existing.confidence)
138 {
139 store.import_or_update_memory(&incoming)?;
140 Ok(ApplyAction::Updated)
141 } else {
142 Ok(ApplyAction::Skipped)
143 }
144 }
145 None => {
146 store.import_memory(&incoming)?;
147 Ok(ApplyAction::Created)
148 }
149 }
150 }
151 (SyncOp::Delete, SyncNodeType::Memory) => {
152 let id = Uuid::parse_str(&record.node_id)?;
153 if store.get_memory(id)?.is_some() {
154 store.import_delete_memory(id)?;
155 Ok(ApplyAction::Deleted)
156 } else {
157 Ok(ApplyAction::Skipped)
158 }
159 }
160 _ => {
161 tracing::debug!("unhandled sync op: {:?}/{:?}", record.op, record.node_type);
162 Ok(ApplyAction::Skipped)
163 }
164 }
165}