1use std::io::BufRead;
2
3use anyhow::Result;
4
5use second_brain_core::kuzu_store::{ApplyOutcome, KuzuStore};
6use second_brain_core::schema::{Conversation, Entity, Memory, Relation, SyncNodeType, SyncOp};
7
8use crate::export::SyncRecord;
9
/// Counters accumulated while importing a stream of sync records.
///
/// `created`, `updated`, `deleted` and `skipped` count records whose application
/// returned the corresponding `ApplyOutcome`. `errors` counts both input lines that
/// could not be parsed as a `SyncRecord` and records that failed to apply.
#[derive(Debug, Default, Clone, Copy, PartialEq, Eq)]
pub struct ImportStats {
    /// Records whose application reported `ApplyOutcome::Created`.
    pub created: u64,
    /// Records whose application reported `ApplyOutcome::Updated`.
    pub updated: u64,
    /// Records whose application reported `ApplyOutcome::Deleted`.
    pub deleted: u64,
    /// Records whose application reported `ApplyOutcome::Skipped`.
    pub skipped: u64,
    /// Malformed input lines plus records that could not be applied.
    pub errors: u64,
}
17
18pub fn import_changes<R: BufRead>(store: &KuzuStore, reader: R) -> Result<(ImportStats, u64)> {
19 let mut stats = ImportStats {
20 created: 0,
21 updated: 0,
22 deleted: 0,
23 skipped: 0,
24 errors: 0,
25 };
26 let mut max_seq = 0u64;
27 let mut errored_seqs: Vec<u64> = Vec::new();
28
29 let mut records: Vec<SyncRecord> = Vec::new();
30 for line in reader.lines() {
31 let line = line?;
32 if line.trim().is_empty() {
33 continue;
34 }
35 match serde_json::from_str::<SyncRecord>(&line) {
36 Ok(r) => records.push(r),
37 Err(e) => {
38 tracing::warn!("skipping malformed sync record: {e}");
39 stats.errors += 1;
40 }
41 }
42 }
43
44 for record in &records {
45 if record.local_seq > max_seq {
46 max_seq = record.local_seq;
47 }
48 }
49
50 const TX_CHUNK: usize = 500;
56 let mut i = 0;
57 while i < records.len() {
58 let record = &records[i];
59 if matches!(record.op, SyncOp::Create) {
60 let end = (i + TX_CHUNK).min(records.len());
61 let mut j = i;
62 while j < end && matches!(records[j].op, SyncOp::Create) {
63 j += 1;
64 }
65 let chunk = &records[i..j];
66 match apply_create_chunk(store, chunk, &mut stats) {
67 Ok(()) => {}
68 Err(_) => {
69 for rec in chunk {
70 record_outcome(store, rec, &mut stats, &mut errored_seqs);
71 }
72 }
73 }
74 i = j;
75 } else {
76 record_outcome(store, record, &mut stats, &mut errored_seqs);
77 i += 1;
78 }
79 }
80
81 let watermark = errored_seqs
82 .iter()
83 .min()
84 .map(|m| m.saturating_sub(1))
85 .unwrap_or(max_seq);
86
87 Ok((stats, watermark))
88}
89
90fn record_outcome(
91 store: &KuzuStore,
92 record: &SyncRecord,
93 stats: &mut ImportStats,
94 errored_seqs: &mut Vec<u64>,
95) {
96 match apply_with_retry(store, record) {
97 Ok(outcome) => match outcome {
98 ApplyOutcome::Created => stats.created += 1,
99 ApplyOutcome::Updated => stats.updated += 1,
100 ApplyOutcome::Deleted => stats.deleted += 1,
101 ApplyOutcome::Skipped => stats.skipped += 1,
102 },
103 Err(e) => {
104 tracing::warn!(
105 "error applying sync record local_seq={}: {e}",
106 record.local_seq
107 );
108 stats.errors += 1;
109 errored_seqs.push(record.local_seq);
110 }
111 }
112}
113
114fn apply_create_chunk(
118 store: &KuzuStore,
119 chunk: &[SyncRecord],
120 stats: &mut ImportStats,
121) -> Result<()> {
122 let mut relation_evictions: Vec<second_brain_core::schema::Relation> = Vec::new();
123 let outcomes = store.with_write_transaction(|conn| {
124 let mut outcomes = Vec::with_capacity(chunk.len());
125 for record in chunk {
126 let origin_mid = &record.origin_machine_id;
127 let origin_seq = record.origin_seq as i64;
128 let outcome = match &record.node_type {
129 SyncNodeType::Memory => {
130 let mem: Memory = serde_json::from_value(
131 record
132 .data
133 .clone()
134 .ok_or_else(|| anyhow::anyhow!("missing data for memory create"))?,
135 )?;
136 store.apply_create_memory_on_tx(conn, &mem, origin_mid, origin_seq)?
137 }
138 SyncNodeType::Conversation => {
139 let conv: Conversation = serde_json::from_value(
140 record.data.clone().ok_or_else(|| {
141 anyhow::anyhow!("missing data for conversation create")
142 })?,
143 )?;
144 store.apply_create_conversation_on_tx(conn, &conv, origin_mid, origin_seq)?
145 }
146 SyncNodeType::Entity => {
147 let entity: Entity = serde_json::from_value(
148 record
149 .data
150 .clone()
151 .ok_or_else(|| anyhow::anyhow!("missing data for entity create"))?,
152 )?;
153 store.apply_create_entity_on_tx(conn, &entity, origin_mid, origin_seq)?
154 }
155 SyncNodeType::Relation => {
156 let rel: Relation = serde_json::from_value(
157 record
158 .data
159 .clone()
160 .ok_or_else(|| anyhow::anyhow!("missing data for relation create"))?,
161 )?;
162 let outcome =
163 store.apply_create_relation_on_tx(conn, &rel, origin_mid, origin_seq)?;
164 if matches!(outcome, ApplyOutcome::Created) {
165 relation_evictions.push(rel);
166 }
167 outcome
168 }
169 };
170 outcomes.push(outcome);
171 }
172 Ok(outcomes)
173 })?;
174
175 for rel in &relation_evictions {
178 store.note_relation_created(rel.from_id);
179 }
180
181 for outcome in outcomes {
182 match outcome {
183 ApplyOutcome::Created => stats.created += 1,
184 ApplyOutcome::Updated => stats.updated += 1,
185 ApplyOutcome::Deleted => stats.deleted += 1,
186 ApplyOutcome::Skipped => stats.skipped += 1,
187 }
188 }
189 Ok(())
190}
191
192fn apply_with_retry(store: &KuzuStore, record: &SyncRecord) -> Result<ApplyOutcome> {
193 const MAX_ATTEMPTS: u32 = 5;
194 let mut last_err = None;
195 for attempt in 0..MAX_ATTEMPTS {
196 match apply_record(store, record) {
197 Ok(outcome) => return Ok(outcome),
198 Err(e) => {
199 tracing::debug!(
200 "transient error on attempt {} for local_seq={}: {e}",
201 attempt + 1,
202 record.local_seq,
203 );
204 last_err = Some(e);
205 std::thread::sleep(std::time::Duration::from_millis(50 * (1 << attempt)));
206 }
207 }
208 }
209 Err(last_err.unwrap())
210}
211
212fn apply_record(store: &KuzuStore, record: &SyncRecord) -> Result<ApplyOutcome> {
213 let origin_mid = &record.origin_machine_id;
214 let origin_seq = record.origin_seq as i64;
215
216 match (&record.op, &record.node_type) {
217 (SyncOp::Create, SyncNodeType::Memory) => {
218 let mem: Memory = serde_json::from_value(
219 record
220 .data
221 .clone()
222 .ok_or_else(|| anyhow::anyhow!("missing data for memory create"))?,
223 )?;
224 store.apply_create_memory(&mem, origin_mid, origin_seq)
225 }
226
227 (SyncOp::Update, SyncNodeType::Memory) => {
228 let mem: Memory = serde_json::from_value(
229 record
230 .data
231 .clone()
232 .ok_or_else(|| anyhow::anyhow!("missing data for memory update"))?,
233 )?;
234 let (outcome, _) = store.apply_update_memory(&mem, origin_mid, origin_seq)?;
235 Ok(outcome)
236 }
237
238 (SyncOp::Delete, SyncNodeType::Memory) => {
239 store.apply_delete_memory(&record.node_id, origin_mid, origin_seq)
240 }
241
242 (SyncOp::Create, SyncNodeType::Conversation) => {
243 let conv: Conversation = serde_json::from_value(
244 record
245 .data
246 .clone()
247 .ok_or_else(|| anyhow::anyhow!("missing data for conversation create"))?,
248 )?;
249 store.apply_create_conversation(&conv, origin_mid, origin_seq)
250 }
251
252 (SyncOp::Update, SyncNodeType::Conversation) => {
253 let conv: Conversation = serde_json::from_value(
254 record
255 .data
256 .clone()
257 .ok_or_else(|| anyhow::anyhow!("missing data for conversation update"))?,
258 )?;
259 store.apply_upsert_conversation(&conv, origin_mid, origin_seq)
260 }
261
262 (SyncOp::Delete, SyncNodeType::Conversation) => {
263 store.apply_delete_conversation(&record.node_id, origin_mid, origin_seq)
264 }
265
266 (SyncOp::Create, SyncNodeType::Entity) => {
267 let entity: Entity = serde_json::from_value(
268 record
269 .data
270 .clone()
271 .ok_or_else(|| anyhow::anyhow!("missing data for entity create"))?,
272 )?;
273 store.apply_create_entity(&entity, origin_mid, origin_seq)
274 }
275
276 (SyncOp::Update, SyncNodeType::Entity) => {
277 let entity: Entity = serde_json::from_value(
278 record
279 .data
280 .clone()
281 .ok_or_else(|| anyhow::anyhow!("missing data for entity update"))?,
282 )?;
283 store.apply_upsert_entity(&entity, origin_mid, origin_seq)
284 }
285
286 (SyncOp::Delete, SyncNodeType::Entity) => {
287 store.apply_delete_entity(&record.node_id, origin_mid, origin_seq)
288 }
289
290 (SyncOp::Create, SyncNodeType::Relation) => {
291 let rel: Relation = serde_json::from_value(
292 record
293 .data
294 .clone()
295 .ok_or_else(|| anyhow::anyhow!("missing data for relation create"))?,
296 )?;
297 store.apply_create_relation(&rel, origin_mid, origin_seq)
298 }
299
300 (SyncOp::Update, SyncNodeType::Relation) => {
301 Ok(ApplyOutcome::Skipped)
304 }
305
306 (SyncOp::Delete, SyncNodeType::Relation) => {
307 let relation = record
308 .data
309 .as_ref()
310 .and_then(|d| serde_json::from_value::<Relation>(d.clone()).ok());
311 store.apply_delete_relation(&record.node_id, relation.as_ref(), origin_mid, origin_seq)
312 }
313 }
314}