Skip to main content

remowt_client/
lib.rs

1use std::collections::HashMap;
2use std::env;
3use std::path::PathBuf;
4use std::sync::{Arc, Mutex};
5
6use anyhow::{anyhow, bail, ensure, Context as _, Result};
7use bifrostlink::declarative::RemoteEndpoints;
8use bifrostlink::{Remote, Rpc, Rtt};
9use camino::{Utf8Path, Utf8PathBuf};
10use remowt_link_shared::plugin::PluginEndpointsClient;
11use remowt_link_shared::port::child_port;
12use remowt_link_shared::{Address, BifConfig};
13use russh::client::{connect, Config, Handle, Handler, Msg, Session};
14use russh::keys::agent::client::AgentClient;
15use russh::keys::agent::AgentIdentity;
16use russh::keys::check_known_hosts;
17use russh::keys::ssh_key::PublicKey;
18use russh::Channel;
19use tempfile::TempDir;
20use tokio::io::AsyncRead;
21use tokio::net::UnixListener;
22use tokio::sync::oneshot;
23use tokio::task::JoinHandle;
24use tokio::{
25	fs,
26	io::{AsyncBufReadExt as _, AsyncReadExt as _, AsyncWriteExt as _, BufReader},
27};
28use tracing::{debug, info, warn};
29use uuid::Uuid;
30
31pub mod editor;
32mod forwarded;
33mod shell;
34mod ssh_exec;
35mod subprocess;
36
37use self::ssh_exec::SshExecChild;
38pub use self::subprocess::{RemowtChild, SpawnOptions, StderrMode, StdioMode};
39pub use forwarded::{RemowtListener, RemowtStream};
40pub use shell::{RemowtShell, RemowtShellResizer};
41
42type Subs = Arc<Mutex<HashMap<Utf8PathBuf, oneshot::Sender<Channel<Msg>>>>>;
43
44fn sh_quote(s: impl AsRef<str>) -> String {
45	format!("'{}'", s.as_ref().replace('\'', "'\\''"))
46}
47
48const ESCALATORS: [(&str, &[&str]); 2] = [("run0", &["--background=", "--pipe"]), ("sudo", &[])];
49
50pub struct AgentBundle {
51	dir: PathBuf,
52	hashes: HashMap<String, String>,
53}
54
55impl AgentBundle {
56	pub fn from_dir(dir: impl Into<PathBuf>) -> Result<Self> {
57		let dir = dir.into();
58		let hashes_path = dir.join("hashes");
59		let raw = std::fs::read_to_string(&hashes_path)
60			.with_context(|| format!("reading agent hashes at {}", hashes_path.display()))?;
61		let mut hashes = HashMap::new();
62		for line in raw.lines() {
63			let line = line.trim();
64			if line.is_empty() {
65				continue;
66			}
67			let (arch, hash) = line
68				.split_once(char::is_whitespace)
69				.ok_or_else(|| anyhow!("malformed hashes line: {line:?}"))?;
70			hashes.insert(arch.to_owned(), hash.trim().to_owned());
71		}
72		ensure!(
73			!hashes.is_empty(),
74			"agent bundle {} has no hashes",
75			dir.display()
76		);
77		Ok(Self { dir, hashes })
78	}
79
80	fn binary(&self, arch: &str) -> PathBuf {
81		self.dir.join(format!("remowt-agent-{arch}"))
82	}
83
84	fn local_binary(&self) -> Result<PathBuf> {
85		let arch = env::consts::ARCH;
86		let path = self.binary(arch);
87		ensure!(
88			path.is_file(),
89			"no local remowt-agent build for arch {arch} in bundle {}",
90			self.dir.display()
91		);
92		Ok(path)
93	}
94}
95
96async fn run(sess: &Handle<SshHandler>, cmd: &str) -> Result<(Option<u32>, Vec<u8>)> {
97	let ch = sess.channel_open_session().await?;
98	ch.exec(true, cmd).await?;
99
100	let mut child = SshExecChild::from_exec(ch);
101	drop(child.stdin);
102	drain_to_tracing(child.stderr, cmd.to_owned(), true);
103
104	let mut out = Vec::new();
105	child.stdout.read_to_end(&mut out).await?;
106	let code = child.exit.await.ok().flatten();
107	Ok((code, out))
108}
109
110async fn run_string_ok(sess: &Handle<SshHandler>, cmd: &str) -> Result<String> {
111	let (code, mut out) = run(sess, cmd).await?;
112	ensure!(
113		code == Some(0),
114		"remote command failed (exit {code:?}): {cmd}"
115	);
116	if !out.is_empty() {
117		ensure!(
118			out.ends_with(b"\n"),
119			"remote command was not newline-terminated: {cmd}: {out:?}"
120		);
121		out.pop();
122	}
123	String::from_utf8(out).context("expected utf8 output for command")
124}
125
126async fn deploy_agent(sess: &Handle<SshHandler>, bundle: &AgentBundle) -> Result<Utf8PathBuf> {
127	debug!("uname -a");
128	let arch = run_string_ok(sess, "uname -m").await?;
129	let hash = bundle
130		.hashes
131		.get(&arch)
132		.ok_or_else(|| anyhow!("no remowt-agent build for remote arch {arch:?}"))?;
133
134	debug!("get dir");
135	let cache = run_string_ok(sess, "echo \"$XDG_CACHE_HOME\"").await?;
136	let dir = if cache.is_empty() {
137		let home = run_string_ok(sess, "echo \"$HOME\"").await?;
138		ensure!(
139			!home.is_empty(),
140			"remote $HOME and $XDG_CACHE_HOME both empty"
141		);
142		Utf8PathBuf::from(home).join(".cache/remowt")
143	} else {
144		Utf8PathBuf::from(cache).join("remowt")
145	};
146	let path = dir.join(hash);
147
148	debug!("presence");
149	let (present, _) = run(sess, &format!("test -x {}", sh_quote(&path))).await?;
150	if present != Some(0) {
151		let bin = bundle.binary(&arch);
152		debug!("read");
153		let bytes = fs::read(&bin)
154			.await
155			.with_context(|| format!("reading agent binary {}", bin.display()))?;
156		debug!("upload");
157		upload_agent(sess, &dir, &path, bytes).await?;
158	}
159	Ok(path)
160}
161
162async fn upload_agent(
163	sess: &Handle<SshHandler>,
164	dir: &Utf8Path,
165	path: &Utf8Path,
166	bytes: Vec<u8>,
167) -> Result<()> {
168	debug!("mkdirp");
169	run_string_ok(sess, &format!("mkdir -p {}", sh_quote(dir))).await?;
170
171	let tmp = dir.join(format!("tmp.{}", Uuid::new_v4()));
172	let ch = sess.channel_open_session().await?;
173	debug!("cat");
174	ch.exec(true, format!("cat > {}", sh_quote(&tmp))).await?;
175
176	let mut child = SshExecChild::from_exec(ch);
177	child
178		.stdin
179		.write_all(&bytes)
180		.await
181		.context("sending agent binary")?;
182	child
183		.stdin
184		.shutdown()
185		.await
186		.context("sending agent binary")?;
187	let code = child.wait().await;
188	ensure!(code == Some(0), "agent upload failed (exit {code:?})");
189
190	debug!("chmod");
191	run_string_ok(sess, &format!("chmod 0755 {}", sh_quote(&tmp))).await?;
192	run_string_ok(
193		sess,
194		&format!("mv -f {} {}", sh_quote(&tmp), sh_quote(path)),
195	)
196	.await?;
197	Ok(())
198}
199
200pub struct SshHandler {
201	host: String,
202	port: u16,
203	subs: Subs,
204}
205impl Handler for SshHandler {
206	type Error = russh::Error;
207	async fn check_server_key(
208		&mut self,
209		server_public_key: &PublicKey,
210	) -> Result<bool, Self::Error> {
211		Ok(check_known_hosts(&self.host, self.port, server_public_key)?)
212	}
213	async fn server_channel_open_forwarded_streamlocal(
214		&mut self,
215		channel: Channel<Msg>,
216		socket_path: &str,
217		_session: &mut Session,
218	) -> Result<(), Self::Error> {
219		let Some(ch) = self
220			.subs
221			.lock()
222			.expect("lock")
223			.remove(&Utf8PathBuf::from(socket_path))
224		else {
225			return Err(russh::Error::WrongChannel);
226		};
227		let _ = ch.send(channel);
228		Ok(())
229	}
230}
231
232enum Transport {
233	Ssh {
234		sess: Arc<Handle<SshHandler>>,
235		subs: Subs,
236		runtime_dir: Utf8PathBuf,
237		agent_path: Utf8PathBuf,
238	},
239	Local {
240		agent_path: PathBuf,
241		runtime_dir: Utf8PathBuf,
242	},
243}
244
245struct RemowtInner {
246	transport: Transport,
247	rpc: Rpc<BifConfig>,
248	elevated: tokio::sync::OnceCell<()>,
249	#[allow(dead_code)]
250	children: Mutex<Vec<tokio::process::Child>>,
251	_runtime_tmp: Option<TempDir>,
252	user: String,
253}
254
255#[derive(Clone)]
256pub struct Remowt(Arc<RemowtInner>);
257
258pub type RemowtRemote = Remote<BifConfig>;
259
260impl Remowt {
261	/// Connect to the remote host over ssh, detect the architecture and deploy the required
262	/// agent binary.
263	pub async fn connect(host: &str, bundle: &AgentBundle, remowt_user: String) -> Result<Self> {
264		let conf = russh_config::parse_home(host)?;
265		let port = conf.host_config.port.or(conf.port).unwrap_or(22);
266		let hostname = conf
267			.host_config
268			.hostname
269			.clone()
270			.unwrap_or_else(|| conf.host_name.clone());
271		let user = conf
272			.user
273			.clone()
274			.unwrap_or_else(|| env::var("USER").unwrap_or_else(|_| "root".to_owned()));
275
276		let subs: Subs = Arc::new(Mutex::new(HashMap::new()));
277		let mut sess = connect(
278			Arc::new(Config::default()),
279			(hostname.clone(), port),
280			SshHandler {
281				host: hostname,
282				port,
283				subs: subs.clone(),
284			},
285		)
286		.await?;
287
288		let mut agent = AgentClient::connect_env().await?;
289		let rsa_hash = sess.best_supported_rsa_hash().await?.flatten();
290		let mut authenticated = false;
291		for ident in agent.request_identities().await? {
292			let AgentIdentity::PublicKey { key, .. } = ident else {
293				continue;
294			};
295			if sess
296				.authenticate_publickey_with(user.clone(), key, rsa_hash, &mut agent)
297				.await?
298				.success()
299			{
300				authenticated = true;
301				break;
302			}
303		}
304		ensure!(authenticated, "ssh authentication failed");
305
306		let sess = Arc::new(sess);
307
308		debug!("deploying agent");
309		let agent_path = deploy_agent(&sess, bundle).await?;
310
311		debug!("runtime dir");
312		let runtime_dir = remote_runtime_dir(&sess).await?;
313
314		let rpc = Rpc::<BifConfig>::new(Address::User);
315
316		let cmd_chan = sess.channel_open_session().await?;
317		debug!("starting agent");
318		cmd_chan
319			.exec(true, format!("{} real-agent", sh_quote(&agent_path)))
320			.await?;
321
322		let child = SshExecChild::from_exec(cmd_chan);
323		drain_to_tracing(child.stderr, "agent".to_owned(), true);
324		rpc.add_direct(
325			Address::Agent,
326			child_port(child.stdout, child.stdin),
327			Rtt(0),
328		);
329
330		Ok(Self(Arc::new(RemowtInner {
331			transport: Transport::Ssh {
332				sess,
333				subs,
334				runtime_dir,
335				agent_path,
336			},
337			rpc,
338			elevated: tokio::sync::OnceCell::new(),
339			children: Mutex::new(Vec::new()),
340			_runtime_tmp: None,
341			user: remowt_user,
342		})))
343	}
344
345	/// "Connect" to the local machine's agent, by starting the agent binary locally.
346	pub async fn connect_local(bundle: &AgentBundle, user: String) -> Result<Self> {
347		let agent_path = bundle.local_binary()?;
348		let mut child = tokio::process::Command::new(&agent_path)
349			.arg("real-agent")
350			.arg("--local")
351			.stdin(std::process::Stdio::piped())
352			.stdout(std::process::Stdio::piped())
353			.kill_on_drop(true)
354			.spawn()
355			.with_context(|| format!("spawning agent binary {}", agent_path.display()))?;
356		let stdin = child.stdin.take().expect("stdin piped");
357		let stdout = child.stdout.take().expect("stdout piped");
358
359		let rpc = Rpc::<BifConfig>::new(Address::User);
360		rpc.add_direct(Address::Agent, child_port(stdout, stdin), Rtt(0));
361
362		let (runtime_dir, runtime_tmp) = local_runtime_dir()?;
363
364		Ok(Self(Arc::new(RemowtInner {
365			transport: Transport::Local {
366				agent_path,
367				runtime_dir,
368			},
369			rpc,
370			elevated: tokio::sync::OnceCell::new(),
371			children: Mutex::new(vec![child]),
372			_runtime_tmp: runtime_tmp,
373			user,
374		})))
375	}
376
377	/// Get the handle to the underlying russh session handle.
378	pub fn ssh(&self) -> Option<Arc<Handle<SshHandler>>> {
379		match &self.0.transport {
380			Transport::Ssh { sess, .. } => Some(sess.clone()),
381			Transport::Local { .. } => None,
382		}
383	}
384
385	pub fn rpc(&self) -> Rpc<BifConfig> {
386		self.0.rpc.clone()
387	}
388
389	pub async fn load_plugin(&self, id: u16, name: &str) -> Result<()> {
390		let client: PluginEndpointsClient<BifConfig> = self.endpoints();
391		client
392			.load_plugin(id, name.to_owned())
393			.await?
394			.map_err(|e| anyhow!("agent failed to load plugin: {e}"))
395	}
396	pub async fn run0_load_plugin_path(&self, id: u16, path: &str) -> Result<()> {
397		self.ensure_escalated().await?;
398		let client: PluginEndpointsClient<BifConfig> =
399			PluginEndpointsClient::wrap(self.0.rpc.remote(Address::AgentPrivileged));
400		client
401			.load_plugin_path(id, path.to_owned())
402			.await?
403			.map_err(|e| anyhow!("privileged agent failed to load plugin: {e}"))
404	}
405	pub fn plugin_endpoints<R: RemoteEndpoints<BifConfig>>(&self, id: u16) -> R {
406		R::wrap(self.0.rpc.remote(Address::Plugin(id)))
407	}
408
409	pub fn endpoints<R: RemoteEndpoints<BifConfig>>(&self) -> R {
410		R::wrap(self.0.rpc.remote(Address::Agent))
411	}
412	pub async fn run0_endpoints<R: RemoteEndpoints<BifConfig>>(&self) -> Result<R> {
413		self.ensure_escalated().await?;
414		Ok(R::wrap(self.0.rpc.remote(Address::AgentPrivileged)))
415	}
416
417	async fn ensure_escalated(&self) -> Result<()> {
418		self.0
419			.elevated
420			.get_or_try_init(|| async {
421				let (agent_path, local) = match &self.0.transport {
422					Transport::Ssh { agent_path, .. } => (agent_path.as_str().to_owned(), false),
423					Transport::Local { agent_path, .. } => (
424						agent_path
425							.to_str()
426							.ok_or_else(|| anyhow!("local agent path is not utf-8"))?
427							.to_owned(),
428						true,
429					),
430				};
431
432				let (tool, flags) = self.detect_escalation().await?;
433				let mut args: Vec<String> = Vec::new();
434				args.push("-w".to_owned());
435				args.push(tool.to_owned());
436				args.extend(flags.iter().copied().map(str::to_owned));
437				if tool == "run0" {
438					args.push(format!(
439						"--unit={}-{}.service",
440						self.0.user,
441						Uuid::new_v4().simple()
442					));
443				}
444				args.push(agent_path);
445				args.push("real-agent".to_owned());
446				args.push("--privileged".to_owned());
447				if local {
448					args.push("--local".to_owned());
449				}
450
451				let child = self
452					.spawn(SpawnOptions {
453						program: "setsid".to_owned(),
454						args,
455						stdin: StdioMode::Pipe,
456						stdout: StdioMode::Pipe,
457						stderr: StderrMode::Inherit,
458						..Default::default()
459					})
460					.await
461					.context("spawning privileged agent")?;
462
463				let stdin = child
464					.stdin
465					.ok_or_else(|| anyhow!("privileged agent stdin missing"))?;
466				let stdout = child
467					.stdout
468					.ok_or_else(|| anyhow!("privileged agent stdout missing"))?;
469
470				let port = child_port(stdout, stdin);
471				self.0
472					.rpc
473					.add_direct(Address::AgentPrivileged, port, Rtt(0));
474				anyhow::Ok(())
475			})
476			.await?;
477		Ok(())
478	}
479
480	async fn detect_escalation(&self) -> Result<(&'static str, &'static [&'static str])> {
481		for (tool, flags) in ESCALATORS {
482			let probe = self
483				.spawn(SpawnOptions {
484					program: (*tool).to_owned(),
485					args: vec!["--version".to_owned()],
486					stdout: StdioMode::Null,
487					stderr: StderrMode::Null,
488					..Default::default()
489				})
490				.await;
491			if let Ok(child) = probe {
492				let _ = child.wait().await;
493				return Ok((tool, flags));
494			}
495		}
496		bail!("no escalation tool found")
497	}
498
499	/// XDG_RUNTIME_DIR on the remote machine.
500	pub fn runtime_dir(&self) -> Utf8PathBuf {
501		match &self.0.transport {
502			Transport::Ssh { runtime_dir, .. } => runtime_dir.clone(),
503			Transport::Local { runtime_dir, .. } => runtime_dir.clone(),
504		}
505	}
506
507	/// Bind unix listener socket on the remote machine with auto-generated path, returning the path.
508	pub async fn bind_runtime_unix(&self, hint: &str) -> Result<(RemowtListener, Utf8PathBuf)> {
509		let sock = self
510			.runtime_dir()
511			.join(format!("remowt-{hint}-{}.sock", Uuid::new_v4()));
512		let listener = self.bind_unix(&sock).await?;
513		Ok((listener, sock))
514	}
515
516	/// Bind unix listener socket on the remote machine on the specified path.
517	pub async fn bind_unix(&self, path: &Utf8Path) -> Result<RemowtListener> {
518		match &self.0.transport {
519			Transport::Ssh { sess, subs, .. } => {
520				let (tx, rx) = oneshot::channel();
521				subs.lock().expect("lock").insert(path.to_owned(), tx);
522				sess.streamlocal_forward(path.to_owned()).await?;
523				Ok(RemowtListener::Ssh(rx))
524			}
525			Transport::Local { .. } => {
526				let _ = std::fs::remove_file(path);
527				Ok(RemowtListener::Local(
528					UnixListener::bind(path)?,
529					path.to_owned(),
530				))
531			}
532		}
533	}
534}
535
536pub(crate) fn drain_to_tracing(
537	stream: impl AsyncRead + Unpin + 'static + Send,
538	context: String,
539	stderr: bool,
540) -> JoinHandle<()> {
541	tokio::spawn(async move {
542		let mut reader = BufReader::new(stream);
543		let mut buf = Vec::with_capacity(4096);
544		loop {
545			buf.clear();
546			match reader.read_until(b'\n', &mut buf).await {
547				Ok(0) => break,
548				Ok(_) => {
549					let line = String::from_utf8_lossy(buf.strip_suffix(b"\n").unwrap_or(&buf));
550					if stderr {
551						warn!(context = %context, "{line}");
552					} else {
553						info!(context = %context, "{line}");
554					}
555				}
556				Err(e) => {
557					warn!(context = %context, "child stdio read failed: {e}");
558					break;
559				}
560			}
561		}
562	})
563}
564
565fn local_runtime_dir() -> Result<(Utf8PathBuf, Option<TempDir>)> {
566	if let Ok(dir) = env::var("XDG_RUNTIME_DIR") {
567		if !dir.is_empty() {
568			return Ok((Utf8PathBuf::from(dir), None));
569		}
570	}
571	let tmp = tempfile::Builder::new()
572		.prefix("remowt.")
573		.rand_bytes(12)
574		.tempdir()?;
575	let dir = Utf8PathBuf::from_path_buf(tmp.path().to_owned())
576		.map_err(|p| anyhow!("temp dir {} is not utf-8", p.display()))?;
577	Ok((dir, Some(tmp)))
578}
579
580async fn remote_runtime_dir(sess: &Handle<SshHandler>) -> Result<Utf8PathBuf> {
581	let dir = run_string_ok(sess, "echo \"$XDG_RUNTIME_DIR\"").await?;
582	let dir = dir.trim();
583	if dir.is_empty() {
584		let tmp = run_string_ok(sess, "mktemp -d remowt.XXXXXXXXXXXX --tmpdir").await?;
585		Ok(Utf8PathBuf::from(tmp))
586	} else {
587		Ok(Utf8PathBuf::from(dir))
588	}
589}