//! second-brain-sync 0.2.0
//!
//! Bidirectional sync for second-brain: SSH transport, JSONL change log, conflict resolution.
//! Documentation
use std::io::BufRead;

use anyhow::Result;
use uuid::Uuid;

use second_brain_core::kuzu_store::KuzuStore;
use second_brain_core::schema::{Conversation, Entity, Memory, Relation, SyncNodeType, SyncOp};
use second_brain_core::store::Store;

use crate::export::SyncRecord;

/// Per-outcome counters accumulated while importing a stream of sync records.
///
/// `Default` yields all-zero counters; `Debug`/`PartialEq` make the stats easy
/// to log and to compare in tests.
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub struct ImportStats {
    /// Records whose application created a new node.
    pub created: u64,
    /// Records whose application updated an existing node.
    pub updated: u64,
    /// Records whose application deleted an existing node.
    pub deleted: u64,
    /// Records that were applied as a no-op.
    pub skipped: u64,
    /// Records that could not be parsed or whose application returned an error.
    pub errors: u64,
}

/// Reads line-delimited JSON sync records from `reader` and applies each one
/// to `store`.
///
/// Lines that are empty after trimming are ignored. A line that does not
/// deserialize into a [`SyncRecord`] is logged, counted under `errors`, and
/// skipped. A record that parses but fails to apply is likewise logged and
/// counted under `errors`; processing then continues with the next line.
///
/// # Returns
///
/// The accumulated [`ImportStats`] together with the largest `seq` seen among
/// all successfully parsed records (`0` when none parsed). The sequence number
/// is tracked before the record is applied, so it also covers records whose
/// application failed.
///
/// # Errors
///
/// Returns an error only when reading a line from `reader` fails.
pub fn import_changes<R: BufRead>(store: &KuzuStore, reader: R) -> Result<(ImportStats, u64)> {
    let mut highest_seq: u64 = 0;
    let mut tally = ImportStats {
        created: 0,
        updated: 0,
        deleted: 0,
        skipped: 0,
        errors: 0,
    };

    for raw in reader.lines() {
        let raw = raw?;
        if raw.trim().is_empty() {
            continue;
        }

        let parsed = serde_json::from_str::<SyncRecord>(&raw);
        let record = match parsed {
            Err(e) => {
                tracing::warn!("skipping malformed sync record: {e}");
                tally.errors += 1;
                continue;
            }
            Ok(record) => record,
        };

        highest_seq = highest_seq.max(record.seq);

        // Pick the counter matching the outcome, then bump it once below.
        let counter = match apply_record(store, &record) {
            Ok(ApplyAction::Created) => &mut tally.created,
            Ok(ApplyAction::Updated) => &mut tally.updated,
            Ok(ApplyAction::Deleted) => &mut tally.deleted,
            Ok(ApplyAction::Skipped) => &mut tally.skipped,
            Err(e) => {
                tracing::warn!("error applying sync record seq={}: {e}", record.seq);
                &mut tally.errors
            }
        };
        *counter += 1;
    }

    Ok((tally, highest_seq))
}

/// Outcome of applying a single sync record to the local store.
///
/// The derives make outcomes cheap to copy, printable in logs, and directly
/// comparable in assertions.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum ApplyAction {
    /// A node that was not present locally was imported.
    Created,
    /// An existing node was overwritten with the incoming version.
    Updated,
    /// An existing node was removed.
    Deleted,
    /// Nothing was changed in the store.
    Skipped,
}

