use std::io::{BufReader, BufWriter};
use std::process::{Command, Stdio};
use anyhow::{Context, Result};
use second_brain_core::kuzu_store::KuzuStore;
use second_brain_core::schema::SyncState;
use crate::import::ImportStats;
pub struct SshTransport {
pub host: String,
pub user: Option<String>,
pub sb_binary: String,
}
impl SshTransport {
pub fn new(host: String) -> Self {
Self {
host,
user: None,
sb_binary: "sb".to_string(),
}
}
pub fn with_binary(mut self, path: String) -> Self {
self.sb_binary = path;
self
}
pub fn with_user(mut self, user: String) -> Self {
self.user = Some(user);
self
}
pub fn pull(&self, store: &KuzuStore) -> Result<ImportStats> {
let sync_state = store.get_sync_state(&self.host)?;
let after_seq = sync_state.map(|s| s.last_seq).unwrap_or(0);
let mut child = Command::new("ssh")
.arg("-o")
.arg("StrictHostKeyChecking=no")
.arg("-o")
.arg("ServerAliveInterval=30")
.arg(self.ssh_target())
.arg(&self.sb_binary)
.arg("sync-export")
.arg("--after-seq")
.arg(after_seq.to_string())
.stdout(Stdio::piped())
.stderr(Stdio::inherit())
.spawn()
.context("spawning ssh for pull")?;
let stdout = child.stdout.take().unwrap();
let reader = BufReader::new(stdout);
let (stats, max_seq) = crate::import::import_changes(store, reader)?;
let status = child.wait()?;
if !status.success() {
anyhow::bail!("ssh pull exited with {}", status);
}
if max_seq > after_seq {
store.set_sync_state(&SyncState {
peer_id: self.host.clone(),
last_seq: max_seq,
last_sync_at: chrono::Utc::now(),
})?;
}
Ok(stats)
}
pub fn push(&self, store: &KuzuStore) -> Result<u64> {
let push_peer = format!("push:{}", self.host);
let sync_state = store.get_sync_state(&push_peer)?;
let after_seq = sync_state.map(|s| s.last_seq).unwrap_or(0);
let mut child = Command::new("ssh")
.arg("-o")
.arg("StrictHostKeyChecking=no")
.arg("-o")
.arg("ServerAliveInterval=30")
.arg(self.ssh_target())
.arg(&self.sb_binary)
.arg("sync-import")
.stdin(Stdio::piped())
.stderr(Stdio::inherit())
.spawn()
.context("spawning ssh for push")?;
let stdin = child.stdin.take().unwrap();
let mut writer = BufWriter::new(stdin);
let max_seq = crate::export::export_changes(store, after_seq, &mut writer)?;
drop(writer);
let status = child.wait()?;
if !status.success() {
anyhow::bail!("ssh push exited with {}", status);
}
if max_seq > after_seq {
store.set_sync_state(&SyncState {
peer_id: push_peer,
last_seq: max_seq,
last_sync_at: chrono::Utc::now(),
})?;
}
Ok(max_seq)
}
fn ssh_target(&self) -> String {
match &self.user {
Some(u) => format!("{}@{}", u, self.host),
None => self.host.clone(),
}
}
}