Skip to main content

second_brain_sync/
import.rs

1use std::io::BufRead;
2
3use anyhow::Result;
4
5use second_brain_core::kuzu_store::{ApplyOutcome, KuzuStore};
6use second_brain_core::schema::{Conversation, Entity, Memory, Relation, SyncNodeType, SyncOp};
7
8use crate::export::SyncRecord;
9
/// Per-outcome counters accumulated while importing sync records.
///
/// The first four fields mirror the `ApplyOutcome` variants reported by the store;
/// `errors` covers everything that could not be applied.
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
pub struct ImportStats {
    /// Records the store reported as `ApplyOutcome::Created`.
    pub created: u64,
    /// Records the store reported as `ApplyOutcome::Updated`.
    pub updated: u64,
    /// Records the store reported as `ApplyOutcome::Deleted`.
    pub deleted: u64,
    /// Records the store reported as `ApplyOutcome::Skipped` (for example an
    /// `Update` on an immutable relation).
    pub skipped: u64,
    /// Input lines that did not parse as a sync record, plus records that still
    /// failed to apply after retrying.
    pub errors: u64,
}
17
18pub fn import_changes<R: BufRead>(store: &KuzuStore, reader: R) -> Result<(ImportStats, u64)> {
19    let mut stats = ImportStats {
20        created: 0,
21        updated: 0,
22        deleted: 0,
23        skipped: 0,
24        errors: 0,
25    };
26    let mut max_seq = 0u64;
27    let mut errored_seqs: Vec<u64> = Vec::new();
28
29    let mut records: Vec<SyncRecord> = Vec::new();
30    for line in reader.lines() {
31        let line = line?;
32        if line.trim().is_empty() {
33            continue;
34        }
35        match serde_json::from_str::<SyncRecord>(&line) {
36            Ok(r) => records.push(r),
37            Err(e) => {
38                tracing::warn!("skipping malformed sync record: {e}");
39                stats.errors += 1;
40            }
41        }
42    }
43
44    for record in &records {
45        if record.local_seq > max_seq {
46            max_seq = record.local_seq;
47        }
48    }
49
50    // Apply consecutive Create records inside a single held write transaction so a
51    // whole batch commits with one fsync instead of one per record (per-record commits
52    // throttle import to a few records/sec on slow storage). Non-create ops (update,
53    // delete) flush the open create run and apply per-record through the existing path
54    // to preserve their conflict-resolution and cache-eviction semantics unchanged.
55    const TX_CHUNK: usize = 500;
56    let mut i = 0;
57    while i < records.len() {
58        let record = &records[i];
59        if matches!(record.op, SyncOp::Create) {
60            let end = (i + TX_CHUNK).min(records.len());
61            let mut j = i;
62            while j < end && matches!(records[j].op, SyncOp::Create) {
63                j += 1;
64            }
65            let chunk = &records[i..j];
66            match apply_create_chunk(store, chunk, &mut stats) {
67                Ok(()) => {}
68                Err(_) => {
69                    for rec in chunk {
70                        record_outcome(store, rec, &mut stats, &mut errored_seqs);
71                    }
72                }
73            }
74            i = j;
75        } else {
76            record_outcome(store, record, &mut stats, &mut errored_seqs);
77            i += 1;
78        }
79    }
80
81    let watermark = errored_seqs
82        .iter()
83        .min()
84        .map(|m| m.saturating_sub(1))
85        .unwrap_or(max_seq);
86
87    Ok((stats, watermark))
88}
89
90fn record_outcome(
91    store: &KuzuStore,
92    record: &SyncRecord,
93    stats: &mut ImportStats,
94    errored_seqs: &mut Vec<u64>,
95) {
96    match apply_with_retry(store, record) {
97        Ok(outcome) => match outcome {
98            ApplyOutcome::Created => stats.created += 1,
99            ApplyOutcome::Updated => stats.updated += 1,
100            ApplyOutcome::Deleted => stats.deleted += 1,
101            ApplyOutcome::Skipped => stats.skipped += 1,
102        },
103        Err(e) => {
104            tracing::warn!(
105                "error applying sync record local_seq={}: {e}",
106                record.local_seq
107            );
108            stats.errors += 1;
109            errored_seqs.push(record.local_seq);
110        }
111    }
112}
113
114// Applies a run of Create records in one transaction. On any error the whole chunk
115// rolls back so the caller can fall back to applying each record individually; that
116// keeps the batched fast path from silently dropping records when one is malformed.
117fn apply_create_chunk(
118    store: &KuzuStore,
119    chunk: &[SyncRecord],
120    stats: &mut ImportStats,
121) -> Result<()> {
122    let mut relation_evictions: Vec<second_brain_core::schema::Relation> = Vec::new();
123    let outcomes = store.with_write_transaction(|conn| {
124        let mut outcomes = Vec::with_capacity(chunk.len());
125        for record in chunk {
126            let origin_mid = &record.origin_machine_id;
127            let origin_seq = record.origin_seq as i64;
128            let outcome = match &record.node_type {
129                SyncNodeType::Memory => {
130                    let mem: Memory = serde_json::from_value(
131                        record
132                            .data
133                            .clone()
134                            .ok_or_else(|| anyhow::anyhow!("missing data for memory create"))?,
135                    )?;
136                    store.apply_create_memory_on_tx(conn, &mem, origin_mid, origin_seq)?
137                }
138                SyncNodeType::Conversation => {
139                    let conv: Conversation = serde_json::from_value(
140                        record.data.clone().ok_or_else(|| {
141                            anyhow::anyhow!("missing data for conversation create")
142                        })?,
143                    )?;
144                    store.apply_create_conversation_on_tx(conn, &conv, origin_mid, origin_seq)?
145                }
146                SyncNodeType::Entity => {
147                    let entity: Entity = serde_json::from_value(
148                        record
149                            .data
150                            .clone()
151                            .ok_or_else(|| anyhow::anyhow!("missing data for entity create"))?,
152                    )?;
153                    store.apply_create_entity_on_tx(conn, &entity, origin_mid, origin_seq)?
154                }
155                SyncNodeType::Relation => {
156                    let rel: Relation = serde_json::from_value(
157                        record
158                            .data
159                            .clone()
160                            .ok_or_else(|| anyhow::anyhow!("missing data for relation create"))?,
161                    )?;
162                    let outcome =
163                        store.apply_create_relation_on_tx(conn, &rel, origin_mid, origin_seq)?;
164                    if matches!(outcome, ApplyOutcome::Created) {
165                        relation_evictions.push(rel);
166                    }
167                    outcome
168                }
169            };
170            outcomes.push(outcome);
171        }
172        Ok(outcomes)
173    })?;
174
175    // The relations cache must only be evicted after the transaction commits, so a
176    // rollback never leaves the cache reflecting writes that did not land.
177    for rel in &relation_evictions {
178        store.note_relation_created(rel.from_id);
179    }
180
181    for outcome in outcomes {
182        match outcome {
183            ApplyOutcome::Created => stats.created += 1,
184            ApplyOutcome::Updated => stats.updated += 1,
185            ApplyOutcome::Deleted => stats.deleted += 1,
186            ApplyOutcome::Skipped => stats.skipped += 1,
187        }
188    }
189    Ok(())
190}
191
192fn apply_with_retry(store: &KuzuStore, record: &SyncRecord) -> Result<ApplyOutcome> {
193    const MAX_ATTEMPTS: u32 = 5;
194    let mut last_err = None;
195    for attempt in 0..MAX_ATTEMPTS {
196        match apply_record(store, record) {
197            Ok(outcome) => return Ok(outcome),
198            Err(e) => {
199                tracing::debug!(
200                    "transient error on attempt {} for local_seq={}: {e}",
201                    attempt + 1,
202                    record.local_seq,
203                );
204                last_err = Some(e);
205                std::thread::sleep(std::time::Duration::from_millis(50 * (1 << attempt)));
206            }
207        }
208    }
209    Err(last_err.unwrap())
210}
211
212fn apply_record(store: &KuzuStore, record: &SyncRecord) -> Result<ApplyOutcome> {
213    let origin_mid = &record.origin_machine_id;
214    let origin_seq = record.origin_seq as i64;
215
216    match (&record.op, &record.node_type) {
217        (SyncOp::Create, SyncNodeType::Memory) => {
218            let mem: Memory = serde_json::from_value(
219                record
220                    .data
221                    .clone()
222                    .ok_or_else(|| anyhow::anyhow!("missing data for memory create"))?,
223            )?;
224            store.apply_create_memory(&mem, origin_mid, origin_seq)
225        }
226
227        (SyncOp::Update, SyncNodeType::Memory) => {
228            let mem: Memory = serde_json::from_value(
229                record
230                    .data
231                    .clone()
232                    .ok_or_else(|| anyhow::anyhow!("missing data for memory update"))?,
233            )?;
234            let (outcome, _) = store.apply_update_memory(&mem, origin_mid, origin_seq)?;
235            Ok(outcome)
236        }
237
238        (SyncOp::Delete, SyncNodeType::Memory) => {
239            store.apply_delete_memory(&record.node_id, origin_mid, origin_seq)
240        }
241
242        (SyncOp::Create, SyncNodeType::Conversation) => {
243            let conv: Conversation = serde_json::from_value(
244                record
245                    .data
246                    .clone()
247                    .ok_or_else(|| anyhow::anyhow!("missing data for conversation create"))?,
248            )?;
249            store.apply_create_conversation(&conv, origin_mid, origin_seq)
250        }
251
252        (SyncOp::Update, SyncNodeType::Conversation) => {
253            let conv: Conversation = serde_json::from_value(
254                record
255                    .data
256                    .clone()
257                    .ok_or_else(|| anyhow::anyhow!("missing data for conversation update"))?,
258            )?;
259            store.apply_upsert_conversation(&conv, origin_mid, origin_seq)
260        }
261
262        (SyncOp::Delete, SyncNodeType::Conversation) => {
263            store.apply_delete_conversation(&record.node_id, origin_mid, origin_seq)
264        }
265
266        (SyncOp::Create, SyncNodeType::Entity) => {
267            let entity: Entity = serde_json::from_value(
268                record
269                    .data
270                    .clone()
271                    .ok_or_else(|| anyhow::anyhow!("missing data for entity create"))?,
272            )?;
273            store.apply_create_entity(&entity, origin_mid, origin_seq)
274        }
275
276        (SyncOp::Update, SyncNodeType::Entity) => {
277            let entity: Entity = serde_json::from_value(
278                record
279                    .data
280                    .clone()
281                    .ok_or_else(|| anyhow::anyhow!("missing data for entity update"))?,
282            )?;
283            store.apply_upsert_entity(&entity, origin_mid, origin_seq)
284        }
285
286        (SyncOp::Delete, SyncNodeType::Entity) => {
287            store.apply_delete_entity(&record.node_id, origin_mid, origin_seq)
288        }
289
290        (SyncOp::Create, SyncNodeType::Relation) => {
291            let rel: Relation = serde_json::from_value(
292                record
293                    .data
294                    .clone()
295                    .ok_or_else(|| anyhow::anyhow!("missing data for relation create"))?,
296            )?;
297            store.apply_create_relation(&rel, origin_mid, origin_seq)
298        }
299
300        (SyncOp::Update, SyncNodeType::Relation) => {
301            // Relations are immutable, so an Update Relation record is not a valid operation
302            // and is skipped rather than mislogged as a Create on relay.
303            Ok(ApplyOutcome::Skipped)
304        }
305
306        (SyncOp::Delete, SyncNodeType::Relation) => {
307            let relation = record
308                .data
309                .as_ref()
310                .and_then(|d| serde_json::from_value::<Relation>(d.clone()).ok());
311            store.apply_delete_relation(&record.node_id, relation.as_ref(), origin_mid, origin_seq)
312        }
313    }
314}