second_brain_sync/
import.rs1use std::io::BufRead;
2
3use anyhow::Result;
4
5use second_brain_core::kuzu_store::{ApplyOutcome, KuzuStore};
6use second_brain_core::schema::{Conversation, Entity, Memory, Relation, SyncNodeType, SyncOp};
7
8use crate::export::SyncRecord;
9
/// Per-run counters produced by `import_changes`.
///
/// Each processed input line increments exactly one counter (blank lines are not counted).
#[derive(Debug, Default, Clone, PartialEq, Eq)]
pub struct ImportStats {
    /// Records whose apply reported `ApplyOutcome::Created`.
    pub created: u64,
    /// Records whose apply reported `ApplyOutcome::Updated`.
    pub updated: u64,
    /// Records whose apply reported `ApplyOutcome::Deleted`.
    pub deleted: u64,
    /// Records whose apply reported `ApplyOutcome::Skipped`.
    pub skipped: u64,
    /// Lines that failed to parse as a sync record, plus records whose apply returned an error.
    pub errors: u64,
}
17
18pub fn import_changes<R: BufRead>(store: &KuzuStore, reader: R) -> Result<(ImportStats, u64)> {
19 let mut stats = ImportStats {
20 created: 0,
21 updated: 0,
22 deleted: 0,
23 skipped: 0,
24 errors: 0,
25 };
26 let mut max_seq = 0u64;
27 let mut errored_seqs: Vec<u64> = Vec::new();
28
29 for line in reader.lines() {
30 let line = line?;
31 if line.trim().is_empty() {
32 continue;
33 }
34
35 let record: SyncRecord = match serde_json::from_str(&line) {
36 Ok(r) => r,
37 Err(e) => {
38 tracing::warn!("skipping malformed sync record: {e}");
39 stats.errors += 1;
40 continue;
41 }
42 };
43
44 if record.local_seq > max_seq {
45 max_seq = record.local_seq;
46 }
47
48 match apply_with_retry(store, &record) {
49 Ok(outcome) => match outcome {
50 ApplyOutcome::Created => stats.created += 1,
51 ApplyOutcome::Updated => stats.updated += 1,
52 ApplyOutcome::Deleted => stats.deleted += 1,
53 ApplyOutcome::Skipped => stats.skipped += 1,
54 },
55 Err(e) => {
56 tracing::warn!(
57 "error applying sync record local_seq={}: {e}",
58 record.local_seq
59 );
60 stats.errors += 1;
61 errored_seqs.push(record.local_seq);
62 }
63 }
64 }
65
66 let watermark = errored_seqs
67 .iter()
68 .min()
69 .map(|m| m.saturating_sub(1))
70 .unwrap_or(max_seq);
71
72 Ok((stats, watermark))
73}
74
75fn apply_with_retry(store: &KuzuStore, record: &SyncRecord) -> Result<ApplyOutcome> {
76 const MAX_ATTEMPTS: u32 = 5;
77 let mut last_err = None;
78 for attempt in 0..MAX_ATTEMPTS {
79 match apply_record(store, record) {
80 Ok(outcome) => return Ok(outcome),
81 Err(e) => {
82 tracing::debug!(
83 "transient error on attempt {} for local_seq={}: {e}",
84 attempt + 1,
85 record.local_seq,
86 );
87 last_err = Some(e);
88 std::thread::sleep(std::time::Duration::from_millis(50 * (1 << attempt)));
89 }
90 }
91 }
92 Err(last_err.unwrap())
93}
94
95fn apply_record(store: &KuzuStore, record: &SyncRecord) -> Result<ApplyOutcome> {
96 let origin_mid = &record.origin_machine_id;
97 let origin_seq = record.origin_seq as i64;
98
99 match (&record.op, &record.node_type) {
100 (SyncOp::Create, SyncNodeType::Memory) => {
101 let mem: Memory = serde_json::from_value(
102 record
103 .data
104 .clone()
105 .ok_or_else(|| anyhow::anyhow!("missing data for memory create"))?,
106 )?;
107 store.apply_create_memory(&mem, origin_mid, origin_seq)
108 }
109
110 (SyncOp::Update, SyncNodeType::Memory) => {
111 let mem: Memory = serde_json::from_value(
112 record
113 .data
114 .clone()
115 .ok_or_else(|| anyhow::anyhow!("missing data for memory update"))?,
116 )?;
117 let (outcome, _) = store.apply_update_memory(&mem, origin_mid, origin_seq)?;
118 Ok(outcome)
119 }
120
121 (SyncOp::Delete, SyncNodeType::Memory) => {
122 store.apply_delete_memory(&record.node_id, origin_mid, origin_seq)
123 }
124
125 (SyncOp::Create, SyncNodeType::Conversation) => {
126 let conv: Conversation = serde_json::from_value(
127 record
128 .data
129 .clone()
130 .ok_or_else(|| anyhow::anyhow!("missing data for conversation create"))?,
131 )?;
132 store.apply_create_conversation(&conv, origin_mid, origin_seq)
133 }
134
135 (SyncOp::Update, SyncNodeType::Conversation) => {
136 let conv: Conversation = serde_json::from_value(
137 record
138 .data
139 .clone()
140 .ok_or_else(|| anyhow::anyhow!("missing data for conversation update"))?,
141 )?;
142 store.apply_upsert_conversation(&conv, origin_mid, origin_seq)
143 }
144
145 (SyncOp::Delete, SyncNodeType::Conversation) => {
146 store.apply_delete_conversation(&record.node_id, origin_mid, origin_seq)
147 }
148
149 (SyncOp::Create, SyncNodeType::Entity) => {
150 let entity: Entity = serde_json::from_value(
151 record
152 .data
153 .clone()
154 .ok_or_else(|| anyhow::anyhow!("missing data for entity create"))?,
155 )?;
156 store.apply_create_entity(&entity, origin_mid, origin_seq)
157 }
158
159 (SyncOp::Update, SyncNodeType::Entity) => {
160 let entity: Entity = serde_json::from_value(
161 record
162 .data
163 .clone()
164 .ok_or_else(|| anyhow::anyhow!("missing data for entity update"))?,
165 )?;
166 store.apply_upsert_entity(&entity, origin_mid, origin_seq)
167 }
168
169 (SyncOp::Delete, SyncNodeType::Entity) => {
170 store.apply_delete_entity(&record.node_id, origin_mid, origin_seq)
171 }
172
173 (SyncOp::Create, SyncNodeType::Relation) => {
174 let rel: Relation = serde_json::from_value(
175 record
176 .data
177 .clone()
178 .ok_or_else(|| anyhow::anyhow!("missing data for relation create"))?,
179 )?;
180 store.apply_create_relation(&rel, origin_mid, origin_seq)
181 }
182
183 (SyncOp::Update, SyncNodeType::Relation) => {
184 Ok(ApplyOutcome::Skipped)
187 }
188
189 (SyncOp::Delete, SyncNodeType::Relation) => {
190 let relation = record
191 .data
192 .as_ref()
193 .and_then(|d| serde_json::from_value::<Relation>(d.clone()).ok());
194 store.apply_delete_relation(&record.node_id, relation.as_ref(), origin_mid, origin_seq)
195 }
196 }
197}