mod conditional_sudo_command;
mod limit;
mod picker;
mod progress;
mod secret;
use clap::Parser;
use eyre::{ContextCompat as _, WrapErr as _};
use futures_util::future::try_join_all;
use openssh::Stdio;
use std::collections::{HashMap, HashSet};
use std::io::IsTerminal as _;
use std::path::{Path, PathBuf};
use std::sync::Arc;
use tokio::io::{AsyncBufRead, AsyncBufReadExt, AsyncWriteExt, BufReader};
use tracing::Instrument as _;
use crate::conditional_sudo_command::ConditionalSudoCommand as _;
trait ExitOk {
fn exit_ok(self) -> eyre::Result<()>;
}
impl ExitOk for std::process::ExitStatus {
fn exit_ok(self) -> eyre::Result<()> {
if self.success() {
Ok(())
} else {
eyre::bail!("process exited with {self}");
}
}
}
trait OutputOrError: Sized {
fn ok_or_die(self) -> eyre::Result<Self>;
}
impl OutputOrError for std::process::Output {
fn ok_or_die(self) -> eyre::Result<Self> {
if !self.status.success() {
eyre::bail!("{}", String::from_utf8_lossy(&self.stderr));
}
Ok(self)
}
}
#[derive(Debug, clap::Subcommand)]
enum Subcommand {
Play {
#[clap(long)]
limit: Option<String>,
#[clap(long)]
unprivileged: bool,
},
Inventory {
#[clap(long)]
host: Option<String>,
#[clap(long)]
json: bool,
#[clap(long)]
reveal: bool,
},
#[clap(subcommand)]
Secret(secret::SecretCommand),
}
#[derive(Debug, Parser)]
struct Args {
#[command(subcommand)]
subcommand: Subcommand,
}
#[derive(Debug)]
struct DiscoveredHost {
host: cindy::Host<serde_json::Value>,
session: openssh::Session,
platform: &'static platforms::Platform,
use_sudo: bool,
}
async fn forward_stderr(
mut reader: impl AsyncBufRead + Unpin,
tx: tokio::sync::mpsc::UnboundedSender<progress::Msg>,
idx: usize,
source: progress::Source,
) {
let mut line = String::new();
while reader.read_line(&mut line).await.unwrap_or(0) > 0 {
let _ = tx.send(progress::Msg::Line(idx, source, line.trim_end().to_owned()));
line.clear();
}
}
struct PulledInventory {
inventory: cindy::Inventory<serde_json::Value>,
debug: std::collections::BTreeMap<String, String>,
json_revealed: Option<Result<serde_json::Value, String>>,
}
async fn get_inventory(
orchestrator_exe_path: &Path,
reveal_secrets: bool,
) -> eyre::Result<PulledInventory> {
tracing::info!("Pulling inventory from orchestrator binary...");
let mut cmd = tokio::process::Command::new(orchestrator_exe_path);
cmd.env("CINDY_DUMP_INVENTORY", "1")
.stdin(std::process::Stdio::null())
.stdout(std::process::Stdio::piped())
.stderr(std::process::Stdio::piped());
if reveal_secrets {
cmd.env("CINDY_REVEAL_SECRETS", "1");
}
let dump = cmd.output().await?;
if !dump.status.success() {
eyre::bail!(
"inventory dump exited with {:?}\n{}",
dump.status,
String::from_utf8_lossy(&dump.stderr)
);
}
let dump: cindy::inventory::InventoryDump = serde_json::from_slice(&dump.stdout)
.context("Failed to parse inventory dump from orchestrator")?;
let inventory = serde_json::from_value(dump.json)
.context("Failed to parse inventory JSON from orchestrator dump")?;
Ok(PulledInventory {
inventory,
debug: dump.debug,
json_revealed: dump.json_revealed,
})
}
fn workspace_root() -> eyre::Result<PathBuf> {
let meta = cargo_metadata::MetadataCommand::new().exec()?;
Ok(meta.workspace_root.into_std_path_buf())
}
#[tokio::main]
async fn main() -> eyre::Result<()> {
let args = Args::parse();
color_eyre::install()?;
let workspace_root = workspace_root()?;
if std::io::stderr().is_terminal() {
tracing_subscriber::fmt()
.with_ansi(true)
.with_writer(progress::LogWriter)
.init();
} else {
tracing_subscriber::fmt()
.with_writer(std::io::stderr)
.init();
}
if let Subcommand::Secret(cmd) = args.subcommand {
let orch = if secret::requires_orchestrator(&cmd) {
Some(compile_orchestrator(&workspace_root).await?)
} else {
None
};
return secret::dispatch(cmd, orch.as_deref()).await;
}
let is_play = matches!(args.subcommand, Subcommand::Play { .. });
let (progress_tx, progress_rx) = tokio::sync::mpsc::unbounded_channel::<progress::Msg>();
let view = is_play.then(|| tokio::spawn(progress::run(progress_rx)));
let result: eyre::Result<()> = run_after_launch(
args.subcommand,
workspace_root,
progress_tx.clone(),
)
.await;
drop(progress_tx);
if let Some(view) = view {
let _ = view.await;
}
match result {
Ok(()) => {
tracing::info!("All pipeline target tracking execution runs completed successfully!");
Ok(())
}
Err(e) if e.downcast_ref::<RunFailed>().is_some() => {
eprintln!("\x1b[31m{e}\x1b[0m");
std::process::exit(1);
}
Err(e) => Err(e),
}
}
async fn run_after_launch(
subcommand: Subcommand,
workspace_root: PathBuf,
progress_tx: tokio::sync::mpsc::UnboundedSender<progress::Msg>,
) -> eyre::Result<()> {
let orchestrator_exe_path = compile_orchestrator(&workspace_root).await?;
let reveal_secrets = matches!(subcommand, Subcommand::Inventory { reveal: true, .. });
let PulledInventory {
inventory,
debug,
json_revealed,
} = get_inventory(&orchestrator_exe_path, reveal_secrets).await?;
let (limit, unprivileged) = match subcommand {
Subcommand::Play {
limit,
unprivileged,
} => (limit, unprivileged),
Subcommand::Inventory { host, json, reveal } => {
if json {
let whole: serde_json::Value = if reveal {
match json_revealed
.context("orchestrator dump omitted revealed JSON despite `--reveal`")?
{
Ok(v) => v,
Err(e) => eyre::bail!("failed to reveal secrets for JSON output: {e}"),
}
} else {
serde_json::to_value(&inventory)?
};
let value = match &host {
Some(host_name) => whole
.get("hosts")
.and_then(|h| h.as_array())
.and_then(|hosts| {
hosts
.iter()
.find(|h| h.get("name").and_then(|n| n.as_str()) == Some(host_name))
})
.cloned()
.with_context(|| {
format!("No such host `{host_name}` found in inventory")
})?,
None => whole,
};
println!("{}", serde_json::to_string_pretty(&value)?);
return Ok(());
}
let key = host
.as_deref()
.unwrap_or(cindy::inventory::WHOLE_INVENTORY_KEY);
let rendered = debug.get(key).with_context(|| {
if host.is_some() {
format!("No such host `{key}` found in inventory")
} else {
"Orchestrator dump did not include a whole-inventory rendering".to_owned()
}
})?;
println!("{rendered}");
return Ok(());
}
Subcommand::Secret(_) => unreachable!("handled above"),
};
let selected_indices = limit::select_indices(
&inventory.hosts,
limit.as_deref(),
|h| h.name.as_str(),
|h| h.tags.as_slice(),
)?;
let selected: Vec<cindy::Host<serde_json::Value>> = selected_indices
.into_iter()
.map(|i| inventory.hosts[i].clone())
.collect();
if selected.is_empty() {
tracing::warn!(
limit,
"no hosts matched the `--limit` expression; nothing to do"
);
return Ok(());
}
tracing::info!(count = selected.len(), "selected hosts after --limit");
tracing::info!("Initiating pre-flight checks across targets...");
let discovery_futures = selected.into_iter().map(|host| async move {
let session = openssh::Session::connect(&host.name, openssh::KnownHosts::Add).await?;
let uname_out = session
.command("uname")
.args(["-m"])
.output()
.await?
.ok_or_die()?;
let machine_arch = String::from_utf8(uname_out.stdout)?.trim().to_string();
let triplet = format!("{machine_arch}-unknown-linux-musl");
let platform = platforms::Platform::find(&triplet)
.context(format!("Couldn't find platform by triplet: '{triplet}'"))?;
let id_out = session
.command("id")
.args(["-u"])
.output()
.await?
.ok_or_die()?;
let use_sudo = !unprivileged && String::from_utf8(id_out.stdout)?.trim() != "0";
Ok::<DiscoveredHost, eyre::Error>(DiscoveredHost {
host,
session,
platform,
use_sudo,
})
});
let discovered_hosts = try_join_all(discovery_futures).await?;
for host in &discovered_hosts {
let _ = progress_tx.send(progress::Msg::AddHost {
name: host.host.name.clone(),
tags: host.host.tags.clone(),
});
}
let unique_platforms: HashSet<&'static platforms::Platform> =
discovered_hosts.iter().map(|h| h.platform).collect();
tracing::info!(component = "rust-src", "installing component");
std::process::Command::new("rustup")
.args(["component", "add", "rust-src"])
.output()?
.ok_or_die()?;
for platform in &unique_platforms {
if matches!(platform.tier, platforms::Tier::Three) {
continue;
}
tracing::info!(triplet = platform.target_triple, "installing target");
std::process::Command::new("rustup")
.args(["target", "add", platform.target_triple])
.output()?
.ok_or_die()?;
}
let cargo_meta = Arc::new(cargo_metadata::MetadataCommand::new().exec()?);
let root = Arc::new(workspace_root.clone());
let compilation_futures =
unique_platforms.into_iter().map(|platform| {
let meta = Arc::clone(&cargo_meta);
let root = Arc::clone(&root);
async move {
Ok::<_, eyre::Error>((
platform,
compile_remote_worker(platform, &meta, &root).await?,
))
}
});
let compilation_results = Arc::new(
try_join_all(compilation_futures)
.await?
.into_iter()
.collect::<HashMap<_, _>>(),
);
tracing::info!("Pre-flighting vault keys across targets...");
let in_code_vaults = dump_in_code_vaults(&orchestrator_exe_path).await?;
let mut host_contexts: HashMap<String, String> = HashMap::new();
let mut host_vault_keys: HashMap<String, HashMap<String, String>> = HashMap::new();
let mut preflight_failures: Vec<String> = Vec::new();
for host in &discovered_hosts {
let host_context_json =
serde_json::to_string(&host.host).expect("Failed to serialise host context");
let needed = host_needed_vaults(&in_code_vaults, &host_context_json);
let (keys, missing) = collect_vault_keys(&needed);
if !missing.is_empty() {
preflight_failures.push(format!(
" {}: missing key(s) for vault(s) {missing:?}",
host.host.name
));
}
host_contexts.insert(host.host.name.clone(), host_context_json);
host_vault_keys.insert(host.host.name.clone(), keys);
}
if !preflight_failures.is_empty() {
eyre::bail!(
"vault key preflight failed — refusing to start. The following \
hosts are missing decryption keys:\n{}\n\nProvision the key file(s) \
(`cindy secret vault create <name>`, or copy `keys/<name>.dek` from \
a teammate) and re-run.",
preflight_failures.join("\n")
);
}
tracing::info!("Vault key preflight passed for all targets.");
tracing::info!("Launching parallel deployment loops...");
let orchestrator_path = Arc::new(orchestrator_exe_path);
let host_contexts = Arc::new(host_contexts);
let host_vault_keys = Arc::new(host_vault_keys);
let deployment_futures = discovered_hosts.into_iter().enumerate().map(|(idx, host)| {
let orch_path = Arc::clone(&orchestrator_path);
let compilation_results = Arc::clone(&compilation_results);
let host_contexts = Arc::clone(&host_contexts);
let host_vault_keys = Arc::clone(&host_vault_keys);
let progress_tx = progress_tx.clone();
async move {
let hostname = host.host.name.clone();
let context_span = tracing::info_span!("run", %hostname);
let progress_status = progress_tx.clone();
let result = async move {
let local_worker_path = compilation_results
.get(host.platform)
.expect("Not compiled");
let remote_tmp_path = format!(
"/tmp/.cindy-worker.{}",
rand::distr::SampleString::sample_string(&rand::distr::Alphanumeric, &mut rand::rng(), 16),
);
let mut upload = host
.session
.conditional_sudo_command(host.use_sudo, "sh")
.args(["-c", &format!("umask 0266 && cat >'{remote_tmp_path}' && chmod u+x '{remote_tmp_path}'")])
.stdin(Stdio::piped())
.stdout(Stdio::piped())
.stderr(Stdio::piped())
.spawn()
.await?;
let upload_stderr = BufReader::new(upload.stderr().take().unwrap());
tokio::spawn(forward_stderr(
upload_stderr,
progress_tx.clone(),
idx,
progress::Source::Worker,
));
if let Some(mut stdin) = upload.stdin().take() {
let bin = std::fs::read(local_worker_path)?;
let _ = stdin.write_all(&bin).await;
}
upload.wait().await?.exit_ok()?;
let host_context_json = host_contexts
.get(&hostname)
.expect("preflight populated every host's context")
.clone();
let vault_keys_json = serde_json::to_string(
host_vault_keys
.get(&hostname)
.expect("preflight populated every host's vault keys"),
)
.expect("Failed to serialise vault key map");
let mut remote_run = host
.session
.conditional_sudo_command(host.use_sudo, &remote_tmp_path)
.stdin(Stdio::piped())
.stdout(Stdio::piped())
.stderr(Stdio::piped())
.spawn()
.await?;
let mut orchestrator_run = tokio::process::Command::new(&*orch_path)
.env("CINDY_HOST_CONTEXT", &host_context_json)
.env("CINDY_VAULT_KEYS", &vault_keys_json)
.stdin(std::process::Stdio::piped())
.stdout(std::process::Stdio::piped())
.stderr(std::process::Stdio::piped())
.spawn()?;
let mut r_stdout = remote_run.stdout().take().unwrap();
let mut r_stdin = remote_run.stdin().take().unwrap();
let r_stderr = BufReader::new(remote_run.stderr().take().unwrap());
let mut o_stdout = orchestrator_run.stdout.take().unwrap();
let mut o_stdin = orchestrator_run.stdin.take().unwrap();
let o_stderr = BufReader::new(orchestrator_run.stderr.take().unwrap());
let bridge_c =
forward_stderr(r_stderr, progress_tx.clone(), idx, progress::Source::Worker);
let bridge_d = forward_stderr(
o_stderr,
progress_tx.clone(),
idx,
progress::Source::Orchestrator,
);
tokio::spawn(bridge_c);
tokio::spawn(bridge_d);
tokio::select! {
biased; res = tokio::io::copy(&mut r_stdout, &mut o_stdin) => {
tracing::debug!("Remote stdout -> Orchestrator stdin stream closed: {:?}", res);
}
res = async {
let _ = tokio::io::copy(&mut o_stdout, &mut r_stdin).await;
let _ = r_stdin.shutdown().await;
} => {
tracing::debug!("Orchestrator stdout -> Remote stdin stream closed: {:?}", res);
}
}
let orch_exit = orchestrator_run.wait().await?;
drop(remote_run);
if !orch_exit.success() {
eyre::bail!("play failed (orchestrator exited with {orch_exit})");
}
tracing::info!("Execution successfully completed.");
Ok::<(), eyre::Error>(())
}
.instrument(context_span)
.await;
let status = if let Err(e) = &result {
for line in format!("{e:?}").lines() {
let _ = progress_status.send(progress::Msg::Line(
idx,
progress::Source::Orchestrator,
line.to_owned(),
));
}
progress::Status::Failed
} else {
progress::Status::Finished
};
let _ = progress_status.send(progress::Msg::Status(idx, status));
result
}
});
let deployment_futures: Vec<_> = deployment_futures.collect();
drop(progress_tx);
let results = futures_util::future::join_all(deployment_futures).await;
let failed = results.iter().filter(|r| r.is_err()).count();
if failed > 0 {
eyre::bail!(RunFailed(failed));
}
Ok(())
}
#[derive(Debug)]
struct RunFailed(usize);
impl std::fmt::Display for RunFailed {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "{} host(s) failed", self.0)
}
}
impl std::error::Error for RunFailed {}
async fn dump_in_code_vaults(orchestrator_exe_path: &Path) -> eyre::Result<Vec<String>> {
let out = tokio::process::Command::new(orchestrator_exe_path)
.env("CINDY_DUMP_VAULTS", "1")
.stdin(std::process::Stdio::null())
.stdout(std::process::Stdio::piped())
.stderr(std::process::Stdio::piped())
.output()
.await?;
if !out.status.success() {
eyre::bail!(
"vault enumeration exited with {:?}\n{}",
out.status,
String::from_utf8_lossy(&out.stderr)
);
}
serde_json::from_slice(&out.stdout).context("Failed to parse vault list from orchestrator")
}
fn collect_vault_keys(
needed: &std::collections::BTreeSet<String>,
) -> (HashMap<String, String>, Vec<String>) {
use base64::Engine as _;
let mut map = HashMap::new();
let mut missing = Vec::new();
for vault in needed {
match std::fs::read(cindy::secret::keychain::dek_path(vault)) {
Ok(bytes) => {
let b64 = base64::engine::general_purpose::STANDARD.encode(&bytes);
map.insert(vault.clone(), b64);
}
Err(_) => missing.push(vault.clone()),
}
}
(map, missing)
}
fn host_needed_vaults(
in_code: &[String],
host_context_json: &str,
) -> std::collections::BTreeSet<String> {
let mut needed: std::collections::BTreeSet<String> = in_code.iter().cloned().collect();
if let Ok(value) = serde_json::from_str::<serde_json::Value>(host_context_json) {
cindy::inventory::collect_sealed_vaults(&value, &mut needed);
}
needed
}
async fn compile_orchestrator(workspace_root: &Path) -> eyre::Result<PathBuf> {
tracing::info!("compiling orchestrator binary");
let target_dir = workspace_root.join("target/cindy/orchestrator");
let output = tokio::process::Command::new("cargo")
.args([
"build",
"--release",
"--message-format=json",
"--no-default-features",
"--features",
"orchestrator",
"--target-dir",
])
.arg(&target_dir)
.output()
.await?;
if !output.status.success() {
eyre::bail!("{}", String::from_utf8(output.stderr).unwrap())
}
let stdout_str = String::from_utf8(output.stdout)?;
let exe_path = stdout_str
.lines()
.filter_map(|l| serde_json::from_str::<serde_json::Value>(l).ok())
.filter_map(|json| json["executable"].as_str().map(PathBuf::from))
.next_back()
.context("Failed to extract orchestrator executable path")?;
Ok(exe_path)
}
#[tracing::instrument(skip_all, fields(triplet = platform.target_triple))]
async fn compile_remote_worker(
platform: &'static platforms::Platform,
metadata: &cargo_metadata::Metadata,
workspace_root: &Path,
) -> eyre::Result<PathBuf> {
tracing::info!(platform.target_triple, "compiling remote binary");
let target_dir = workspace_root.join(format!("target/cindy/remote/{}", platform.target_triple));
let mut cmd = tokio::process::Command::new("cross");
cmd.args([
"build",
"--release",
"--message-format=json",
"--target",
platform.target_triple,
"--no-default-features",
"--features",
"remote",
"--target-dir",
])
.arg(&target_dir);
if matches!(platform.tier, platforms::Tier::Three) {
tracing::warn!("tier 3 target, building with `-Zbuild-std`");
cmd.arg("-Zbuild-std");
}
if let Some(pkg) = metadata.packages.iter().find(|p| p.name == "cindy")
&& pkg.source.is_none()
&& let Some(pkg_dir) = pkg.manifest_path.parent()
{
let metadata = cargo_metadata::MetadataCommand::new()
.current_dir(pkg_dir)
.exec()?;
let framework_workspace = metadata.workspace_root;
cmd.env(
"CROSS_CONTAINER_OPTS",
format!("-v {framework_workspace}:{framework_workspace}"),
);
}
let output = cmd.output().await?;
if !output.status.success() {
eyre::bail!("{}", String::from_utf8(output.stderr).unwrap())
}
let stdout_str = String::from_utf8(output.stdout)?;
let exe_name = stdout_str
.lines()
.filter_map(|l| serde_json::from_str::<serde_json::Value>(l).ok())
.filter_map(|json| json["executable"].as_str().map(PathBuf::from))
.next_back()
.context("Failed to extract remote executable path")?
.file_name()
.context("remote executable path had no file name")?
.to_owned();
let exe_path = target_dir
.join(platform.target_triple)
.join("release")
.join(exe_name);
Ok(exe_path)
}