second_brain_sync/
transport.rs1use std::io::{BufReader, BufWriter};
2use std::process::{Command, Stdio};
3
4use anyhow::{Context, Result};
5
6use second_brain_core::kuzu_store::KuzuStore;
7use second_brain_core::schema::SyncState;
8
9use crate::import::ImportStats;
10
11pub struct SshTransport {
12 pub host: String,
13 pub user: Option<String>,
14 pub sb_binary: String,
15}
16
17impl SshTransport {
18 pub fn new(host: String) -> Self {
19 Self {
20 host,
21 user: None,
22 sb_binary: "sb".to_string(),
23 }
24 }
25
26 pub fn with_binary(mut self, path: String) -> Self {
27 self.sb_binary = path;
28 self
29 }
30
31 pub fn with_user(mut self, user: String) -> Self {
32 self.user = Some(user);
33 self
34 }
35
36 pub fn pull(&self, store: &KuzuStore) -> Result<ImportStats> {
37 let sync_state = store.get_sync_state(&self.host)?;
38 let after_seq = sync_state.map(|s| s.last_seq).unwrap_or(0);
39
40 let mut child = Command::new("ssh")
41 .arg("-o")
42 .arg("StrictHostKeyChecking=no")
43 .arg("-o")
44 .arg("ServerAliveInterval=30")
45 .arg(self.ssh_target())
46 .arg(&self.sb_binary)
47 .arg("sync-export")
48 .arg("--after-seq")
49 .arg(after_seq.to_string())
50 .stdout(Stdio::piped())
51 .stderr(Stdio::inherit())
52 .spawn()
53 .context("spawning ssh for pull")?;
54
55 let stdout = child.stdout.take().unwrap();
56 let reader = BufReader::new(stdout);
57
58 let (stats, max_seq) = crate::import::import_changes(store, reader)?;
59
60 let status = child.wait()?;
61 if !status.success() {
62 anyhow::bail!("ssh pull exited with {}", status);
63 }
64
65 if max_seq > after_seq {
66 store.set_sync_state(&SyncState {
67 peer_id: self.host.clone(),
68 last_seq: max_seq,
69 last_sync_at: chrono::Utc::now(),
70 })?;
71 }
72
73 Ok(stats)
74 }
75
76 pub fn push(&self, store: &KuzuStore) -> Result<u64> {
77 let push_peer = format!("push:{}", self.host);
78 let sync_state = store.get_sync_state(&push_peer)?;
79 let after_seq = sync_state.map(|s| s.last_seq).unwrap_or(0);
80
81 let mut child = Command::new("ssh")
82 .arg("-o")
83 .arg("StrictHostKeyChecking=no")
84 .arg("-o")
85 .arg("ServerAliveInterval=30")
86 .arg(self.ssh_target())
87 .arg(&self.sb_binary)
88 .arg("sync-import")
89 .stdin(Stdio::piped())
90 .stderr(Stdio::inherit())
91 .spawn()
92 .context("spawning ssh for push")?;
93
94 let stdin = child.stdin.take().unwrap();
95 let mut writer = BufWriter::new(stdin);
96
97 let max_seq = crate::export::export_changes(store, after_seq, &mut writer)?;
98 drop(writer);
99
100 let status = child.wait()?;
101 if !status.success() {
102 anyhow::bail!("ssh push exited with {}", status);
103 }
104
105 if max_seq > after_seq {
106 store.set_sync_state(&SyncState {
107 peer_id: push_peer,
108 last_seq: max_seq,
109 last_sync_at: chrono::Utc::now(),
110 })?;
111 }
112
113 Ok(max_seq)
114 }
115
116 fn ssh_target(&self) -> String {
117 match &self.user {
118 Some(u) => format!("{}@{}", u, self.host),
119 None => self.host.clone(),
120 }
121 }
122}