use std::path::PathBuf;
use anyhow::{Context, Result, bail};
use clap::{Args, Subcommand};
use kanade_shared::kv::{
BUCKET_AGENT_CONFIG, KEY_AGENT_CONFIG_GLOBAL, OBJECT_AGENT_RELEASES, agent_config_group_key,
agent_config_pc_key,
};
use kanade_shared::subject;
use kanade_shared::wire::{ConfigScope, LogsRequest};
use tokio::fs;
use tracing::info;
// Top-level clap argument wrapper for the agent command group; `execute`
// consumes it and dispatches on `sub`.
// (Plain `//` comments on purpose: clap's derive turns `///` doc comments into help text.)
#[derive(Args, Debug)]
pub struct AgentArgs {
// The concrete agent subcommand selected on the command line.
#[command(subcommand)]
pub sub: AgentSub,
}
// Subcommands of the agent command group. Each variant is routed by `execute`
// to the handler function of the matching name in this file.
// (Plain `//` comments on purpose: clap's derive turns `///` doc comments into help text.)
#[derive(Subcommand, Debug)]
pub enum AgentSub {
// Handled by `publish`: reads the file at `binary`, extracts its version and
// uploads the bytes to the agent releases object store under that version.
Publish {
// Path of the agent binary to read and upload.
binary: PathBuf,
},
// Handled by `rollout`: sets `target_version` for one config scope.
Rollout(RolloutArgs),
// Handled by `current`: prints the global scope's `target_version`.
Current,
// Handled by `logs`: requests log output from one PC and writes the reply
// payload to stdout.
Logs {
// Identifier of the PC whose logs are requested.
pc_id: String,
// Sent to the agent as `LogsRequest::tail_lines`; defaults to 500.
#[arg(long, default_value_t = 500)]
tail: u32,
},
}
// Arguments of the `rollout` subcommand: which version to target and for
// which scope. `rollout` requires exactly one of `global` / `group` / `pc`.
// (Plain `//` comments on purpose: clap's derive turns `///` doc comments into help text.)
#[derive(Args, Debug)]
pub struct RolloutArgs {
// Version to roll out; `rollout` checks that an object with this name exists
// in the agent releases object store before writing any config.
pub version: String,
// Target the global scope. clap rejects it together with `--group`/`--pc`.
#[arg(long, conflicts_with_all = ["group", "pc"])]
pub global: bool,
// Target a single group scope by name.
#[arg(long, value_name = "NAME")]
pub group: Option<String>,
// Target a single PC scope by id.
#[arg(long, value_name = "PC_ID")]
pub pc: Option<String>,
// Optional jitter, stored verbatim (as a string, not parsed here) in the
// scope's `target_version_jitter`; when omitted the stored value is left as is.
#[arg(long, value_name = "DURATION")]
pub jitter: Option<String>,
}
pub async fn execute(client: async_nats::Client, args: AgentArgs) -> Result<()> {
match args.sub {
AgentSub::Publish { binary } => publish(client, binary).await,
AgentSub::Rollout(args) => rollout(client, args).await,
AgentSub::Current => current(client).await,
AgentSub::Logs { pc_id, tail } => logs(client, pc_id, tail).await,
}
}
/// Uploads the agent binary at `binary` to the agent releases object store.
///
/// The object name is the version string extracted from the binary's own
/// VERSIONINFO, so the caller never supplies a version. On success a short
/// summary and suggested follow-up `rollout` commands are printed to stdout.
///
/// # Errors
/// Fails when the file cannot be read, no version can be extracted from it,
/// the object store cannot be opened, or the upload itself fails.
async fn publish(client: async_nats::Client, binary: PathBuf) -> Result<()> {
    // Load the whole binary into memory; it is both inspected and uploaded.
    let read_outcome = fs::read(&binary).await;
    let image = read_outcome.with_context(|| format!("read {binary:?}"))?;

    let extracted = kanade_shared::exe_version::extract_pe_version(&image);
    let version = extracted.with_context(|| {
        format!(
            "couldn't extract VERSIONINFO from {binary:?} — is it a Windows PE built \
             with `winres` (kanade ≥ v0.13.1)? Older binaries need to be re-published \
             from a current build."
        )
    })?;
    info!(version, size = image.len(), "uploading new agent binary");

    let jetstream = async_nats::jetstream::new(client);
    let store_lookup = jetstream.get_object_store(OBJECT_AGENT_RELEASES).await;
    let releases = store_lookup.with_context(|| {
        format!("object store '{OBJECT_AGENT_RELEASES}' missing — run `kanade jetstream setup`")
    })?;

    // The object store consumes an async reader; wrap the bytes in a cursor.
    let mut reader = std::io::Cursor::new(image);
    let upload = releases.put(version.as_str(), &mut reader).await;
    let meta = upload.context("object_store.put")?;
    info!(version, digest = ?meta.digest, "agent binary uploaded");

    println!("published: {version}");
    println!(" object_store : {OBJECT_AGENT_RELEASES}/{version}");
    println!();
    let next_steps = [
        "Next: target a scope with `kanade agent rollout`:".to_string(),
        format!(" kanade agent rollout {version} --group canary --jitter 5m # try on canary first"),
        format!(" kanade agent rollout {version} --global --jitter 30m # fleet-wide"),
    ];
    for line in &next_steps {
        println!("{line}");
    }
    Ok(())
}
/// Points one config scope at an already-published agent version.
///
/// Exactly one of `--global`, `--group` or `--pc` must be given. The version
/// must already exist in the agent releases object store. The scope's existing
/// `ConfigScope` (or a default one when the key is absent) is loaded, its
/// `target_version` is overwritten, `target_version_jitter` is overwritten only
/// when `--jitter` was supplied, and the result is written back to the KV
/// bucket. A summary is printed to stdout.
///
/// # Errors
/// Fails when the scope selection is missing or ambiguous, the version is not
/// found, a bucket/object store cannot be opened, or (de)serialising or
/// writing the scope fails.
async fn rollout(client: async_nats::Client, args: RolloutArgs) -> Result<()> {
    // Validate the scope selection before touching the network.
    let selected = [args.global, args.group.is_some(), args.pc.is_some()]
        .iter()
        .filter(|&&is_set| is_set)
        .count();
    if selected == 0 {
        bail!(
            "must pick a scope: --global / --group <name> / --pc <pc_id>. \
             Refusing to rollout — explicit scope keeps a forgotten flag from \
             fanning a release out to every agent."
        );
    }
    if selected > 1 {
        bail!("--global / --group / --pc are mutually exclusive");
    }
    // Exactly one selector is set from here on.
    let (key, label) = if let Some(group) = args.group.as_deref() {
        (agent_config_group_key(group), format!("group:{group}"))
    } else if let Some(pc) = args.pc.as_deref() {
        (agent_config_pc_key(pc), format!("pc:{pc}"))
    } else {
        (KEY_AGENT_CONFIG_GLOBAL.to_string(), "global".to_string())
    };

    let jetstream = async_nats::jetstream::new(client);

    // Refuse to target a version that was never published.
    let store_lookup = jetstream.get_object_store(OBJECT_AGENT_RELEASES).await;
    let releases = store_lookup.with_context(|| {
        format!("object store '{OBJECT_AGENT_RELEASES}' missing — run `kanade jetstream setup`")
    })?;
    let release_info = releases.info(&args.version).await;
    release_info.with_context(|| {
        format!(
            "version '{}' not found in {OBJECT_AGENT_RELEASES} — \
             run `kanade agent publish <binary>` first (the version is \
             auto-extracted from the binary's VERSIONINFO)",
            args.version
        )
    })?;

    let bucket_lookup = jetstream.get_key_value(BUCKET_AGENT_CONFIG).await;
    let bucket = bucket_lookup.with_context(|| {
        format!("KV '{BUCKET_AGENT_CONFIG}' missing — run `kanade jetstream setup`")
    })?;

    // Start from the stored scope so unrelated fields are preserved.
    let stored = bucket.get(&key).await?;
    let mut config = if let Some(raw) = stored {
        serde_json::from_slice::<ConfigScope>(&raw)
            .with_context(|| format!("decode existing {BUCKET_AGENT_CONFIG}.{key}"))?
    } else {
        ConfigScope::default()
    };
    config.target_version = Some(args.version.clone());
    if args.jitter.is_some() {
        config.target_version_jitter = args.jitter.clone();
    }

    let encoded = serde_json::to_vec(&config).context("encode ConfigScope")?;
    let write = bucket.put(key.as_str(), encoded.into()).await;
    write.context("KV put scope ConfigScope")?;

    info!(
        scope = %label,
        version = %args.version,
        jitter = ?args.jitter,
        "rollout: target_version flipped",
    );
    println!("rolled out: {label} -> {}", args.version);
    println!(
        " kv : {BUCKET_AGENT_CONFIG}.{key}.target_version = {}",
        args.version
    );
    match args.jitter.as_deref() {
        Some(j) => println!(" kv : {BUCKET_AGENT_CONFIG}.{key}.target_version_jitter = {j}"),
        None => println!(
            " jitter : (unchanged — explicit `--jitter <duration>` recommended for fleets ≥ 100)"
        ),
    }
    Ok(())
}
async fn logs(client: async_nats::Client, pc_id: String, tail: u32) -> Result<()> {
let req = LogsRequest { tail_lines: tail };
let payload = serde_json::to_vec(&req).context("encode LogsRequest")?;
let reply = tokio::time::timeout(
std::time::Duration::from_secs(10),
client.request(subject::logs_fetch(&pc_id), payload.into()),
)
.await
.with_context(|| format!("timeout waiting for {pc_id} (10s)"))?
.with_context(|| format!("request logs.fetch.{pc_id}"))?;
use std::io::Write;
std::io::stdout().write_all(&reply.payload).ok();
Ok(())
}
/// Prints the `target_version` stored in the global config scope.
///
/// Reads the global key from the agent config KV bucket and prints one line
/// to stdout: the version, `(unset)` for a scope without a target version, or
/// `global = (unset)` when the key does not exist at all.
///
/// # Errors
/// Fails when the bucket cannot be opened, the key cannot be read, or the
/// stored value does not decode as a `ConfigScope`.
async fn current(client: async_nats::Client) -> Result<()> {
    let jetstream = async_nats::jetstream::new(client);
    let bucket_lookup = jetstream.get_key_value(BUCKET_AGENT_CONFIG).await;
    let bucket = bucket_lookup.with_context(|| format!("KV '{BUCKET_AGENT_CONFIG}' missing"))?;

    // Work out the single line to print, then emit it once at the end.
    let report = match bucket.get(KEY_AGENT_CONFIG_GLOBAL).await? {
        None => "global = (unset)".to_string(),
        Some(raw) => {
            let scope: ConfigScope = serde_json::from_slice(&raw)
                .with_context(|| format!("decode {BUCKET_AGENT_CONFIG}.global"))?;
            if let Some(v) = scope.target_version {
                format!("global.target_version = {v}")
            } else {
                "global.target_version = (unset)".to_string()
            }
        }
    };
    println!("{report}");
    Ok(())
}