/// Applies one sync record to `store` and reports what was done.
///
/// Handled combinations of operation and node type:
///
/// * **Create** conversation / memory / entity: imported unless a node with
///   the same id already exists, in which case the record is skipped.
/// * **Create** relation: imported on a best-effort basis; if the store
///   rejects it, the failure is logged and the record is skipped rather than
///   reported as an error.
/// * **Update** memory: when the memory exists locally, the incoming version
///   replaces it only if its `last_accessed` is greater, or equal with a
///   greater `confidence`; otherwise the record is skipped. When the memory
///   does not exist locally it is imported and reported as created.
/// * **Delete** memory: deleted when present, skipped otherwise.
///
/// Any other combination is logged at debug level and skipped.
///
/// # Errors
///
/// Returns an error when a create/update record carries no `data`, when the
/// payload does not deserialize into the expected type, when `node_id` of a
/// delete record is not a valid UUID, or when a store call fails (except for
/// relation imports, see above).
fn apply_record(store: &KuzuStore, record: &SyncRecord) -> Result<ApplyAction> {
    // Every create/update arm needs the record's JSON payload; `what` only
    // feeds the error message raised when the payload is absent.
    let payload = |what: &str| {
        record
            .data
            .clone()
            .ok_or_else(|| anyhow::anyhow!("missing data for {}", what))
    };

    match (&record.op, &record.node_type) {
        (SyncOp::Create, SyncNodeType::Conversation) => {
            let conv: Conversation = serde_json::from_value(payload("conversation create")?)?;
            if store.get_conversation(conv.id)?.is_some() {
                return Ok(ApplyAction::Skipped);
            }
            store.import_conversation(&conv)?;
            Ok(ApplyAction::Created)
        }
        (SyncOp::Create, SyncNodeType::Memory) => {
            let mem: Memory = serde_json::from_value(payload("memory create")?)?;
            if store.get_memory(mem.id)?.is_some() {
                return Ok(ApplyAction::Skipped);
            }
            store.import_memory(&mem)?;
            Ok(ApplyAction::Created)
        }
        (SyncOp::Create, SyncNodeType::Entity) => {
            let entity: Entity = serde_json::from_value(payload("entity create")?)?;
            if store.get_entity(entity.id)?.is_some() {
                return Ok(ApplyAction::Skipped);
            }
            store.import_entity(&entity)?;
            Ok(ApplyAction::Created)
        }
        (SyncOp::Create, SyncNodeType::Relation) => {
            let rel: Relation = serde_json::from_value(payload("relation create")?)?;
            match store.import_relation(&rel) {
                Ok(()) => Ok(ApplyAction::Created),
                Err(e) => {
                    // Still best-effort (the record is skipped, not failed),
                    // but leave a trace so real store failures are not
                    // silently lost.
                    tracing::warn!("skipping relation create seq={}: {e}", record.seq);
                    Ok(ApplyAction::Skipped)
                }
            }
        }
        (SyncOp::Update, SyncNodeType::Memory) => {
            let incoming: Memory = serde_json::from_value(payload("memory update")?)?;
            match store.get_memory(incoming.id)? {
                None => {
                    // Unknown locally: treat the update as a create.
                    store.import_memory(&incoming)?;
                    Ok(ApplyAction::Created)
                }
                Some(existing) => {
                    // Conflict rule: the more recently accessed version wins;
                    // on a tie, the one with the higher confidence wins.
                    let incoming_wins = incoming.last_accessed > existing.last_accessed
                        || (incoming.last_accessed == existing.last_accessed
                            && incoming.confidence > existing.confidence);
                    if incoming_wins {
                        store.import_or_update_memory(&incoming)?;
                        Ok(ApplyAction::Updated)
                    } else {
                        Ok(ApplyAction::Skipped)
                    }
                }
            }
        }
        (SyncOp::Delete, SyncNodeType::Memory) => {
            let id = Uuid::parse_str(&record.node_id)?;
            if store.get_memory(id)?.is_some() {
                store.import_delete_memory(id)?;
                Ok(ApplyAction::Deleted)
            } else {
                Ok(ApplyAction::Skipped)
            }
        }
        _ => {
            tracing::debug!("unhandled sync op: {:?}/{:?}", record.op, record.node_type);
            Ok(ApplyAction::Skipped)
        }
    }
}