second-brain-sync 0.3.1

Bidirectional sync for second-brain: SSH transport, JSONL change log, conflict resolution
Documentation
use std::io::{BufReader, BufWriter};
use std::process::{Command, Stdio};

use anyhow::{Context, Result};

use second_brain_core::kuzu_store::KuzuStore;
use second_brain_core::schema::SyncState;

use crate::import::ImportStats;

pub struct SshTransport {
    pub host: String,
    pub user: Option<String>,
    pub sb_binary: String,
}

impl SshTransport {
    pub fn new(host: String) -> Self {
        Self {
            host,
            user: None,
            sb_binary: "sb".to_string(),
        }
    }

    pub fn with_binary(mut self, path: String) -> Self {
        self.sb_binary = path;
        self
    }

    pub fn with_user(mut self, user: String) -> Self {
        self.user = Some(user);
        self
    }

    pub fn pull(&self, store: &KuzuStore) -> Result<ImportStats> {
        let sync_state = store.get_sync_state(&self.host)?;
        let after_seq = sync_state.map(|s| s.last_seq).unwrap_or(0);

        let mut child = Command::new("ssh")
            .arg("-o")
            .arg("StrictHostKeyChecking=no")
            .arg("-o")
            .arg("ServerAliveInterval=30")
            .arg(self.ssh_target())
            .arg(&self.sb_binary)
            .arg("sync-export")
            .arg("--after-seq")
            .arg(after_seq.to_string())
            .stdout(Stdio::piped())
            .stderr(Stdio::inherit())
            .spawn()
            .context("spawning ssh for pull")?;

        let stdout = child.stdout.take().unwrap();
        let reader = BufReader::new(stdout);

        let (stats, max_seq) = crate::import::import_changes(store, reader)?;

        let status = child.wait()?;
        if !status.success() {
            anyhow::bail!("ssh pull exited with {}", status);
        }

        if max_seq > after_seq {
            store.set_sync_state(&SyncState {
                peer_id: self.host.clone(),
                last_seq: max_seq,
                last_sync_at: chrono::Utc::now(),
            })?;
        }

        Ok(stats)
    }

    pub fn push(&self, store: &KuzuStore) -> Result<u64> {
        let push_peer = format!("push:{}", self.host);
        let sync_state = store.get_sync_state(&push_peer)?;
        let after_seq = sync_state.map(|s| s.last_seq).unwrap_or(0);

        let mut child = Command::new("ssh")
            .arg("-o")
            .arg("StrictHostKeyChecking=no")
            .arg("-o")
            .arg("ServerAliveInterval=30")
            .arg(self.ssh_target())
            .arg(&self.sb_binary)
            .arg("sync-import")
            .stdin(Stdio::piped())
            .stderr(Stdio::inherit())
            .spawn()
            .context("spawning ssh for push")?;

        let stdin = child.stdin.take().unwrap();
        let mut writer = BufWriter::new(stdin);

        let max_seq = crate::export::export_changes(store, after_seq, &mut writer)?;
        drop(writer);

        let status = child.wait()?;
        if !status.success() {
            anyhow::bail!("ssh push exited with {}", status);
        }

        if max_seq > after_seq {
            store.set_sync_state(&SyncState {
                peer_id: push_peer,
                last_seq: max_seq,
                last_sync_at: chrono::Utc::now(),
            })?;
        }

        Ok(max_seq)
    }

    fn ssh_target(&self) -> String {
        match &self.user {
            Some(u) => format!("{}@{}", u, self.host),
            None => self.host.clone(),
        }
    }
}