// second-brain-sync 0.5.1
//
// Bidirectional sync for second-brain: SSH transport, JSONL change log, conflict resolution.
use std::io::BufRead;

use anyhow::Result;

use second_brain_core::kuzu_store::{ApplyOutcome, KuzuStore};
use second_brain_core::schema::{Conversation, Entity, Memory, Relation, SyncNodeType, SyncOp};

use crate::export::SyncRecord;

/// Per-outcome counters accumulated while importing a batch of sync records.
///
/// `Default` yields all-zero counters.
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub struct ImportStats {
    /// Records whose application reported `ApplyOutcome::Created`.
    pub created: u64,
    /// Records whose application reported `ApplyOutcome::Updated`.
    pub updated: u64,
    /// Records whose application reported `ApplyOutcome::Deleted`.
    pub deleted: u64,
    /// Records whose application reported `ApplyOutcome::Skipped`.
    pub skipped: u64,
    /// Lines that failed to parse plus records that failed to apply.
    pub errors: u64,
}

/// Reads JSONL sync records from `reader` and applies them to `store`.
///
/// Blank lines are ignored. Lines that fail to deserialize into a `SyncRecord` are
/// logged, counted in `ImportStats::errors`, and otherwise skipped.
///
/// # Returns
///
/// The accumulated statistics and a watermark sequence number:
/// - if any record failed to apply, one less than the smallest failing `local_seq`
///   (saturating at 0);
/// - otherwise the largest `local_seq` among the parsed records (0 when there are none).
///
/// Note that malformed lines have no known sequence number, so they never hold the
/// watermark back.
///
/// # Errors
///
/// Returns an error only when reading a line from `reader` fails; per-record failures
/// are reported through the statistics and the watermark instead.
pub fn import_changes<R: BufRead>(store: &KuzuStore, reader: R) -> Result<(ImportStats, u64)> {
    let mut stats = ImportStats {
        created: 0,
        updated: 0,
        deleted: 0,
        skipped: 0,
        errors: 0,
    };
    let mut errored_seqs: Vec<u64> = Vec::new();

    let mut records: Vec<SyncRecord> = Vec::new();
    for line in reader.lines() {
        let line = line?;
        if line.trim().is_empty() {
            continue;
        }
        match serde_json::from_str::<SyncRecord>(&line) {
            Ok(r) => records.push(r),
            Err(e) => {
                tracing::warn!("skipping malformed sync record: {e}");
                stats.errors += 1;
            }
        }
    }

    let max_seq = records.iter().map(|r| r.local_seq).max().unwrap_or(0);

    // Apply consecutive Create records inside a single held write transaction so a
    // whole batch commits with one fsync instead of one per record (per-record commits
    // throttle import to a few records/sec on slow storage). Non-create ops (update,
    // delete) flush the open create run and apply per-record through the existing path
    // to preserve their conflict-resolution and cache-eviction semantics unchanged.
    const TX_CHUNK: usize = 500;
    let mut i = 0;
    while i < records.len() {
        if !matches!(records[i].op, SyncOp::Create) {
            record_outcome(store, &records[i], &mut stats, &mut errored_seqs);
            i += 1;
            continue;
        }

        // `records[i]` is a Create, so the run is at least one record long and the
        // loop always makes progress.
        let run_len = records[i..]
            .iter()
            .take(TX_CHUNK)
            .take_while(|r| matches!(r.op, SyncOp::Create))
            .count();
        let chunk = &records[i..i + run_len];

        if let Err(e) = apply_create_chunk(store, chunk, &mut stats) {
            // Surface why the batch failed: if the per-record fallback then succeeds,
            // this would otherwise be the only trace of the failure.
            tracing::warn!(
                "batched create of {} records starting at local_seq={} failed, \
                 falling back to per-record apply: {e}",
                chunk.len(),
                records[i].local_seq
            );
            for rec in chunk {
                record_outcome(store, rec, &mut stats, &mut errored_seqs);
            }
        }
        i += run_len;
    }

    // Stop the watermark just before the earliest failed record so it is not passed over.
    let watermark = errored_seqs
        .iter()
        .min()
        .map(|m| m.saturating_sub(1))
        .unwrap_or(max_seq);

    Ok((stats, watermark))
}

