use std::io::BufRead;
use anyhow::Result;
use second_brain_core::kuzu_store::{ApplyOutcome, KuzuStore};
use second_brain_core::schema::{Conversation, Entity, Memory, Relation, SyncNodeType, SyncOp};
use crate::export::SyncRecord;
pub struct ImportStats {
pub created: u64,
pub updated: u64,
pub deleted: u64,
pub skipped: u64,
pub errors: u64,
}
pub fn import_changes<R: BufRead>(store: &KuzuStore, reader: R) -> Result<(ImportStats, u64)> {
let mut stats = ImportStats {
created: 0,
updated: 0,
deleted: 0,
skipped: 0,
errors: 0,
};
let mut max_seq = 0u64;
let mut errored_seqs: Vec<u64> = Vec::new();
let mut records: Vec<SyncRecord> = Vec::new();
for line in reader.lines() {
let line = line?;
if line.trim().is_empty() {
continue;
}
match serde_json::from_str::<SyncRecord>(&line) {
Ok(r) => records.push(r),
Err(e) => {
tracing::warn!("skipping malformed sync record: {e}");
stats.errors += 1;
}
}
}
for record in &records {
if record.local_seq > max_seq {
max_seq = record.local_seq;
}
}
const TX_CHUNK: usize = 500;
let mut i = 0;
while i < records.len() {
let record = &records[i];
if matches!(record.op, SyncOp::Create) {
let end = (i + TX_CHUNK).min(records.len());
let mut j = i;
while j < end && matches!(records[j].op, SyncOp::Create) {
j += 1;
}
let chunk = &records[i..j];
match apply_create_chunk(store, chunk, &mut stats) {
Ok(()) => {}
Err(_) => {
for rec in chunk {
record_outcome(store, rec, &mut stats, &mut errored_seqs);
}
}
}
i = j;
} else {
record_outcome(store, record, &mut stats, &mut errored_seqs);
i += 1;
}
}
let watermark = errored_seqs
.iter()
.min()
.map(|m| m.saturating_sub(1))
.unwrap_or(max_seq);
Ok((stats, watermark))
}
fn record_outcome(
store: &KuzuStore,
record: &SyncRecord,
stats: &mut ImportStats,
errored_seqs: &mut Vec<u64>,
) {
match apply_with_retry(store, record) {
Ok(outcome) => match outcome {
ApplyOutcome::Created => stats.created += 1,
ApplyOutcome::Updated => stats.updated += 1,
ApplyOutcome::Deleted => stats.deleted += 1,
ApplyOutcome::Skipped => stats.skipped += 1,
},
Err(e) => {
tracing::warn!(
"error applying sync record local_seq={}: {e}",
record.local_seq
);
stats.errors += 1;
errored_seqs.push(record.local_seq);
}
}
}
fn apply_create_chunk(
store: &KuzuStore,
chunk: &[SyncRecord],
stats: &mut ImportStats,
) -> Result<()> {
let mut relation_evictions: Vec<second_brain_core::schema::Relation> = Vec::new();
let outcomes = store.with_write_transaction(|conn| {
let mut outcomes = Vec::with_capacity(chunk.len());
for record in chunk {
let origin_mid = &record.origin_machine_id;
let origin_seq = record.origin_seq as i64;
let outcome = match &record.node_type {
SyncNodeType::Memory => {
let mem: Memory = serde_json::from_value(
record
.data
.clone()
.ok_or_else(|| anyhow::anyhow!("missing data for memory create"))?,
)?;
store.apply_create_memory_on_tx(conn, &mem, origin_mid, origin_seq)?
}
SyncNodeType::Conversation => {
let conv: Conversation = serde_json::from_value(
record.data.clone().ok_or_else(|| {
anyhow::anyhow!("missing data for conversation create")
})?,
)?;
store.apply_create_conversation_on_tx(conn, &conv, origin_mid, origin_seq)?
}
SyncNodeType::Entity => {
let entity: Entity = serde_json::from_value(
record
.data
.clone()
.ok_or_else(|| anyhow::anyhow!("missing data for entity create"))?,
)?;
store.apply_create_entity_on_tx(conn, &entity, origin_mid, origin_seq)?
}
SyncNodeType::Relation => {
let rel: Relation = serde_json::from_value(
record
.data
.clone()
.ok_or_else(|| anyhow::anyhow!("missing data for relation create"))?,
)?;
let outcome =
store.apply_create_relation_on_tx(conn, &rel, origin_mid, origin_seq)?;
if matches!(outcome, ApplyOutcome::Created) {
relation_evictions.push(rel);
}
outcome
}
};
outcomes.push(outcome);
}
Ok(outcomes)
})?;
for rel in &relation_evictions {
store.note_relation_created(rel.from_id);
}
for outcome in outcomes {
match outcome {
ApplyOutcome::Created => stats.created += 1,
ApplyOutcome::Updated => stats.updated += 1,
ApplyOutcome::Deleted => stats.deleted += 1,
ApplyOutcome::Skipped => stats.skipped += 1,
}
}
Ok(())
}
fn apply_with_retry(store: &KuzuStore, record: &SyncRecord) -> Result<ApplyOutcome> {
const MAX_ATTEMPTS: u32 = 5;
let mut last_err = None;
for attempt in 0..MAX_ATTEMPTS {
match apply_record(store, record) {
Ok(outcome) => return Ok(outcome),
Err(e) => {
tracing::debug!(
"transient error on attempt {} for local_seq={}: {e}",
attempt + 1,
record.local_seq,
);
last_err = Some(e);
std::thread::sleep(std::time::Duration::from_millis(50 * (1 << attempt)));
}
}
}
Err(last_err.unwrap())
}
fn apply_record(store: &KuzuStore, record: &SyncRecord) -> Result<ApplyOutcome> {
let origin_mid = &record.origin_machine_id;
let origin_seq = record.origin_seq as i64;
match (&record.op, &record.node_type) {
(SyncOp::Create, SyncNodeType::Memory) => {
let mem: Memory = serde_json::from_value(
record
.data
.clone()
.ok_or_else(|| anyhow::anyhow!("missing data for memory create"))?,
)?;
store.apply_create_memory(&mem, origin_mid, origin_seq)
}
(SyncOp::Update, SyncNodeType::Memory) => {
let mem: Memory = serde_json::from_value(
record
.data
.clone()
.ok_or_else(|| anyhow::anyhow!("missing data for memory update"))?,
)?;
let (outcome, _) = store.apply_update_memory(&mem, origin_mid, origin_seq)?;
Ok(outcome)
}
(SyncOp::Delete, SyncNodeType::Memory) => {
store.apply_delete_memory(&record.node_id, origin_mid, origin_seq)
}
(SyncOp::Create, SyncNodeType::Conversation) => {
let conv: Conversation = serde_json::from_value(
record
.data
.clone()
.ok_or_else(|| anyhow::anyhow!("missing data for conversation create"))?,
)?;
store.apply_create_conversation(&conv, origin_mid, origin_seq)
}
(SyncOp::Update, SyncNodeType::Conversation) => {
let conv: Conversation = serde_json::from_value(
record
.data
.clone()
.ok_or_else(|| anyhow::anyhow!("missing data for conversation update"))?,
)?;
store.apply_upsert_conversation(&conv, origin_mid, origin_seq)
}
(SyncOp::Delete, SyncNodeType::Conversation) => {
store.apply_delete_conversation(&record.node_id, origin_mid, origin_seq)
}
(SyncOp::Create, SyncNodeType::Entity) => {
let entity: Entity = serde_json::from_value(
record
.data
.clone()
.ok_or_else(|| anyhow::anyhow!("missing data for entity create"))?,
)?;
store.apply_create_entity(&entity, origin_mid, origin_seq)
}
(SyncOp::Update, SyncNodeType::Entity) => {
let entity: Entity = serde_json::from_value(
record
.data
.clone()
.ok_or_else(|| anyhow::anyhow!("missing data for entity update"))?,
)?;
store.apply_upsert_entity(&entity, origin_mid, origin_seq)
}
(SyncOp::Delete, SyncNodeType::Entity) => {
store.apply_delete_entity(&record.node_id, origin_mid, origin_seq)
}
(SyncOp::Create, SyncNodeType::Relation) => {
let rel: Relation = serde_json::from_value(
record
.data
.clone()
.ok_or_else(|| anyhow::anyhow!("missing data for relation create"))?,
)?;
store.apply_create_relation(&rel, origin_mid, origin_seq)
}
(SyncOp::Update, SyncNodeType::Relation) => {
Ok(ApplyOutcome::Skipped)
}
(SyncOp::Delete, SyncNodeType::Relation) => {
let relation = record
.data
.as_ref()
.and_then(|d| serde_json::from_value::<Relation>(d.clone()).ok());
store.apply_delete_relation(&record.node_id, relation.as_ref(), origin_mid, origin_seq)
}
}
}