#![feature(exit_status_error)]
mod conditional_sudo_command;
mod limit;
mod secret;
use clap::Parser;
use eyre::{ContextCompat as _, WrapErr as _};
use futures_util::future::try_join_all;
use openssh::Stdio;
use std::collections::{HashMap, HashSet};
use std::path::{Path, PathBuf};
use std::sync::Arc;
use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader};
use tracing::Instrument as _;
use crate::conditional_sudo_command::ConditionalSudoCommand as _;
// Top-level CLI subcommands.
//
// Plain `//` comments are used on purpose: clap's derive turns `///` doc
// comments into help text, so doc comments here would change `--help` output.
#[derive(Debug, clap::Subcommand)]
enum Subcommand {
// Run against the hosts from the inventory (discovery, build, upload, execute).
Play {
// Optional host-selection expression; forwarded to `limit::select_indices`.
#[clap(long)]
limit: Option<String>,
// When set, sudo is never used on the remote side, even if the remote user
// is not uid 0.
#[clap(long)]
unprivileged: bool,
},
// Print the inventory obtained from the orchestrator binary.
Inventory {
// When given, only the host with exactly this name is printed; otherwise
// the whole inventory is printed.
#[clap(long)]
host: Option<String>,
},
// Secret-related commands; handed to `secret::dispatch`.
#[clap(subcommand)]
Secret(secret::SecretCommand),
}
// Parsed command line: a single mandatory subcommand.
//
// A plain `//` comment is used on purpose: clap's derive would turn a `///`
// doc comment into the program's "about" text.
#[derive(Debug, Parser)]
struct Args {
// The subcommand selected by the user.
#[command(subcommand)]
subcommand: Subcommand,
}
/// An inventory host together with what the pre-flight checks in `main`
/// learned about it.
#[derive(Debug)]
struct DiscoveredHost {
/// The inventory entry this record was built from.
host: cindy::Host<serde_json::Value>,
/// SSH session opened to the host during discovery and reused for the upload
/// and the worker run.
session: openssh::Session,
/// Platform looked up from the `<uname -m>-unknown-linux-musl` triplet.
platform: &'static platforms::Platform,
/// `true` when remote commands are issued through
/// `conditional_sudo_command` with sudo enabled: the remote uid is not `0`
/// and `--unprivileged` was not given.
use_sudo: bool,
}
/// Asks the orchestrator binary for the inventory and deserialises it.
///
/// The binary at `orchestrator_exe_path` is started with
/// `CINDY_DUMP_INVENTORY=1`, no stdin, and captured stdout/stderr. Its stdout
/// is parsed as JSON.
///
/// # Errors
///
/// Fails when the process cannot be run, when it exits unsuccessfully (the
/// error then carries the exit status and the captured stderr), or when its
/// stdout is not valid inventory JSON.
async fn get_inventory(
    orchestrator_exe_path: &Path,
) -> eyre::Result<cindy::Inventory<serde_json::Value>> {
    tracing::info!("Pulling inventory from orchestrator binary...");

    let mut dump_command = tokio::process::Command::new(orchestrator_exe_path);
    dump_command
        .env("CINDY_DUMP_INVENTORY", "1")
        .stdin(std::process::Stdio::null())
        .stdout(std::process::Stdio::piped())
        .stderr(std::process::Stdio::piped());

    let std::process::Output {
        status,
        stdout,
        stderr,
    } = dump_command.output().await?;

    // Happy path first: a successful dump is parsed straight from stdout.
    if status.success() {
        return serde_json::from_slice(&stdout)
            .context("Failed to parse inventory JSON from orchestrator dump");
    }

    // stderr is converted lossily so that odd bytes cannot hide the failure.
    eyre::bail!(
        "inventory dump exited with {:?}\n{}",
        status,
        String::from_utf8_lossy(&stderr)
    );
}
/// Program entry point.
///
/// * `secret` subcommands are dispatched to the `secret` module; the
///   orchestrator is only built when `secret::requires_orchestrator` says so.
/// * `inventory` prints one host or the whole inventory and returns.
/// * `play` runs the full pipeline:
///   1. select hosts via `limit::select_indices`,
///   2. connect to every selected host over SSH and detect its architecture
///      and whether sudo is needed,
///   3. install the required rustup component/targets and build one remote
///      worker per distinct platform,
///   4. upload the worker to each host, start it, start a local orchestrator
///      process per host, and bridge their stdin/stdout in both directions
///      while forwarding both stderr streams to the local stderr.
///
/// # Errors
///
/// Any failure while building, connecting, uploading or spawning is returned.
/// Because `try_join_all` is used, the first failing host aborts the
/// remaining in-flight futures of the same stage.
#[tokio::main]
async fn main() -> eyre::Result<()> {
    let args = Args::parse();
    color_eyre::install()?;
    tracing_subscriber::fmt().init();

    // Secret handling is self-contained and may not need the orchestrator.
    if let Subcommand::Secret(cmd) = args.subcommand {
        let orch = if secret::requires_orchestrator(&cmd) {
            Some(compile_orchestrator().await?)
        } else {
            None
        };
        return secret::dispatch(cmd, orch.as_deref()).await;
    }

    // Everything else needs the inventory, which comes from the orchestrator.
    let orchestrator_exe_path = compile_orchestrator().await?;
    let inventory = get_inventory(&orchestrator_exe_path).await?;
    let (limit, unprivileged) = match args.subcommand {
        Subcommand::Play {
            limit,
            unprivileged,
        } => (limit, unprivileged),
        Subcommand::Inventory { host } => {
            if let Some(host_name) = host {
                let host = inventory
                    .hosts
                    .into_iter()
                    .find(|host| host.name == host_name)
                    .context("No such host found in inventory")?;
                println!("{host:#?}");
            } else {
                println!("{inventory:#?}");
            }
            return Ok(());
        }
        Subcommand::Secret(_) => unreachable!("handled above"),
    };

    let selected_indices = limit::select_indices(
        &inventory.hosts,
        limit.as_deref(),
        |h| h.name.as_str(),
        |h| h.tags.as_slice(),
    )?;
    let selected: Vec<cindy::Host<serde_json::Value>> = selected_indices
        .into_iter()
        .map(|i| inventory.hosts[i].clone())
        .collect();
    if selected.is_empty() {
        tracing::warn!(
            limit,
            "no hosts matched the `--limit` expression; nothing to do"
        );
        return Ok(());
    }
    tracing::info!(count = selected.len(), "selected hosts after --limit");

    // Pre-flight: one SSH session per host, kept open for the later stages.
    tracing::info!("Initiating pre-flight checks across targets...");
    let discovery_futures = selected.into_iter().map(|host| async move {
        let session = openssh::Session::connect(&host.name, openssh::KnownHosts::Add).await?;
        let uname_out = session
            .command("uname")
            .args(["-m"])
            .output()
            .await?
            .exit_ok()?;
        let machine_arch = String::from_utf8(uname_out.stdout)?.trim().to_string();
        // Workers are always built for the musl Linux flavour of the reported
        // machine architecture.
        let triplet = format!("{machine_arch}-unknown-linux-musl");
        // `with_context` builds the message only when the lookup fails.
        let platform = platforms::Platform::find(&triplet)
            .with_context(|| format!("Couldn't find platform by triplet: '{triplet}'"))?;
        let id_out = session
            .command("id")
            .args(["-u"])
            .output()
            .await?
            .exit_ok()?;
        // sudo is only needed when we are not already uid 0, and can be
        // disabled entirely with `--unprivileged`.
        let use_sudo = !unprivileged && String::from_utf8(id_out.stdout)?.trim() != "0";
        Ok::<DiscoveredHost, eyre::Error>(DiscoveredHost {
            host,
            session,
            platform,
            use_sudo,
        })
    });
    let discovered_hosts = try_join_all(discovery_futures).await?;
    let unique_platforms: HashSet<&'static platforms::Platform> =
        discovered_hosts.iter().map(|h| h.platform).collect();

    /// Turns an unsuccessful process result into an error carrying its stderr.
    trait OutputOrError: Sized {
        fn ok_or_die(self) -> eyre::Result<Self>;
    }
    impl OutputOrError for std::process::Output {
        fn ok_or_die(self) -> eyre::Result<Self> {
            if !self.status.success() {
                // Lossy conversion: a failed tool must produce an error, not a
                // panic, even if its stderr is not valid UTF-8.
                eyre::bail!("{}", String::from_utf8_lossy(&self.stderr));
            }
            Ok(self)
        }
    }

    tracing::info!(component = "rust-src", "installing component");
    std::process::Command::new("rustup")
        .args(["component", "add", "rust-src"])
        .output()?
        .ok_or_die()?;
    for platform in &unique_platforms {
        // Tier 3 targets are not installed through rustup here; they are built
        // with `-Zbuild-std` in `compile_remote_worker` instead.
        if matches!(platform.tier, platforms::Tier::Three) {
            continue;
        }
        tracing::info!(triplet = platform.target_triple, "installing target");
        std::process::Command::new("rustup")
            .args(["target", "add", platform.target_triple])
            .output()?
            .ok_or_die()?;
    }

    // One worker build per distinct platform, all driven concurrently.
    let cargo_meta = Arc::new(cargo_metadata::MetadataCommand::new().exec()?);
    let compilation_futures = unique_platforms.into_iter().map(|platform| {
        let meta = Arc::clone(&cargo_meta);
        async move {
            Ok::<_, eyre::Error>((platform, compile_remote_worker(platform, &meta).await?))
        }
    });
    let compilation_results = Arc::new(
        try_join_all(compilation_futures)
            .await?
            .into_iter()
            .collect::<HashMap<_, _>>(),
    );

    tracing::info!("Launching parallel deployment loops...");
    // The orchestrator was already built above in order to dump the inventory;
    // that executable is reused instead of invoking cargo a second time.
    let orchestrator_path = Arc::new(orchestrator_exe_path);
    let deployment_futures = discovered_hosts.into_iter().map(|host| {
        let orch_path = Arc::clone(&orchestrator_path);
        let compilation_results = Arc::clone(&compilation_results);
        async move {
            let hostname = host.host.name.clone();
            let context_span = tracing::info_span!("run", %hostname);
            async move {
                // Every platform in `discovered_hosts` was compiled above, so a
                // missing entry is a programming error.
                let local_worker_path = compilation_results
                    .get(host.platform)
                    .expect("Not compiled");
                let remote_tmp_path = format!(
                    "/tmp/.cindy-worker.{}",
                    rand::distr::SampleString::sample_string(
                        &rand::distr::Alphanumeric,
                        &mut rand::rng(),
                        16
                    ),
                );

                // Upload the worker by streaming it into `cat` on the remote
                // side. `umask 0266` makes the new file readable by its owner
                // only; `chmod u+x` then makes it executable for the owner.
                let mut upload = host
                    .session
                    .conditional_sudo_command(host.use_sudo, "sh")
                    .args([
                        "-c",
                        &format!(
                            "umask 0266 && cat >'{remote_tmp_path}' && chmod u+x '{remote_tmp_path}'"
                        ),
                    ])
                    .stdin(Stdio::piped())
                    .spawn()
                    .await?;
                if let Some(mut stdin) = upload.stdin().take() {
                    // NOTE(review): `std::fs::read` blocks the executor thread
                    // while the binary is loaded.
                    stdin.write_all(&std::fs::read(local_worker_path)?).await?;
                    // `stdin` is dropped at the end of this block, which lets
                    // the remote `cat` see end-of-file.
                }
                upload.wait().await?.exit_ok()?;

                let mut remote_run = host
                    .session
                    .conditional_sudo_command(host.use_sudo, &remote_tmp_path)
                    .stdin(Stdio::piped())
                    .stdout(Stdio::piped())
                    .stderr(Stdio::piped())
                    .spawn()
                    .await?;
                let host_context_json = serde_json::to_string(&host.host)
                    .expect("Failed to serialise host context");
                let mut orchestrator_run = tokio::process::Command::new(&*orch_path)
                    .env("CINDY_HOST_CONTEXT", host_context_json)
                    .stdin(std::process::Stdio::piped())
                    .stdout(std::process::Stdio::piped())
                    .stderr(std::process::Stdio::piped())
                    .spawn()?;

                // All six pipes were requested as piped above, so `take()`
                // yields `Some` for each of them.
                let mut r_stdout = remote_run.stdout().take().unwrap();
                let mut r_stdin = remote_run.stdin().take().unwrap();
                let mut r_stderr = BufReader::new(remote_run.stderr().take().unwrap());
                let mut o_stdout = orchestrator_run.stdout.take().unwrap();
                let mut o_stdin = orchestrator_run.stdin.take().unwrap();
                let mut o_stderr = BufReader::new(orchestrator_run.stderr.take().unwrap());

                // Forward both stderr streams line by line to the local stderr,
                // prefixed with the host name (cyan = remote worker,
                // yellow = local orchestrator). A read error ends forwarding.
                let prefix_r = hostname.clone();
                let bridge_c = async move {
                    let mut line = String::new();
                    while r_stderr.read_line(&mut line).await.unwrap_or(0) > 0 {
                        eprint!("\x1b[36m[{prefix_r}]\x1b[0m {line}");
                        line.clear();
                    }
                };
                let prefix_o = hostname.clone();
                let bridge_d = async move {
                    let mut line = String::new();
                    while o_stderr.read_line(&mut line).await.unwrap_or(0) > 0 {
                        eprint!("\x1b[33m[{prefix_o}]\x1b[0m {line}");
                        line.clear();
                    }
                };
                // NOTE(review): these tasks are detached; stderr lines still in
                // flight when the run ends are not guaranteed to be printed.
                tokio::spawn(bridge_c);
                tokio::spawn(bridge_d);

                // Bridge worker and orchestrator until the first of: either
                // copy direction finishing, or either process exiting.
                // NOTE(review): exit statuses are only logged here and never
                // turned into an error, so the success message below is emitted
                // even after an unexpected exit - confirm this is intended.
                tokio::select! {
                    biased;
                    res = tokio::io::copy(&mut r_stdout, &mut o_stdin) => {
                        tracing::debug!("Remote stdout -> Orchestrator stdin stream closed: {:?}", res);
                    }
                    res = async move {
                        let _ = tokio::io::copy(&mut o_stdout, &mut r_stdin).await;
                        let _ = r_stdin.shutdown().await;
                    } => {
                        tracing::debug!("Orchestrator stdout -> Remote stdin stream closed: {:?}", res);
                    }
                    exit = remote_run.wait() => {
                        tracing::error!("Remote worker process exited unexpectedly: {exit:?}");
                    }
                    exit = orchestrator_run.wait() => {
                        tracing::error!("Local orchestrator process exited unexpectedly: {exit:?}");
                    }
                }
                tracing::info!("Execution successfully completed.");
                Ok::<(), eyre::Error>(())
            }
            .instrument(context_span)
            .await
        }
    });
    try_join_all(deployment_futures).await?;
    tracing::info!("All pipeline target tracking execution runs completed successfully!");
    Ok(())
}
/// Builds the orchestrator flavour of the crate and returns the path of the
/// resulting executable.
///
/// Runs `cargo build --release --no-default-features --features orchestrator`
/// with `target/cindy/orchestrator` as the target directory. The executable
/// path is taken from the last JSON message on cargo's stdout whose
/// `executable` field is a string.
///
/// # Errors
///
/// Fails when cargo cannot be started, when the build fails (the error then
/// carries cargo's stderr), when stdout is not valid UTF-8, or when no
/// message names an executable.
async fn compile_orchestrator() -> eyre::Result<PathBuf> {
    tracing::info!("compiling orchestrator binary");
    let output = tokio::process::Command::new("cargo")
        .args([
            "build",
            "--release",
            "--message-format=json",
            "--no-default-features",
            "--features",
            "orchestrator",
            "--target-dir",
            "target/cindy/orchestrator",
        ])
        .output()
        .await
        .context("Failed to run `cargo build` for the orchestrator")?;
    if !output.status.success() {
        // Lossy conversion: a failed build must surface as an error, not as a
        // panic, even if cargo's stderr contains bytes that are not UTF-8.
        eyre::bail!("{}", String::from_utf8_lossy(&output.stderr));
    }
    let stdout_str = String::from_utf8(output.stdout)?;
    // Lines that are not JSON, and messages without a string `executable`
    // field, are skipped; the last match wins.
    let exe_path = stdout_str
        .lines()
        .filter_map(|l| serde_json::from_str::<serde_json::Value>(l).ok())
        .filter_map(|json| json["executable"].as_str().map(PathBuf::from))
        .next_back()
        .context("Failed to extract orchestrator executable path")?;
    Ok(exe_path)
}
/// Cross-compiles the remote worker for `platform` and returns the host path
/// of the resulting executable.
///
/// Runs `cross build --release --no-default-features --features remote` for
/// the platform's target triple, with `target/cindy/remote/<triple>` as the
/// target directory. Tier 3 targets additionally get `-Zbuild-std`.
///
/// When the `cindy` package in `metadata` has no `source` (which
/// `cargo_metadata` reports for local path packages), its workspace root is
/// passed to the build container as a volume through `CROSS_CONTAINER_OPTS`.
///
/// # Errors
///
/// Fails when `cross` cannot be started, when the build fails (the error then
/// carries its stderr), when stdout is not valid UTF-8, when no JSON message
/// names an executable, or when the nested `cargo metadata` call fails.
#[tracing::instrument(skip_all, fields(triplet = platform.target_triple))]
async fn compile_remote_worker(
    platform: &'static platforms::Platform,
    metadata: &cargo_metadata::Metadata,
) -> eyre::Result<PathBuf> {
    tracing::info!(platform.target_triple, "compiling remote binary");
    let mut cmd = tokio::process::Command::new("cross");
    cmd.args([
        "build",
        "--release",
        "--message-format=json",
        "--target",
        platform.target_triple,
        "--no-default-features",
        "--features",
        "remote",
        "--target-dir",
        &format!("target/cindy/remote/{}", platform.target_triple),
    ]);
    if matches!(platform.tier, platforms::Tier::Three) {
        tracing::warn!("tier 3 target, building with `-Zbuild-std`");
        cmd.arg("-Zbuild-std");
    }
    if let Some(pkg) = metadata.packages.iter().find(|p| p.name == "cindy")
        && pkg.source.is_none()
        && let Some(pkg_dir) = pkg.manifest_path.parent()
    {
        // This inner `metadata` describes the framework's own workspace and
        // only shadows the parameter inside this block.
        let metadata = cargo_metadata::MetadataCommand::new()
            .current_dir(pkg_dir)
            .exec()?;
        let framework_workspace = metadata.workspace_root;
        cmd.env(
            "CROSS_CONTAINER_OPTS",
            format!("-v {framework_workspace}:{framework_workspace}"),
        );
    }
    let output = cmd
        .output()
        .await
        .context("Failed to run `cross build` for the remote worker")?;
    if !output.status.success() {
        // Lossy conversion: a failed build must surface as an error, not as a
        // panic, even if the tool's stderr contains bytes that are not UTF-8.
        eyre::bail!("{}", String::from_utf8_lossy(&output.stderr));
    }
    let stdout_str = String::from_utf8(output.stdout)?;
    // Lines that are not JSON, and messages without a string `executable`
    // field, are skipped; the last match wins.
    let reported_exe_path = stdout_str
        .lines()
        .filter_map(|l| serde_json::from_str::<serde_json::Value>(l).ok())
        .filter_map(|json| json["executable"].as_str().map(String::from))
        .next_back()
        .context("Failed to extract remote executable path")?;
    // The build reports paths below `/target`; map that prefix back onto the
    // host target directory. Only a leading `/target/` is rewritten, so a later
    // `/target` inside the path (for example a binary named `target-*`) and
    // paths that are already host paths are left untouched.
    let exe_path = match reported_exe_path.strip_prefix("/target/") {
        Some(relative) => format!(
            "{}/target/cindy/remote/{}/{}",
            metadata.workspace_root, platform.target_triple, relative
        ),
        None => reported_exe_path,
    };
    Ok(PathBuf::from(exe_path))
}