/// Applies one record (with retries) and folds the result into the running totals.
///
/// On success the counter matching the returned `ApplyOutcome` is incremented. On
/// failure the error is logged, `stats.errors` is incremented, and the record's
/// `local_seq` is appended to `errored_seqs`.
fn record_outcome(
    store: &KuzuStore,
    record: &SyncRecord,
    stats: &mut ImportStats,
    errored_seqs: &mut Vec<u64>,
) {
    let outcome = match apply_with_retry(store, record) {
        Ok(outcome) => outcome,
        Err(e) => {
            tracing::warn!(
                "error applying sync record local_seq={}: {e}",
                record.local_seq
            );
            stats.errors += 1;
            errored_seqs.push(record.local_seq);
            return;
        }
    };

    let counter = match outcome {
        ApplyOutcome::Created => &mut stats.created,
        ApplyOutcome::Updated => &mut stats.updated,
        ApplyOutcome::Deleted => &mut stats.deleted,
        ApplyOutcome::Skipped => &mut stats.skipped,
    };
    *counter += 1;
}

/// Applies a run of Create records inside a single write transaction.
///
/// Any failure (missing payload, payload that does not deserialize, or a store error)
/// aborts the closure with an error, and `stats` is left untouched so the caller can
/// fall back to applying each record individually. That keeps the batched fast path
/// from silently dropping records when one of them is malformed.
///
/// `stats` and the relations cache are only updated after `with_write_transaction`
/// has returned successfully.
fn apply_create_chunk(
    store: &KuzuStore,
    chunk: &[SyncRecord],
    stats: &mut ImportStats,
) -> Result<()> {
    let mut created_relations: Vec<Relation> = Vec::new();

    let applied = store.with_write_transaction(|conn| {
        let mut applied = Vec::with_capacity(chunk.len());
        for record in chunk {
            let machine = &record.origin_machine_id;
            let seq = record.origin_seq as i64;
            // Clones the record's JSON payload, or fails with the given message when absent.
            let payload = |missing: &'static str| {
                record
                    .data
                    .clone()
                    .ok_or_else(|| anyhow::anyhow!(missing))
            };

            let outcome = match &record.node_type {
                SyncNodeType::Memory => {
                    let mem: Memory =
                        serde_json::from_value(payload("missing data for memory create")?)?;
                    store.apply_create_memory_on_tx(conn, &mem, machine, seq)?
                }
                SyncNodeType::Conversation => {
                    let conv: Conversation =
                        serde_json::from_value(payload("missing data for conversation create")?)?;
                    store.apply_create_conversation_on_tx(conn, &conv, machine, seq)?
                }
                SyncNodeType::Entity => {
                    let entity: Entity =
                        serde_json::from_value(payload("missing data for entity create")?)?;
                    store.apply_create_entity_on_tx(conn, &entity, machine, seq)?
                }
                SyncNodeType::Relation => {
                    let rel: Relation =
                        serde_json::from_value(payload("missing data for relation create")?)?;
                    let outcome = store.apply_create_relation_on_tx(conn, &rel, machine, seq)?;
                    // Remember only relations that were actually created; their cache
                    // entries are evicted once the transaction has gone through.
                    if let ApplyOutcome::Created = outcome {
                        created_relations.push(rel);
                    }
                    outcome
                }
            };
            applied.push(outcome);
        }
        Ok(applied)
    })?;

    // The relations cache must only be evicted after the transaction commits, so a
    // rollback never leaves the cache reflecting writes that did not land.
    for rel in &created_relations {
        store.note_relation_created(rel.from_id);
    }

    for outcome in applied {
        let counter = match outcome {
            ApplyOutcome::Created => &mut stats.created,
            ApplyOutcome::Updated => &mut stats.updated,
            ApplyOutcome::Deleted => &mut stats.deleted,
            ApplyOutcome::Skipped => &mut stats.skipped,
        };
        *counter += 1;
    }
    Ok(())
}

