Skip to main content

second_brain_sync/
transport.rs

1use std::io::{BufReader, BufWriter};
2use std::process::{Command, Stdio};
3
4use anyhow::{Context, Result};
5
6use second_brain_core::kuzu_store::KuzuStore;
7use second_brain_core::schema::SyncState;
8
9use crate::import::ImportStats;
10
/// Connection settings for syncing with a remote peer by running a binary over `ssh`.
#[derive(Debug, Clone)]
pub struct SshTransport {
    /// Remote host handed to `ssh`; also used as the peer id when recording sync state.
    pub host: String,
    /// Optional login user; when set, the ssh target becomes `user@host`.
    pub user: Option<String>,
    /// Name or path of the binary invoked on the remote side (`new` defaults it to `sb`).
    pub sb_binary: String,
}
16
17impl SshTransport {
18    pub fn new(host: String) -> Self {
19        Self {
20            host,
21            user: None,
22            sb_binary: "sb".to_string(),
23        }
24    }
25
26    pub fn with_binary(mut self, path: String) -> Self {
27        self.sb_binary = path;
28        self
29    }
30
31    pub fn with_user(mut self, user: String) -> Self {
32        self.user = Some(user);
33        self
34    }
35
36    pub fn pull(&self, store: &KuzuStore) -> Result<ImportStats> {
37        let sync_state = store.get_sync_state(&self.host)?;
38        let after_seq = sync_state.map(|s| s.last_seq).unwrap_or(0);
39
40        let mut child = Command::new("ssh")
41            .arg("-o")
42            .arg("StrictHostKeyChecking=no")
43            .arg("-o")
44            .arg("ServerAliveInterval=30")
45            .arg(self.ssh_target())
46            .arg(&self.sb_binary)
47            .arg("sync-export")
48            .arg("--after-seq")
49            .arg(after_seq.to_string())
50            .stdout(Stdio::piped())
51            .stderr(Stdio::inherit())
52            .spawn()
53            .context("spawning ssh for pull")?;
54
55        let stdout = child.stdout.take().unwrap();
56        let reader = BufReader::new(stdout);
57
58        let (stats, max_seq) = crate::import::import_changes(store, reader)?;
59
60        let status = child.wait()?;
61        if !status.success() {
62            anyhow::bail!("ssh pull exited with {}", status);
63        }
64
65        if max_seq > after_seq {
66            store.set_sync_state(&SyncState {
67                peer_id: self.host.clone(),
68                last_seq: max_seq,
69                last_sync_at: chrono::Utc::now(),
70            })?;
71        }
72
73        Ok(stats)
74    }
75
76    pub fn push(&self, store: &KuzuStore) -> Result<u64> {
77        let push_peer = format!("push:{}", self.host);
78        let sync_state = store.get_sync_state(&push_peer)?;
79        let after_seq = sync_state.map(|s| s.last_seq).unwrap_or(0);
80
81        let mut child = Command::new("ssh")
82            .arg("-o")
83            .arg("StrictHostKeyChecking=no")
84            .arg("-o")
85            .arg("ServerAliveInterval=30")
86            .arg(self.ssh_target())
87            .arg(&self.sb_binary)
88            .arg("sync-import")
89            .stdin(Stdio::piped())
90            .stderr(Stdio::inherit())
91            .spawn()
92            .context("spawning ssh for push")?;
93
94        let stdin = child.stdin.take().unwrap();
95        let mut writer = BufWriter::new(stdin);
96
97        let max_seq = crate::export::export_changes(store, after_seq, &mut writer)?;
98        drop(writer);
99
100        let status = child.wait()?;
101        if !status.success() {
102            anyhow::bail!("ssh push exited with {}", status);
103        }
104
105        if max_seq > after_seq {
106            store.set_sync_state(&SyncState {
107                peer_id: push_peer,
108                last_seq: max_seq,
109                last_sync_at: chrono::Utc::now(),
110            })?;
111        }
112
113        Ok(max_seq)
114    }
115
116    fn ssh_target(&self) -> String {
117        match &self.user {
118            Some(u) => format!("{}@{}", u, self.host),
119            None => self.host.clone(),
120        }
121    }
122}