1use std::collections::HashMap;
2use std::env;
3use std::path::PathBuf;
4use std::sync::{Arc, Mutex};
5
6use anyhow::{anyhow, bail, ensure, Context as _, Result};
7use bifrostlink::declarative::RemoteEndpoints;
8use bifrostlink::{Remote, Rpc, Rtt};
9use camino::{Utf8Path, Utf8PathBuf};
10use remowt_link_shared::plugin::PluginEndpointsClient;
11use remowt_link_shared::port::child_port;
12use remowt_link_shared::{Address, BifConfig};
13use russh::client::{connect, Config, Handle, Handler, Msg, Session};
14use russh::keys::agent::client::AgentClient;
15use russh::keys::agent::AgentIdentity;
16use russh::keys::check_known_hosts;
17use russh::keys::ssh_key::PublicKey;
18use russh::Channel;
19use tempfile::TempDir;
20use tokio::io::AsyncRead;
21use tokio::net::UnixListener;
22use tokio::sync::oneshot;
23use tokio::task::JoinHandle;
24use tokio::{
25 fs,
26 io::{AsyncBufReadExt as _, AsyncReadExt as _, AsyncWriteExt as _, BufReader},
27};
28use tracing::{debug, info, warn};
29use uuid::Uuid;
30
31pub mod editor;
32mod forwarded;
33mod shell;
34mod ssh_exec;
35mod subprocess;
36
37use self::ssh_exec::SshExecChild;
38pub use self::subprocess::{RemowtChild, SpawnOptions, StderrMode, StdioMode};
39pub use forwarded::{RemowtListener, RemowtStream};
40pub use shell::{RemowtShell, RemowtShellResizer};
41
/// Shared table of pending stream-local forwards, keyed by socket path.
///
/// Each entry holds the one-shot sender that is handed the SSH channel opened for that path.
type Subs = Arc<Mutex<HashMap<Utf8PathBuf, oneshot::Sender<Channel<Msg>>>>>;
43
/// Wraps `s` in single quotes so it can be embedded in a shell command line.
///
/// Every embedded single quote is rewritten as `'\''` (close the quote, emit an escaped
/// quote, reopen the quote).
fn sh_quote(s: impl AsRef<str>) -> String {
    let raw = s.as_ref();
    let mut quoted = String::with_capacity(raw.len() + 2);
    quoted.push('\'');
    for ch in raw.chars() {
        if ch == '\'' {
            quoted.push_str("'\\''");
        } else {
            quoted.push(ch);
        }
    }
    quoted.push('\'');
    quoted
}
47
/// Privilege-escalation tools in the order they are probed, each paired with the extra flags
/// placed right after the tool name when the privileged agent is launched.
const ESCALATORS: [(&str, &[&str]); 2] = [("run0", &["--background=", "--pipe"]), ("sudo", &[])];
49
/// A directory of prebuilt `remowt-agent` binaries together with the hash listed for each
/// architecture.
pub struct AgentBundle {
    /// Directory holding the `hashes` file and the `remowt-agent-<arch>` binaries.
    dir: PathBuf,
    /// Architecture name mapped to the hash read from the `hashes` file.
    hashes: HashMap<String, String>,
}
54
55impl AgentBundle {
56 pub fn from_dir(dir: impl Into<PathBuf>) -> Result<Self> {
57 let dir = dir.into();
58 let hashes_path = dir.join("hashes");
59 let raw = std::fs::read_to_string(&hashes_path)
60 .with_context(|| format!("reading agent hashes at {}", hashes_path.display()))?;
61 let mut hashes = HashMap::new();
62 for line in raw.lines() {
63 let line = line.trim();
64 if line.is_empty() {
65 continue;
66 }
67 let (arch, hash) = line
68 .split_once(char::is_whitespace)
69 .ok_or_else(|| anyhow!("malformed hashes line: {line:?}"))?;
70 hashes.insert(arch.to_owned(), hash.trim().to_owned());
71 }
72 ensure!(
73 !hashes.is_empty(),
74 "agent bundle {} has no hashes",
75 dir.display()
76 );
77 Ok(Self { dir, hashes })
78 }
79
80 fn binary(&self, arch: &str) -> PathBuf {
81 self.dir.join(format!("remowt-agent-{arch}"))
82 }
83
84 fn local_binary(&self) -> Result<PathBuf> {
85 let arch = env::consts::ARCH;
86 let path = self.binary(arch);
87 ensure!(
88 path.is_file(),
89 "no local remowt-agent build for arch {arch} in bundle {}",
90 self.dir.display()
91 );
92 Ok(path)
93 }
94}
95
96async fn run(sess: &Handle<SshHandler>, cmd: &str) -> Result<(Option<u32>, Vec<u8>)> {
97 let ch = sess.channel_open_session().await?;
98 ch.exec(true, cmd).await?;
99
100 let mut child = SshExecChild::from_exec(ch);
101 drop(child.stdin);
102 drain_to_tracing(child.stderr, cmd.to_owned(), true);
103
104 let mut out = Vec::new();
105 child.stdout.read_to_end(&mut out).await?;
106 let code = child.exit.await.ok().flatten();
107 Ok((code, out))
108}
109
110async fn run_string_ok(sess: &Handle<SshHandler>, cmd: &str) -> Result<String> {
111 let (code, mut out) = run(sess, cmd).await?;
112 ensure!(
113 code == Some(0),
114 "remote command failed (exit {code:?}): {cmd}"
115 );
116 if !out.is_empty() {
117 ensure!(
118 out.ends_with(b"\n"),
119 "remote command was not newline-terminated: {cmd}: {out:?}"
120 );
121 out.pop();
122 }
123 String::from_utf8(out).context("expected utf8 output for command")
124}
125
126async fn deploy_agent(sess: &Handle<SshHandler>, bundle: &AgentBundle) -> Result<Utf8PathBuf> {
127 debug!("uname -a");
128 let arch = run_string_ok(sess, "uname -m").await?;
129 let hash = bundle
130 .hashes
131 .get(&arch)
132 .ok_or_else(|| anyhow!("no remowt-agent build for remote arch {arch:?}"))?;
133
134 debug!("get dir");
135 let cache = run_string_ok(sess, "echo \"$XDG_CACHE_HOME\"").await?;
136 let dir = if cache.is_empty() {
137 let home = run_string_ok(sess, "echo \"$HOME\"").await?;
138 ensure!(
139 !home.is_empty(),
140 "remote $HOME and $XDG_CACHE_HOME both empty"
141 );
142 Utf8PathBuf::from(home).join(".cache/remowt")
143 } else {
144 Utf8PathBuf::from(cache).join("remowt")
145 };
146 let path = dir.join(hash);
147
148 debug!("presence");
149 let (present, _) = run(sess, &format!("test -x {}", sh_quote(&path))).await?;
150 if present != Some(0) {
151 let bin = bundle.binary(&arch);
152 debug!("read");
153 let bytes = fs::read(&bin)
154 .await
155 .with_context(|| format!("reading agent binary {}", bin.display()))?;
156 debug!("upload");
157 upload_agent(sess, &dir, &path, bytes).await?;
158 }
159 Ok(path)
160}
161
162async fn upload_agent(
163 sess: &Handle<SshHandler>,
164 dir: &Utf8Path,
165 path: &Utf8Path,
166 bytes: Vec<u8>,
167) -> Result<()> {
168 debug!("mkdirp");
169 run_string_ok(sess, &format!("mkdir -p {}", sh_quote(dir))).await?;
170
171 let tmp = dir.join(format!("tmp.{}", Uuid::new_v4()));
172 let ch = sess.channel_open_session().await?;
173 debug!("cat");
174 ch.exec(true, format!("cat > {}", sh_quote(&tmp))).await?;
175
176 let mut child = SshExecChild::from_exec(ch);
177 child
178 .stdin
179 .write_all(&bytes)
180 .await
181 .context("sending agent binary")?;
182 child
183 .stdin
184 .shutdown()
185 .await
186 .context("sending agent binary")?;
187 let code = child.wait().await;
188 ensure!(code == Some(0), "agent upload failed (exit {code:?})");
189
190 debug!("chmod");
191 run_string_ok(sess, &format!("chmod 0755 {}", sh_quote(&tmp))).await?;
192 run_string_ok(
193 sess,
194 &format!("mv -f {} {}", sh_quote(&tmp), sh_quote(path)),
195 )
196 .await?;
197 Ok(())
198}
199
/// Client-side SSH event handler: checks the server key against known hosts and hands
/// forwarded stream-local channels to whoever registered for their socket path.
pub struct SshHandler {
    /// Host name passed to the known-hosts check.
    host: String,
    /// Port passed to the known-hosts check.
    port: u16,
    /// Pending forwards waiting for their channel, keyed by socket path.
    subs: Subs,
}
205impl Handler for SshHandler {
206 type Error = russh::Error;
207 async fn check_server_key(
208 &mut self,
209 server_public_key: &PublicKey,
210 ) -> Result<bool, Self::Error> {
211 Ok(check_known_hosts(&self.host, self.port, server_public_key)?)
212 }
213 async fn server_channel_open_forwarded_streamlocal(
214 &mut self,
215 channel: Channel<Msg>,
216 socket_path: &str,
217 _session: &mut Session,
218 ) -> Result<(), Self::Error> {
219 let Some(ch) = self
220 .subs
221 .lock()
222 .expect("lock")
223 .remove(&Utf8PathBuf::from(socket_path))
224 else {
225 return Err(russh::Error::WrongChannel);
226 };
227 let _ = ch.send(channel);
228 Ok(())
229 }
230}
231
/// How the agent is reached: over an SSH session or as a local child process.
enum Transport {
    /// Agent running on a remote host reached over SSH.
    Ssh {
        /// Authenticated SSH session handle.
        sess: Arc<Handle<SshHandler>>,
        /// Pending stream-local forwards, shared with the session's handler.
        subs: Subs,
        /// Remote directory used for runtime sockets.
        runtime_dir: Utf8PathBuf,
        /// Remote path of the deployed agent binary.
        agent_path: Utf8PathBuf,
    },
    /// Agent running on this machine.
    Local {
        /// Local path of the agent binary.
        agent_path: PathBuf,
        /// Local directory used for runtime sockets.
        runtime_dir: Utf8PathBuf,
    },
}
244
/// Shared state behind a `Remowt` handle.
struct RemowtInner {
    /// Connection kind plus the paths that belong to it.
    transport: Transport,
    /// RPC endpoint with the agent registered as a direct peer.
    rpc: Rpc<BifConfig>,
    /// Set once the privileged agent has been started, so that startup runs at most once.
    elevated: tokio::sync::OnceCell<()>,
    /// Local child processes kept here so they stay owned for the lifetime of this state.
    // Never read; the field exists only to hold the handles.
    #[allow(dead_code)]
    children: Mutex<Vec<tokio::process::Child>>,
    /// Temporary runtime directory guard, present only when one had to be created locally.
    _runtime_tmp: Option<TempDir>,
    /// User name supplied by the caller; used in the `run0` unit name.
    user: String,
}
254
/// Cheaply cloneable handle to an agent connection; clones share the same underlying state.
#[derive(Clone)]
pub struct Remowt(Arc<RemowtInner>);
257
/// RPC remote handle specialised to this crate's `BifConfig`.
pub type RemowtRemote = Remote<BifConfig>;
259
260impl Remowt {
261 pub async fn connect(host: &str, bundle: &AgentBundle, remowt_user: String) -> Result<Self> {
264 let conf = russh_config::parse_home(host)?;
265 let port = conf.host_config.port.or(conf.port).unwrap_or(22);
266 let hostname = conf
267 .host_config
268 .hostname
269 .clone()
270 .unwrap_or_else(|| conf.host_name.clone());
271 let user = conf
272 .user
273 .clone()
274 .unwrap_or_else(|| env::var("USER").unwrap_or_else(|_| "root".to_owned()));
275
276 let subs: Subs = Arc::new(Mutex::new(HashMap::new()));
277 let mut sess = connect(
278 Arc::new(Config::default()),
279 (hostname.clone(), port),
280 SshHandler {
281 host: hostname,
282 port,
283 subs: subs.clone(),
284 },
285 )
286 .await?;
287
288 let mut agent = AgentClient::connect_env().await?;
289 let rsa_hash = sess.best_supported_rsa_hash().await?.flatten();
290 let mut authenticated = false;
291 for ident in agent.request_identities().await? {
292 let AgentIdentity::PublicKey { key, .. } = ident else {
293 continue;
294 };
295 if sess
296 .authenticate_publickey_with(user.clone(), key, rsa_hash, &mut agent)
297 .await?
298 .success()
299 {
300 authenticated = true;
301 break;
302 }
303 }
304 ensure!(authenticated, "ssh authentication failed");
305
306 let sess = Arc::new(sess);
307
308 debug!("deploying agent");
309 let agent_path = deploy_agent(&sess, bundle).await?;
310
311 debug!("runtime dir");
312 let runtime_dir = remote_runtime_dir(&sess).await?;
313
314 let rpc = Rpc::<BifConfig>::new(Address::User);
315
316 let cmd_chan = sess.channel_open_session().await?;
317 debug!("starting agent");
318 cmd_chan
319 .exec(true, format!("{} real-agent", sh_quote(&agent_path)))
320 .await?;
321
322 let child = SshExecChild::from_exec(cmd_chan);
323 drain_to_tracing(child.stderr, "agent".to_owned(), true);
324 rpc.add_direct(
325 Address::Agent,
326 child_port(child.stdout, child.stdin),
327 Rtt(0),
328 );
329
330 Ok(Self(Arc::new(RemowtInner {
331 transport: Transport::Ssh {
332 sess,
333 subs,
334 runtime_dir,
335 agent_path,
336 },
337 rpc,
338 elevated: tokio::sync::OnceCell::new(),
339 children: Mutex::new(Vec::new()),
340 _runtime_tmp: None,
341 user: remowt_user,
342 })))
343 }
344
345 pub async fn connect_local(bundle: &AgentBundle, user: String) -> Result<Self> {
347 let agent_path = bundle.local_binary()?;
348 let mut child = tokio::process::Command::new(&agent_path)
349 .arg("real-agent")
350 .arg("--local")
351 .stdin(std::process::Stdio::piped())
352 .stdout(std::process::Stdio::piped())
353 .kill_on_drop(true)
354 .spawn()
355 .with_context(|| format!("spawning agent binary {}", agent_path.display()))?;
356 let stdin = child.stdin.take().expect("stdin piped");
357 let stdout = child.stdout.take().expect("stdout piped");
358
359 let rpc = Rpc::<BifConfig>::new(Address::User);
360 rpc.add_direct(Address::Agent, child_port(stdout, stdin), Rtt(0));
361
362 let (runtime_dir, runtime_tmp) = local_runtime_dir()?;
363
364 Ok(Self(Arc::new(RemowtInner {
365 transport: Transport::Local {
366 agent_path,
367 runtime_dir,
368 },
369 rpc,
370 elevated: tokio::sync::OnceCell::new(),
371 children: Mutex::new(vec![child]),
372 _runtime_tmp: runtime_tmp,
373 user,
374 })))
375 }
376
377 pub fn ssh(&self) -> Option<Arc<Handle<SshHandler>>> {
379 match &self.0.transport {
380 Transport::Ssh { sess, .. } => Some(sess.clone()),
381 Transport::Local { .. } => None,
382 }
383 }
384
385 pub fn rpc(&self) -> Rpc<BifConfig> {
386 self.0.rpc.clone()
387 }
388
389 pub async fn load_plugin(&self, id: u16, name: &str) -> Result<()> {
390 let client: PluginEndpointsClient<BifConfig> = self.endpoints();
391 client
392 .load_plugin(id, name.to_owned())
393 .await?
394 .map_err(|e| anyhow!("agent failed to load plugin: {e}"))
395 }
396 pub async fn run0_load_plugin_path(&self, id: u16, path: &str) -> Result<()> {
397 self.ensure_escalated().await?;
398 let client: PluginEndpointsClient<BifConfig> =
399 PluginEndpointsClient::wrap(self.0.rpc.remote(Address::AgentPrivileged));
400 client
401 .load_plugin_path(id, path.to_owned())
402 .await?
403 .map_err(|e| anyhow!("privileged agent failed to load plugin: {e}"))
404 }
405 pub fn plugin_endpoints<R: RemoteEndpoints<BifConfig>>(&self, id: u16) -> R {
406 R::wrap(self.0.rpc.remote(Address::Plugin(id)))
407 }
408
409 pub fn endpoints<R: RemoteEndpoints<BifConfig>>(&self) -> R {
410 R::wrap(self.0.rpc.remote(Address::Agent))
411 }
412 pub async fn run0_endpoints<R: RemoteEndpoints<BifConfig>>(&self) -> Result<R> {
413 self.ensure_escalated().await?;
414 Ok(R::wrap(self.0.rpc.remote(Address::AgentPrivileged)))
415 }
416
417 async fn ensure_escalated(&self) -> Result<()> {
418 self.0
419 .elevated
420 .get_or_try_init(|| async {
421 let (agent_path, local) = match &self.0.transport {
422 Transport::Ssh { agent_path, .. } => (agent_path.as_str().to_owned(), false),
423 Transport::Local { agent_path, .. } => (
424 agent_path
425 .to_str()
426 .ok_or_else(|| anyhow!("local agent path is not utf-8"))?
427 .to_owned(),
428 true,
429 ),
430 };
431
432 let (tool, flags) = self.detect_escalation().await?;
433 let mut args: Vec<String> = Vec::new();
434 args.push("-w".to_owned());
435 args.push(tool.to_owned());
436 args.extend(flags.iter().copied().map(str::to_owned));
437 if tool == "run0" {
438 args.push(format!(
439 "--unit={}-{}.service",
440 self.0.user,
441 Uuid::new_v4().simple()
442 ));
443 }
444 args.push(agent_path);
445 args.push("real-agent".to_owned());
446 args.push("--privileged".to_owned());
447 if local {
448 args.push("--local".to_owned());
449 }
450
451 let child = self
452 .spawn(SpawnOptions {
453 program: "setsid".to_owned(),
454 args,
455 stdin: StdioMode::Pipe,
456 stdout: StdioMode::Pipe,
457 stderr: StderrMode::Inherit,
458 ..Default::default()
459 })
460 .await
461 .context("spawning privileged agent")?;
462
463 let stdin = child
464 .stdin
465 .ok_or_else(|| anyhow!("privileged agent stdin missing"))?;
466 let stdout = child
467 .stdout
468 .ok_or_else(|| anyhow!("privileged agent stdout missing"))?;
469
470 let port = child_port(stdout, stdin);
471 self.0
472 .rpc
473 .add_direct(Address::AgentPrivileged, port, Rtt(0));
474 anyhow::Ok(())
475 })
476 .await?;
477 Ok(())
478 }
479
480 async fn detect_escalation(&self) -> Result<(&'static str, &'static [&'static str])> {
481 for (tool, flags) in ESCALATORS {
482 let probe = self
483 .spawn(SpawnOptions {
484 program: (*tool).to_owned(),
485 args: vec!["--version".to_owned()],
486 stdout: StdioMode::Null,
487 stderr: StderrMode::Null,
488 ..Default::default()
489 })
490 .await;
491 if let Ok(child) = probe {
492 let _ = child.wait().await;
493 return Ok((tool, flags));
494 }
495 }
496 bail!("no escalation tool found")
497 }
498
499 pub fn runtime_dir(&self) -> Utf8PathBuf {
501 match &self.0.transport {
502 Transport::Ssh { runtime_dir, .. } => runtime_dir.clone(),
503 Transport::Local { runtime_dir, .. } => runtime_dir.clone(),
504 }
505 }
506
507 pub async fn bind_runtime_unix(&self, hint: &str) -> Result<(RemowtListener, Utf8PathBuf)> {
509 let sock = self
510 .runtime_dir()
511 .join(format!("remowt-{hint}-{}.sock", Uuid::new_v4()));
512 let listener = self.bind_unix(&sock).await?;
513 Ok((listener, sock))
514 }
515
516 pub async fn bind_unix(&self, path: &Utf8Path) -> Result<RemowtListener> {
518 match &self.0.transport {
519 Transport::Ssh { sess, subs, .. } => {
520 let (tx, rx) = oneshot::channel();
521 subs.lock().expect("lock").insert(path.to_owned(), tx);
522 sess.streamlocal_forward(path.to_owned()).await?;
523 Ok(RemowtListener::Ssh(rx))
524 }
525 Transport::Local { .. } => {
526 let _ = std::fs::remove_file(path);
527 Ok(RemowtListener::Local(
528 UnixListener::bind(path)?,
529 path.to_owned(),
530 ))
531 }
532 }
533 }
534}
535
536pub(crate) fn drain_to_tracing(
537 stream: impl AsyncRead + Unpin + 'static + Send,
538 context: String,
539 stderr: bool,
540) -> JoinHandle<()> {
541 tokio::spawn(async move {
542 let mut reader = BufReader::new(stream);
543 let mut buf = Vec::with_capacity(4096);
544 loop {
545 buf.clear();
546 match reader.read_until(b'\n', &mut buf).await {
547 Ok(0) => break,
548 Ok(_) => {
549 let line = String::from_utf8_lossy(buf.strip_suffix(b"\n").unwrap_or(&buf));
550 if stderr {
551 warn!(context = %context, "{line}");
552 } else {
553 info!(context = %context, "{line}");
554 }
555 }
556 Err(e) => {
557 warn!(context = %context, "child stdio read failed: {e}");
558 break;
559 }
560 }
561 }
562 })
563}
564
565fn local_runtime_dir() -> Result<(Utf8PathBuf, Option<TempDir>)> {
566 if let Ok(dir) = env::var("XDG_RUNTIME_DIR") {
567 if !dir.is_empty() {
568 return Ok((Utf8PathBuf::from(dir), None));
569 }
570 }
571 let tmp = tempfile::Builder::new()
572 .prefix("remowt.")
573 .rand_bytes(12)
574 .tempdir()?;
575 let dir = Utf8PathBuf::from_path_buf(tmp.path().to_owned())
576 .map_err(|p| anyhow!("temp dir {} is not utf-8", p.display()))?;
577 Ok((dir, Some(tmp)))
578}
579
580async fn remote_runtime_dir(sess: &Handle<SshHandler>) -> Result<Utf8PathBuf> {
581 let dir = run_string_ok(sess, "echo \"$XDG_RUNTIME_DIR\"").await?;
582 let dir = dir.trim();
583 if dir.is_empty() {
584 let tmp = run_string_ok(sess, "mktemp -d remowt.XXXXXXXXXXXX --tmpdir").await?;
585 Ok(Utf8PathBuf::from(tmp))
586 } else {
587 Ok(Utf8PathBuf::from(dir))
588 }
589}