Skip to main content

second_brain_sync/
import.rs

1use std::io::BufRead;
2
3use anyhow::Result;
4use uuid::Uuid;
5
6use second_brain_core::kuzu_store::KuzuStore;
7use second_brain_core::schema::{Conversation, Entity, Memory, Relation, SyncNodeType, SyncOp};
8use second_brain_core::store::Store;
9
10use crate::export::SyncRecord;
11
/// Per-outcome counters accumulated by [`import_changes`] over one input stream.
///
/// Every non-blank input line increments exactly one of these counters.
#[derive(Debug, Default, Clone, PartialEq, Eq)]
pub struct ImportStats {
    /// Records whose application created a new node in the store.
    pub created: u64,
    /// Records whose application overwrote an existing node.
    pub updated: u64,
    /// Records whose application removed an existing node.
    pub deleted: u64,
    /// Records that were applied as a no-op.
    pub skipped: u64,
    /// Lines that failed to parse, plus records whose application returned an error.
    pub errors: u64,
}
19
20pub fn import_changes<R: BufRead>(store: &KuzuStore, reader: R) -> Result<(ImportStats, u64)> {
21    let mut stats = ImportStats {
22        created: 0,
23        updated: 0,
24        deleted: 0,
25        skipped: 0,
26        errors: 0,
27    };
28    let mut max_seq = 0u64;
29
30    for line in reader.lines() {
31        let line = line?;
32        if line.trim().is_empty() {
33            continue;
34        }
35
36        let record: SyncRecord = match serde_json::from_str(&line) {
37            Ok(r) => r,
38            Err(e) => {
39                tracing::warn!("skipping malformed sync record: {e}");
40                stats.errors += 1;
41                continue;
42            }
43        };
44
45        if record.seq > max_seq {
46            max_seq = record.seq;
47        }
48
49        match apply_record(store, &record) {
50            Ok(action) => match action {
51                ApplyAction::Created => stats.created += 1,
52                ApplyAction::Updated => stats.updated += 1,
53                ApplyAction::Deleted => stats.deleted += 1,
54                ApplyAction::Skipped => stats.skipped += 1,
55            },
56            Err(e) => {
57                tracing::warn!("error applying sync record seq={}: {e}", record.seq);
58                stats.errors += 1;
59            }
60        }
61    }
62
63    Ok((stats, max_seq))
64}
65
/// Outcome of applying a single sync record to the store.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum ApplyAction {
    /// A node was newly imported.
    Created,
    /// An existing node was overwritten with the incoming version.
    Updated,
    /// An existing node was removed.
    Deleted,
    /// The record caused no change to the store.
    Skipped,
}
72
73fn apply_record(store: &KuzuStore, record: &SyncRecord) -> Result<ApplyAction> {
74    match (&record.op, &record.node_type) {
75        (SyncOp::Create, SyncNodeType::Conversation) => {
76            let conv: Conversation = serde_json::from_value(
77                record
78                    .data
79                    .clone()
80                    .ok_or_else(|| anyhow::anyhow!("missing data for conversation create"))?,
81            )?;
82            if store.get_conversation(conv.id)?.is_some() {
83                return Ok(ApplyAction::Skipped);
84            }
85            store.import_conversation(&conv)?;
86            Ok(ApplyAction::Created)
87        }
88        (SyncOp::Create, SyncNodeType::Memory) => {
89            let mem: Memory = serde_json::from_value(
90                record
91                    .data
92                    .clone()
93                    .ok_or_else(|| anyhow::anyhow!("missing data for memory create"))?,
94            )?;
95            if store.get_memory(mem.id)?.is_some() {
96                return Ok(ApplyAction::Skipped);
97            }
98            store.import_memory(&mem)?;
99            Ok(ApplyAction::Created)
100        }
101        (SyncOp::Create, SyncNodeType::Entity) => {
102            let entity: Entity = serde_json::from_value(
103                record
104                    .data
105                    .clone()
106                    .ok_or_else(|| anyhow::anyhow!("missing data for entity create"))?,
107            )?;
108            if store.get_entity(entity.id)?.is_some() {
109                return Ok(ApplyAction::Skipped);
110            }
111            store.import_entity(&entity)?;
112            Ok(ApplyAction::Created)
113        }
114        (SyncOp::Create, SyncNodeType::Relation) => {
115            let rel: Relation = serde_json::from_value(
116                record
117                    .data
118                    .clone()
119                    .ok_or_else(|| anyhow::anyhow!("missing data for relation create"))?,
120            )?;
121            match store.import_relation(&rel) {
122                Ok(()) => Ok(ApplyAction::Created),
123                Err(_) => Ok(ApplyAction::Skipped),
124            }
125        }
126        (SyncOp::Update, SyncNodeType::Memory) => {
127            let incoming: Memory = serde_json::from_value(
128                record
129                    .data
130                    .clone()
131                    .ok_or_else(|| anyhow::anyhow!("missing data for memory update"))?,
132            )?;
133            match store.get_memory(incoming.id)? {
134                Some(existing) => {
135                    if incoming.last_accessed > existing.last_accessed
136                        || (incoming.last_accessed == existing.last_accessed
137                            && incoming.confidence > existing.confidence)
138                    {
139                        store.import_or_update_memory(&incoming)?;
140                        Ok(ApplyAction::Updated)
141                    } else {
142                        Ok(ApplyAction::Skipped)
143                    }
144                }
145                None => {
146                    store.import_memory(&incoming)?;
147                    Ok(ApplyAction::Created)
148                }
149            }
150        }
151        (SyncOp::Delete, SyncNodeType::Memory) => {
152            let id = Uuid::parse_str(&record.node_id)?;
153            if store.get_memory(id)?.is_some() {
154                store.import_delete_memory(id)?;
155                Ok(ApplyAction::Deleted)
156            } else {
157                Ok(ApplyAction::Skipped)
158            }
159        }
160        _ => {
161            tracing::debug!("unhandled sync op: {:?}/{:?}", record.op, record.node_type);
162            Ok(ApplyAction::Skipped)
163        }
164    }
165}