use std::path::PathBuf;
use anyhow::{Context, Result, bail};
use clap::{Args, Subcommand};
use kanade_shared::kv::{
BUCKET_AGENT_CONFIG, KEY_AGENT_CONFIG_GLOBAL, OBJECT_AGENT_RELEASES, agent_config_group_key,
agent_config_pc_key,
};
use kanade_shared::subject;
use kanade_shared::wire::{ConfigScope, LogsRequest};
use tokio::fs;
use tracing::info;
#[derive(Args, Debug)]
pub struct AgentArgs {
#[command(subcommand)]
pub sub: AgentSub,
}
#[derive(Subcommand, Debug)]
pub enum AgentSub {
Publish {
binary: PathBuf,
#[arg(long)]
version: Option<String>,
},
Rollout(RolloutArgs),
Current,
Logs {
pc_id: String,
#[arg(long, default_value_t = 500)]
tail: u32,
},
}
#[derive(Args, Debug)]
pub struct RolloutArgs {
pub version: String,
#[arg(long, conflicts_with_all = ["group", "pc"])]
pub global: bool,
#[arg(long, value_name = "NAME")]
pub group: Option<String>,
#[arg(long, value_name = "PC_ID")]
pub pc: Option<String>,
#[arg(long, value_name = "DURATION")]
pub jitter: Option<String>,
}
pub async fn execute(client: async_nats::Client, args: AgentArgs) -> Result<()> {
match args.sub {
AgentSub::Publish { binary, version } => publish(client, binary, version).await,
AgentSub::Rollout(args) => rollout(client, args).await,
AgentSub::Current => current(client).await,
AgentSub::Logs { pc_id, tail } => logs(client, pc_id, tail).await,
}
}
async fn publish(
client: async_nats::Client,
binary: PathBuf,
version: Option<String>,
) -> Result<()> {
let version = match version {
Some(v) => v,
None => probe_binary_version(&binary).with_context(|| {
format!(
"auto-detect version from {binary:?} via `--version` — \
pass --version explicitly if the binary can't be executed here \
(e.g. cross-arch publish from Linux/macOS)"
)
})?,
};
let bytes = fs::read(&binary)
.await
.with_context(|| format!("read {binary:?}"))?;
info!(version, size = bytes.len(), "uploading new agent binary");
let js = async_nats::jetstream::new(client);
let store = js
.get_object_store(OBJECT_AGENT_RELEASES)
.await
.with_context(|| {
format!("object store '{OBJECT_AGENT_RELEASES}' missing — run `kanade jetstream setup`")
})?;
let mut cursor = std::io::Cursor::new(bytes);
let meta = store
.put(version.as_str(), &mut cursor)
.await
.context("object_store.put")?;
info!(version, digest = ?meta.digest, "agent binary uploaded");
println!("published: {version}");
println!(" object_store : {OBJECT_AGENT_RELEASES}/{version}");
println!();
println!("Next: target a scope with `kanade agent rollout`:");
println!(" kanade agent rollout {version} --group canary --jitter 5m # try on canary first");
println!(" kanade agent rollout {version} --global --jitter 30m # fleet-wide");
Ok(())
}
async fn rollout(client: async_nats::Client, args: RolloutArgs) -> Result<()> {
let (key, label) = match (args.global, args.group.as_deref(), args.pc.as_deref()) {
(true, None, None) => (KEY_AGENT_CONFIG_GLOBAL.to_string(), "global".to_string()),
(false, Some(g), None) => (agent_config_group_key(g), format!("group:{g}")),
(false, None, Some(p)) => (agent_config_pc_key(p), format!("pc:{p}")),
(false, None, None) => bail!(
"must pick a scope: --global / --group <name> / --pc <pc_id>. \
Refusing to rollout — explicit scope keeps a forgotten flag from \
fanning a release out to every agent."
),
_ => bail!("--global / --group / --pc are mutually exclusive"),
};
let js = async_nats::jetstream::new(client);
let store = js
.get_object_store(OBJECT_AGENT_RELEASES)
.await
.with_context(|| {
format!("object store '{OBJECT_AGENT_RELEASES}' missing — run `kanade jetstream setup`")
})?;
store.info(&args.version).await.with_context(|| {
format!(
"version '{}' not found in {OBJECT_AGENT_RELEASES} — run \
`kanade agent publish <binary> --version {}` first",
args.version, args.version
)
})?;
let kv = js
.get_key_value(BUCKET_AGENT_CONFIG)
.await
.with_context(|| {
format!("KV '{BUCKET_AGENT_CONFIG}' missing — run `kanade jetstream setup`")
})?;
let mut scope = match kv.get(&key).await? {
Some(b) => serde_json::from_slice::<ConfigScope>(&b)
.with_context(|| format!("decode existing {BUCKET_AGENT_CONFIG}.{key}"))?,
None => ConfigScope::default(),
};
scope.target_version = Some(args.version.clone());
if let Some(j) = args.jitter.as_deref() {
scope.target_version_jitter = Some(j.to_owned());
}
let payload = serde_json::to_vec(&scope).context("encode ConfigScope")?;
kv.put(key.as_str(), payload.into())
.await
.context("KV put scope ConfigScope")?;
info!(
scope = %label,
version = %args.version,
jitter = ?args.jitter,
"rollout: target_version flipped",
);
println!("rolled out: {} -> {}", label, args.version);
println!(
" kv : {BUCKET_AGENT_CONFIG}.{key}.target_version = {}",
args.version
);
if let Some(j) = args.jitter.as_deref() {
println!(" kv : {BUCKET_AGENT_CONFIG}.{key}.target_version_jitter = {j}");
} else {
println!(
" jitter : (unchanged — explicit `--jitter <duration>` recommended for fleets ≥ 100)"
);
}
Ok(())
}
fn probe_binary_version(binary: &std::path::Path) -> Result<String> {
let out = std::process::Command::new(binary)
.arg("--version")
.output()
.with_context(|| format!("run `{} --version`", binary.display()))?;
if !out.status.success() {
anyhow::bail!(
"`{} --version` exited {} (stderr: {})",
binary.display(),
out.status,
String::from_utf8_lossy(&out.stderr).trim()
);
}
let line = String::from_utf8(out.stdout)
.context("`--version` stdout is not UTF-8")?
.lines()
.next()
.unwrap_or_default()
.trim()
.to_owned();
let token = line
.split_whitespace()
.nth(1)
.with_context(|| format!("unexpected --version output shape: {line:?}"))?;
Ok(token.to_owned())
}
async fn logs(client: async_nats::Client, pc_id: String, tail: u32) -> Result<()> {
let req = LogsRequest { tail_lines: tail };
let payload = serde_json::to_vec(&req).context("encode LogsRequest")?;
let reply = tokio::time::timeout(
std::time::Duration::from_secs(10),
client.request(subject::logs_fetch(&pc_id), payload.into()),
)
.await
.with_context(|| format!("timeout waiting for {pc_id} (10s)"))?
.with_context(|| format!("request logs.fetch.{pc_id}"))?;
use std::io::Write;
std::io::stdout().write_all(&reply.payload).ok();
Ok(())
}
async fn current(client: async_nats::Client) -> Result<()> {
let js = async_nats::jetstream::new(client);
let kv = js
.get_key_value(BUCKET_AGENT_CONFIG)
.await
.with_context(|| format!("KV '{BUCKET_AGENT_CONFIG}' missing"))?;
match kv.get(KEY_AGENT_CONFIG_GLOBAL).await? {
Some(b) => {
let scope: ConfigScope = serde_json::from_slice(&b)
.with_context(|| format!("decode {BUCKET_AGENT_CONFIG}.global"))?;
match scope.target_version {
Some(v) => println!("global.target_version = {v}"),
None => println!("global.target_version = (unset)"),
}
}
None => println!("global = (unset)"),
}
Ok(())
}