use anyhow::{Result, anyhow, bail};
use async_nats::jetstream;
use clap::{Args, Subcommand, ValueEnum};
use kanade_shared::bootstrap::ensure_jetstream_resources;
use kanade_shared::kv::{
BUCKET_AGENT_CONFIG, BUCKET_AGENT_GROUPS, BUCKET_AGENTS_STATE, BUCKET_JOBS, BUCKET_SCHEDULES,
BUCKET_SCRIPT_CURRENT, BUCKET_SCRIPT_STATUS, OBJECT_AGENT_RELEASES, STREAM_AUDIT,
STREAM_EVENTS, STREAM_EXEC, STREAM_INVENTORY, STREAM_RESULTS,
};
#[derive(Args, Debug)]
pub struct JetstreamArgs {
#[command(subcommand)]
pub sub: JetstreamSub,
}
#[derive(Subcommand, Debug)]
pub enum JetstreamSub {
Setup,
Status,
Delete(DeleteArgs),
Reset(ResetArgs),
}
#[derive(Args, Debug)]
pub struct DeleteArgs {
#[arg(value_enum)]
pub kind: ResourceKind,
pub name: String,
#[arg(long)]
pub yes: bool,
}
#[derive(Args, Debug)]
pub struct ResetArgs {
#[arg(long)]
pub yes: bool,
}
#[derive(ValueEnum, Clone, Copy, Debug)]
pub enum ResourceKind {
Stream,
Bucket,
Store,
}
pub async fn execute(client: async_nats::Client, args: JetstreamArgs) -> Result<()> {
let js = jetstream::new(client);
match args.sub {
JetstreamSub::Setup => setup(js).await,
JetstreamSub::Status => status(js).await,
JetstreamSub::Delete(d) => delete(js, d).await,
JetstreamSub::Reset(r) => reset(js, r).await,
}
}
async fn setup(js: jetstream::Context) -> Result<()> {
ensure_jetstream_resources(&js).await?;
println!("jetstream setup complete:");
println!(
" streams : {STREAM_INVENTORY}, {STREAM_RESULTS}, {STREAM_EXEC}, {STREAM_EVENTS}, {STREAM_AUDIT}"
);
println!(
" KV : {BUCKET_SCRIPT_CURRENT}, {BUCKET_SCRIPT_STATUS}, {BUCKET_AGENTS_STATE}, {BUCKET_AGENT_CONFIG}, {BUCKET_AGENT_GROUPS}, {BUCKET_SCHEDULES}, {BUCKET_JOBS}"
);
println!(" object stores : {OBJECT_AGENT_RELEASES}");
Ok(())
}
const ALL_STREAMS: &[&str] = &[
STREAM_INVENTORY,
STREAM_RESULTS,
STREAM_EXEC,
STREAM_EVENTS,
STREAM_AUDIT,
];
const ALL_BUCKETS: &[&str] = &[
BUCKET_SCRIPT_CURRENT,
BUCKET_SCRIPT_STATUS,
BUCKET_AGENTS_STATE,
BUCKET_AGENT_CONFIG,
BUCKET_AGENT_GROUPS,
BUCKET_SCHEDULES,
BUCKET_JOBS,
];
const ALL_STORES: &[&str] = &[OBJECT_AGENT_RELEASES];
async fn status(js: jetstream::Context) -> Result<()> {
println!("streams:");
for name in ALL_STREAMS {
match js.get_stream(*name).await {
Ok(mut stream) => match stream.info().await {
Ok(info) => println!(
" {name}: messages={}, bytes={}",
info.state.messages, info.state.bytes
),
Err(e) => println!(" {name}: info error: {e}"),
},
Err(_) => println!(" {name}: NOT FOUND"),
}
}
println!("KV buckets:");
for bucket in ALL_BUCKETS {
match js.get_key_value(*bucket).await {
Ok(_) => println!(" {bucket}: OK"),
Err(_) => println!(" {bucket}: NOT FOUND"),
}
}
println!("object stores:");
for name in ALL_STORES {
match js.get_object_store(*name).await {
Ok(_) => println!(" {name}: OK"),
Err(_) => println!(" {name}: NOT FOUND"),
}
}
Ok(())
}
async fn delete(js: jetstream::Context, args: DeleteArgs) -> Result<()> {
let kind_label = match args.kind {
ResourceKind::Stream => "stream",
ResourceKind::Bucket => "KV bucket",
ResourceKind::Store => "object store",
};
if !args.yes {
println!(
"would delete {kind_label} {name} (re-run with --yes)",
name = args.name
);
bail!("--yes not supplied");
}
match args.kind {
ResourceKind::Stream => {
js.delete_stream(&args.name)
.await
.map_err(|e| anyhow!("delete_stream {}: {e}", args.name))?;
}
ResourceKind::Bucket => {
js.delete_key_value(&args.name)
.await
.map_err(|e| anyhow!("delete_key_value {}: {e}", args.name))?;
}
ResourceKind::Store => {
js.delete_object_store(&args.name)
.await
.map_err(|e| anyhow!("delete_object_store {}: {e}", args.name))?;
}
}
println!("deleted {kind_label} {}", args.name);
Ok(())
}
async fn reset(js: jetstream::Context, args: ResetArgs) -> Result<()> {
if !args.yes {
println!("would delete the following resources and re-bootstrap (re-run with --yes):");
println!(" streams : {}", ALL_STREAMS.join(", "));
println!(" KV : {}", ALL_BUCKETS.join(", "));
println!(" object stores : {}", ALL_STORES.join(", "));
bail!("--yes not supplied");
}
for name in ALL_STREAMS {
match js.delete_stream(*name).await {
Ok(_) => println!("deleted stream {name}"),
Err(e) => println!("skip stream {name}: {e}"),
}
}
for bucket in ALL_BUCKETS {
match js.delete_key_value(*bucket).await {
Ok(_) => println!("deleted bucket {bucket}"),
Err(e) => println!("skip bucket {bucket}: {e}"),
}
}
for name in ALL_STORES {
match js.delete_object_store(*name).await {
Ok(_) => println!("deleted store {name}"),
Err(e) => println!("skip store {name}: {e}"),
}
}
println!("re-bootstrapping...");
ensure_jetstream_resources(&js).await?;
println!("reset complete.");
Ok(())
}