Skip to main content

second_brain_sync/
import.rs

1use std::io::BufRead;
2
3use anyhow::Result;
4
5use second_brain_core::kuzu_store::{ApplyOutcome, KuzuStore};
6use second_brain_core::schema::{Conversation, Entity, Memory, Relation, SyncNodeType, SyncOp};
7
8use crate::export::SyncRecord;
9
10pub struct ImportStats {
11    pub created: u64,
12    pub updated: u64,
13    pub deleted: u64,
14    pub skipped: u64,
15    pub errors: u64,
16}
17
18pub fn import_changes<R: BufRead>(store: &KuzuStore, reader: R) -> Result<(ImportStats, u64)> {
19    let mut stats = ImportStats {
20        created: 0,
21        updated: 0,
22        deleted: 0,
23        skipped: 0,
24        errors: 0,
25    };
26    let mut max_seq = 0u64;
27    let mut errored_seqs: Vec<u64> = Vec::new();
28
29    for line in reader.lines() {
30        let line = line?;
31        if line.trim().is_empty() {
32            continue;
33        }
34
35        let record: SyncRecord = match serde_json::from_str(&line) {
36            Ok(r) => r,
37            Err(e) => {
38                tracing::warn!("skipping malformed sync record: {e}");
39                stats.errors += 1;
40                continue;
41            }
42        };
43
44        if record.local_seq > max_seq {
45            max_seq = record.local_seq;
46        }
47
48        match apply_with_retry(store, &record) {
49            Ok(outcome) => match outcome {
50                ApplyOutcome::Created => stats.created += 1,
51                ApplyOutcome::Updated => stats.updated += 1,
52                ApplyOutcome::Deleted => stats.deleted += 1,
53                ApplyOutcome::Skipped => stats.skipped += 1,
54            },
55            Err(e) => {
56                tracing::warn!(
57                    "error applying sync record local_seq={}: {e}",
58                    record.local_seq
59                );
60                stats.errors += 1;
61                errored_seqs.push(record.local_seq);
62            }
63        }
64    }
65
66    let watermark = errored_seqs
67        .iter()
68        .min()
69        .map(|m| m.saturating_sub(1))
70        .unwrap_or(max_seq);
71
72    Ok((stats, watermark))
73}
74
75fn apply_with_retry(store: &KuzuStore, record: &SyncRecord) -> Result<ApplyOutcome> {
76    const MAX_ATTEMPTS: u32 = 5;
77    let mut last_err = None;
78    for attempt in 0..MAX_ATTEMPTS {
79        match apply_record(store, record) {
80            Ok(outcome) => return Ok(outcome),
81            Err(e) => {
82                tracing::debug!(
83                    "transient error on attempt {} for local_seq={}: {e}",
84                    attempt + 1,
85                    record.local_seq,
86                );
87                last_err = Some(e);
88                std::thread::sleep(std::time::Duration::from_millis(50 * (1 << attempt)));
89            }
90        }
91    }
92    Err(last_err.unwrap())
93}
94
95fn apply_record(store: &KuzuStore, record: &SyncRecord) -> Result<ApplyOutcome> {
96    let origin_mid = &record.origin_machine_id;
97    let origin_seq = record.origin_seq as i64;
98
99    match (&record.op, &record.node_type) {
100        (SyncOp::Create, SyncNodeType::Memory) => {
101            let mem: Memory = serde_json::from_value(
102                record
103                    .data
104                    .clone()
105                    .ok_or_else(|| anyhow::anyhow!("missing data for memory create"))?,
106            )?;
107            store.apply_create_memory(&mem, origin_mid, origin_seq)
108        }
109
110        (SyncOp::Update, SyncNodeType::Memory) => {
111            let mem: Memory = serde_json::from_value(
112                record
113                    .data
114                    .clone()
115                    .ok_or_else(|| anyhow::anyhow!("missing data for memory update"))?,
116            )?;
117            let (outcome, _) = store.apply_update_memory(&mem, origin_mid, origin_seq)?;
118            Ok(outcome)
119        }
120
121        (SyncOp::Delete, SyncNodeType::Memory) => {
122            store.apply_delete_memory(&record.node_id, origin_mid, origin_seq)
123        }
124
125        (SyncOp::Create, SyncNodeType::Conversation) => {
126            let conv: Conversation = serde_json::from_value(
127                record
128                    .data
129                    .clone()
130                    .ok_or_else(|| anyhow::anyhow!("missing data for conversation create"))?,
131            )?;
132            store.apply_create_conversation(&conv, origin_mid, origin_seq)
133        }
134
135        (SyncOp::Update, SyncNodeType::Conversation) => {
136            let conv: Conversation = serde_json::from_value(
137                record
138                    .data
139                    .clone()
140                    .ok_or_else(|| anyhow::anyhow!("missing data for conversation update"))?,
141            )?;
142            store.apply_upsert_conversation(&conv, origin_mid, origin_seq)
143        }
144
145        (SyncOp::Delete, SyncNodeType::Conversation) => {
146            store.apply_delete_conversation(&record.node_id, origin_mid, origin_seq)
147        }
148
149        (SyncOp::Create, SyncNodeType::Entity) => {
150            let entity: Entity = serde_json::from_value(
151                record
152                    .data
153                    .clone()
154                    .ok_or_else(|| anyhow::anyhow!("missing data for entity create"))?,
155            )?;
156            store.apply_create_entity(&entity, origin_mid, origin_seq)
157        }
158
159        (SyncOp::Update, SyncNodeType::Entity) => {
160            let entity: Entity = serde_json::from_value(
161                record
162                    .data
163                    .clone()
164                    .ok_or_else(|| anyhow::anyhow!("missing data for entity update"))?,
165            )?;
166            store.apply_upsert_entity(&entity, origin_mid, origin_seq)
167        }
168
169        (SyncOp::Delete, SyncNodeType::Entity) => {
170            store.apply_delete_entity(&record.node_id, origin_mid, origin_seq)
171        }
172
173        (SyncOp::Create, SyncNodeType::Relation) => {
174            let rel: Relation = serde_json::from_value(
175                record
176                    .data
177                    .clone()
178                    .ok_or_else(|| anyhow::anyhow!("missing data for relation create"))?,
179            )?;
180            store.apply_create_relation(&rel, origin_mid, origin_seq)
181        }
182
183        (SyncOp::Update, SyncNodeType::Relation) => {
184            // Relations are immutable, so an Update Relation record is not a valid operation
185            // and is skipped rather than mislogged as a Create on relay.
186            Ok(ApplyOutcome::Skipped)
187        }
188
189        (SyncOp::Delete, SyncNodeType::Relation) => {
190            let relation = record
191                .data
192                .as_ref()
193                .and_then(|d| serde_json::from_value::<Relation>(d.clone()).ok());
194            store.apply_delete_relation(&record.node_id, relation.as_ref(), origin_mid, origin_seq)
195        }
196    }
197}