/// Applies `record`, retrying with exponential backoff when it fails.
///
/// Up to `MAX_ATTEMPTS` attempts are made. Between attempts the thread sleeps for
/// 50 ms, 100 ms, 200 ms, then 400 ms. Every failure is treated as potentially
/// transient, including deterministic ones such as a missing payload.
///
/// # Errors
///
/// Returns the error from the final attempt once all attempts have failed. No sleep
/// is performed after the final attempt, so the failure is reported immediately.
fn apply_with_retry(store: &KuzuStore, record: &SyncRecord) -> Result<ApplyOutcome> {
    const MAX_ATTEMPTS: u32 = 5;
    const BASE_DELAY_MS: u64 = 50;

    let mut attempt: u32 = 0;
    loop {
        let e = match apply_record(store, record) {
            Ok(outcome) => return Ok(outcome),
            Err(e) => e,
        };
        tracing::debug!(
            "transient error on attempt {} for local_seq={}: {e}",
            attempt + 1,
            record.local_seq,
        );
        if attempt + 1 >= MAX_ATTEMPTS {
            // Out of attempts: hand back the last error without a pointless backoff sleep.
            return Err(e);
        }
        // Backoff doubles with every failed attempt.
        std::thread::sleep(std::time::Duration::from_millis(BASE_DELAY_MS << attempt));
        attempt += 1;
    }
}

fn apply_record(store: &KuzuStore, record: &SyncRecord) -> Result<ApplyOutcome> {
    let origin_mid = &record.origin_machine_id;
    let origin_seq = record.origin_seq as i64;

    match (&record.op, &record.node_type) {
        (SyncOp::Create, SyncNodeType::Memory) => {
            let mem: Memory = serde_json::from_value(
                record
                    .data
                    .clone()
                    .ok_or_else(|| anyhow::anyhow!("missing data for memory create"))?,
            )?;
            store.apply_create_memory(&mem, origin_mid, origin_seq)
        }

        (SyncOp::Update, SyncNodeType::Memory) => {
            let mem: Memory = serde_json::from_value(
                record
                    .data
                    .clone()
                    .ok_or_else(|| anyhow::anyhow!("missing data for memory update"))?,
            )?;
            let (outcome, _) = store.apply_update_memory(&mem, origin_mid, origin_seq)?;
            Ok(outcome)
        }

        (SyncOp::Delete, SyncNodeType::Memory) => {
            store.apply_delete_memory(&record.node_id, origin_mid, origin_seq)
        }

        (SyncOp::Create, SyncNodeType::Conversation) => {
            let conv: Conversation = serde_json::from_value(
                record
                    .data
                    .clone()
                    .ok_or_else(|| anyhow::anyhow!("missing data for conversation create"))?,
            )?;
            store.apply_create_conversation(&conv, origin_mid, origin_seq)
        }

        (SyncOp::Update, SyncNodeType::Conversation) => {
            let conv: Conversation = serde_json::from_value(
                record
                    .data
                    .clone()
                    .ok_or_else(|| anyhow::anyhow!("missing data for conversation update"))?,
            )?;
            store.apply_upsert_conversation(&conv, origin_mid, origin_seq)
        }

        (SyncOp::Delete, SyncNodeType::Conversation) => {
            store.apply_delete_conversation(&record.node_id, origin_mid, origin_seq)
        }

        (SyncOp::Create, SyncNodeType::Entity) => {
            let entity: Entity = serde_json::from_value(
                record
                    .data
                    .clone()
                    .ok_or_else(|| anyhow::anyhow!("missing data for entity create"))?,
            )?;
            store.apply_create_entity(&entity, origin_mid, origin_seq)
        }

        (SyncOp::Update, SyncNodeType::Entity) => {
            let entity: Entity = serde_json::from_value(
                record
                    .data
                    .clone()
                    .ok_or_else(|| anyhow::anyhow!("missing data for entity update"))?,
            )?;
            store.apply_upsert_entity(&entity, origin_mid, origin_seq)
        }

        (SyncOp::Delete, SyncNodeType::Entity) => {
            store.apply_delete_entity(&record.node_id, origin_mid, origin_seq)
        }

        (SyncOp::Create, SyncNodeType::Relation) => {
            let rel: Relation = serde_json::from_value(
                record
                    .data
                    .clone()
                    .ok_or_else(|| anyhow::anyhow!("missing data for relation create"))?,
            )?;
            store.apply_create_relation(&rel, origin_mid, origin_seq)
        }

        (SyncOp::Update, SyncNodeType::Relation) => {
            // Relations are immutable, so an Update Relation record is not a valid operation
            // and is skipped rather than mislogged as a Create on relay.
            Ok(ApplyOutcome::Skipped)
        }

        (SyncOp::Delete, SyncNodeType::Relation) => {
            let relation = record
                .data
                .as_ref()
                .and_then(|d| serde_json::from_value::<Relation>(d.clone()).ok());
            store.apply_delete_relation(&record.node_id, relation.as_ref(), origin_mid, origin_seq)
        }
    }
}