use std::io::BufRead;
use anyhow::Result;
use uuid::Uuid;
use second_brain_core::kuzu_store::KuzuStore;
use second_brain_core::schema::{Conversation, Entity, Memory, Relation, SyncNodeType, SyncOp};
use second_brain_core::store::Store;
use crate::export::SyncRecord;
pub struct ImportStats {
pub created: u64,
pub updated: u64,
pub deleted: u64,
pub skipped: u64,
pub errors: u64,
}
pub fn import_changes<R: BufRead>(store: &KuzuStore, reader: R) -> Result<(ImportStats, u64)> {
let mut stats = ImportStats {
created: 0,
updated: 0,
deleted: 0,
skipped: 0,
errors: 0,
};
let mut max_seq = 0u64;
for line in reader.lines() {
let line = line?;
if line.trim().is_empty() {
continue;
}
let record: SyncRecord = match serde_json::from_str(&line) {
Ok(r) => r,
Err(e) => {
tracing::warn!("skipping malformed sync record: {e}");
stats.errors += 1;
continue;
}
};
if record.seq > max_seq {
max_seq = record.seq;
}
match apply_record(store, &record) {
Ok(action) => match action {
ApplyAction::Created => stats.created += 1,
ApplyAction::Updated => stats.updated += 1,
ApplyAction::Deleted => stats.deleted += 1,
ApplyAction::Skipped => stats.skipped += 1,
},
Err(e) => {
tracing::warn!("error applying sync record seq={}: {e}", record.seq);
stats.errors += 1;
}
}
}
Ok((stats, max_seq))
}
enum ApplyAction {
Created,
Updated,
Deleted,
Skipped,
}
fn apply_record(store: &KuzuStore, record: &SyncRecord) -> Result<ApplyAction> {
match (&record.op, &record.node_type) {
(SyncOp::Create, SyncNodeType::Conversation) => {
let conv: Conversation = serde_json::from_value(
record
.data
.clone()
.ok_or_else(|| anyhow::anyhow!("missing data for conversation create"))?,
)?;
if store.get_conversation(conv.id)?.is_some() {
return Ok(ApplyAction::Skipped);
}
store.import_conversation(&conv)?;
Ok(ApplyAction::Created)
}
(SyncOp::Create, SyncNodeType::Memory) => {
let mem: Memory = serde_json::from_value(
record
.data
.clone()
.ok_or_else(|| anyhow::anyhow!("missing data for memory create"))?,
)?;
if store.get_memory(mem.id)?.is_some() {
return Ok(ApplyAction::Skipped);
}
store.import_memory(&mem)?;
Ok(ApplyAction::Created)
}
(SyncOp::Create, SyncNodeType::Entity) => {
let entity: Entity = serde_json::from_value(
record
.data
.clone()
.ok_or_else(|| anyhow::anyhow!("missing data for entity create"))?,
)?;
if store.get_entity(entity.id)?.is_some() {
return Ok(ApplyAction::Skipped);
}
store.import_entity(&entity)?;
Ok(ApplyAction::Created)
}
(SyncOp::Create, SyncNodeType::Relation) => {
let rel: Relation = serde_json::from_value(
record
.data
.clone()
.ok_or_else(|| anyhow::anyhow!("missing data for relation create"))?,
)?;
match store.import_relation(&rel) {
Ok(()) => Ok(ApplyAction::Created),
Err(_) => Ok(ApplyAction::Skipped),
}
}
(SyncOp::Update, SyncNodeType::Memory) => {
let incoming: Memory = serde_json::from_value(
record
.data
.clone()
.ok_or_else(|| anyhow::anyhow!("missing data for memory update"))?,
)?;
match store.get_memory(incoming.id)? {
Some(existing) => {
if incoming.last_accessed > existing.last_accessed
|| (incoming.last_accessed == existing.last_accessed
&& incoming.confidence > existing.confidence)
{
store.import_or_update_memory(&incoming)?;
Ok(ApplyAction::Updated)
} else {
Ok(ApplyAction::Skipped)
}
}
None => {
store.import_memory(&incoming)?;
Ok(ApplyAction::Created)
}
}
}
(SyncOp::Delete, SyncNodeType::Memory) => {
let id = Uuid::parse_str(&record.node_id)?;
if store.get_memory(id)?.is_some() {
store.import_delete_memory(id)?;
Ok(ApplyAction::Deleted)
} else {
Ok(ApplyAction::Skipped)
}
}
_ => {
tracing::debug!("unhandled sync op: {:?}/{:?}", record.op, record.node_type);
Ok(ApplyAction::Skipped)
}
}
}