use std::path::PathBuf;
use anyhow::{Context, Result};
use clap::{Args, Subcommand};
use kanade_shared::kv::{
BUCKET_AGENT_CONFIG, BUCKET_AGENT_GROUPS, KEY_AGENT_CONFIG_GLOBAL, OBJECT_AGENT_RELEASES,
};
use kanade_shared::wire::{AgentGroups, ConfigScope};
use tokio::fs;
use tracing::info;
#[derive(Args, Debug)]
pub struct AgentArgs {
#[command(subcommand)]
pub sub: AgentSub,
}
#[derive(Subcommand, Debug)]
pub enum AgentSub {
Publish {
binary: PathBuf,
#[arg(long)]
version: String,
},
Current,
Groups(GroupsArgs),
}
#[derive(Args, Debug)]
pub struct GroupsArgs {
#[command(subcommand)]
pub sub: GroupsSub,
}
#[derive(Subcommand, Debug)]
pub enum GroupsSub {
List { pc_id: String },
Add { pc_id: String, group: String },
Rm { pc_id: String, group: String },
Set {
pc_id: String,
#[arg(trailing_var_arg = true)]
groups: Vec<String>,
},
}
pub async fn execute(client: async_nats::Client, args: AgentArgs) -> Result<()> {
match args.sub {
AgentSub::Publish { binary, version } => publish(client, binary, version).await,
AgentSub::Current => current(client).await,
AgentSub::Groups(g) => groups(client, g).await,
}
}
async fn publish(client: async_nats::Client, binary: PathBuf, version: String) -> Result<()> {
let bytes = fs::read(&binary)
.await
.with_context(|| format!("read {binary:?}"))?;
info!(version, size = bytes.len(), "uploading new agent binary");
let js = async_nats::jetstream::new(client);
let store = js
.get_object_store(OBJECT_AGENT_RELEASES)
.await
.with_context(|| {
format!("object store '{OBJECT_AGENT_RELEASES}' missing — run `kanade jetstream setup`")
})?;
let mut cursor = std::io::Cursor::new(bytes);
let meta = store
.put(version.as_str(), &mut cursor)
.await
.context("object_store.put")?;
info!(version, digest = ?meta.digest, "agent binary uploaded");
let kv = js
.get_key_value(BUCKET_AGENT_CONFIG)
.await
.with_context(|| {
format!("KV '{BUCKET_AGENT_CONFIG}' missing — run `kanade jetstream setup`")
})?;
let mut global = match kv.get(KEY_AGENT_CONFIG_GLOBAL).await? {
Some(b) => serde_json::from_slice::<ConfigScope>(&b)
.with_context(|| format!("decode existing {BUCKET_AGENT_CONFIG}.global"))?,
None => ConfigScope::default(),
};
global.target_version = Some(version.clone());
let payload = serde_json::to_vec(&global).context("encode global ConfigScope")?;
kv.put(KEY_AGENT_CONFIG_GLOBAL, payload.into())
.await
.context("KV put global ConfigScope")?;
info!(version, "broadcast agent_config.global.target_version");
println!("published: {version}");
println!(" object_store : {OBJECT_AGENT_RELEASES}/{version}");
println!(
" kv : {BUCKET_AGENT_CONFIG}.{KEY_AGENT_CONFIG_GLOBAL}.target_version = {version}"
);
Ok(())
}
async fn current(client: async_nats::Client) -> Result<()> {
let js = async_nats::jetstream::new(client);
let kv = js
.get_key_value(BUCKET_AGENT_CONFIG)
.await
.with_context(|| format!("KV '{BUCKET_AGENT_CONFIG}' missing"))?;
match kv.get(KEY_AGENT_CONFIG_GLOBAL).await? {
Some(b) => {
let scope: ConfigScope = serde_json::from_slice(&b)
.with_context(|| format!("decode {BUCKET_AGENT_CONFIG}.global"))?;
match scope.target_version {
Some(v) => println!("global.target_version = {v}"),
None => println!("global.target_version = (unset)"),
}
}
None => println!("global = (unset)"),
}
Ok(())
}
async fn groups(client: async_nats::Client, args: GroupsArgs) -> Result<()> {
let js = async_nats::jetstream::new(client);
let kv = js
.get_key_value(BUCKET_AGENT_GROUPS)
.await
.with_context(|| {
format!("KV '{BUCKET_AGENT_GROUPS}' missing — run `kanade jetstream setup`")
})?;
match args.sub {
GroupsSub::List { pc_id } => {
let g = read_groups(&kv, &pc_id).await?;
if g.is_empty() {
println!("{pc_id}: (no groups)");
} else {
println!("{pc_id}: {}", g.groups.join(", "));
}
}
GroupsSub::Add { pc_id, group } => {
let mut g = read_groups(&kv, &pc_id).await?;
if g.insert(&group) {
write_groups(&kv, &pc_id, &g).await?;
println!("{pc_id}: added '{group}' -> [{}]", g.groups.join(", "));
} else {
println!("{pc_id}: already has '{group}' (no change)");
}
}
GroupsSub::Rm { pc_id, group } => {
let mut g = read_groups(&kv, &pc_id).await?;
if g.remove(&group) {
write_groups(&kv, &pc_id, &g).await?;
let after = if g.is_empty() {
"(no groups)".to_string()
} else {
g.groups.join(", ")
};
println!("{pc_id}: removed '{group}' -> [{after}]");
} else {
println!("{pc_id}: not a member of '{group}' (no change)");
}
}
GroupsSub::Set { pc_id, groups } => {
let normalised = AgentGroups::new(groups);
write_groups(&kv, &pc_id, &normalised).await?;
if normalised.is_empty() {
println!("{pc_id}: cleared all groups");
} else {
println!(
"{pc_id}: set membership to [{}]",
normalised.groups.join(", ")
);
}
}
}
Ok(())
}
async fn read_groups(kv: &async_nats::jetstream::kv::Store, pc_id: &str) -> Result<AgentGroups> {
match kv.get(pc_id).await.context("kv get")? {
Some(bytes) => serde_json::from_slice(&bytes).context("decode agent_groups"),
None => Ok(AgentGroups::default()),
}
}
async fn write_groups(
kv: &async_nats::jetstream::kv::Store,
pc_id: &str,
groups: &AgentGroups,
) -> Result<()> {
let bytes = serde_json::to_vec(groups).context("encode agent_groups")?;
kv.put(pc_id, bytes.into()).await.context("kv put")?;
info!(pc_id, groups = ?groups.groups, "agent_groups updated");
Ok(())
}