use std::time::Duration;
use anyhow::Result;
use async_nats::jetstream::{
self,
kv::Config as KvConfig,
stream::{Config as StreamConfig, DiscardPolicy},
};
use clap::{Args, Subcommand};
use kanade_shared::kv::{
BUCKET_AGENT_CONFIG, BUCKET_AGENT_GROUPS, BUCKET_AGENTS_STATE, BUCKET_SCRIPT_CURRENT,
BUCKET_SCRIPT_STATUS, OBJECT_AGENT_RELEASES, STREAM_AUDIT, STREAM_DEPLOY, STREAM_INVENTORY,
STREAM_RESULTS,
};
use tracing::info;
// Arguments for the JetStream command group; the only payload is the chosen
// subcommand.
// NOTE: plain `//` comments are used here on purpose — clap's derive turns
// `///` doc comments into help text, which would change the CLI output.
#[derive(Args, Debug)]
pub struct JetstreamArgs {
// The JetStream action to run; `execute` matches on this value.
#[command(subcommand)]
pub sub: JetstreamSub,
}
// Actions available under the JetStream command group.
// NOTE: plain `//` comments are used here on purpose — clap's derive turns
// `///` doc comments on variants into subcommand help text.
#[derive(Subcommand, Debug)]
pub enum JetstreamSub {
// Routed by `execute` to `setup`, which creates the streams, KV buckets and
// object store.
Setup,
// Routed by `execute` to `status`, which prints whether each resource exists.
Status,
}
pub async fn execute(client: async_nats::Client, args: JetstreamArgs) -> Result<()> {
let js = jetstream::new(client);
match args.sub {
JetstreamSub::Setup => setup(js).await,
JetstreamSub::Status => status(js).await,
}
}
async fn setup(js: jetstream::Context) -> Result<()> {
js.create_stream(StreamConfig {
name: STREAM_INVENTORY.into(),
subjects: vec!["inventory.>".into()],
max_age: Duration::from_secs(90 * 24 * 60 * 60),
..Default::default()
})
.await?;
info!(stream = STREAM_INVENTORY, "ready");
js.create_stream(StreamConfig {
name: STREAM_RESULTS.into(),
subjects: vec!["results.>".into()],
max_age: Duration::from_secs(30 * 24 * 60 * 60),
..Default::default()
})
.await?;
info!(stream = STREAM_RESULTS, "ready");
js.create_stream(StreamConfig {
name: STREAM_DEPLOY.into(),
subjects: vec!["commands.deploy.>".into()],
max_messages_per_subject: 1,
discard: DiscardPolicy::Old,
max_age: Duration::from_secs(7 * 24 * 60 * 60),
..Default::default()
})
.await?;
info!(stream = STREAM_DEPLOY, "ready");
js.create_stream(StreamConfig {
name: STREAM_AUDIT.into(),
subjects: vec!["audit.>".into()],
..Default::default()
})
.await?;
info!(stream = STREAM_AUDIT, "ready");
js.create_key_value(KvConfig {
bucket: BUCKET_SCRIPT_CURRENT.into(),
history: 5,
..Default::default()
})
.await?;
info!(bucket = BUCKET_SCRIPT_CURRENT, "ready");
js.create_key_value(KvConfig {
bucket: BUCKET_SCRIPT_STATUS.into(),
history: 5,
..Default::default()
})
.await?;
info!(bucket = BUCKET_SCRIPT_STATUS, "ready");
js.create_key_value(KvConfig {
bucket: BUCKET_AGENTS_STATE.into(),
history: 1,
..Default::default()
})
.await?;
info!(bucket = BUCKET_AGENTS_STATE, "ready");
js.create_key_value(KvConfig {
bucket: BUCKET_AGENT_CONFIG.into(),
history: 5,
..Default::default()
})
.await?;
info!(bucket = BUCKET_AGENT_CONFIG, "ready");
js.create_key_value(KvConfig {
bucket: BUCKET_AGENT_GROUPS.into(),
history: 5,
..Default::default()
})
.await?;
info!(bucket = BUCKET_AGENT_GROUPS, "ready");
js.create_object_store(async_nats::jetstream::object_store::Config {
bucket: OBJECT_AGENT_RELEASES.into(),
..Default::default()
})
.await?;
info!(object_store = OBJECT_AGENT_RELEASES, "ready");
println!("jetstream setup complete:");
println!(
" streams : {STREAM_INVENTORY}, {STREAM_RESULTS}, {STREAM_DEPLOY}, {STREAM_AUDIT}"
);
println!(
" KV : {BUCKET_SCRIPT_CURRENT}, {BUCKET_SCRIPT_STATUS}, {BUCKET_AGENTS_STATE}, {BUCKET_AGENT_CONFIG}"
);
println!(" object stores : {OBJECT_AGENT_RELEASES}");
Ok(())
}
async fn status(js: jetstream::Context) -> Result<()> {
println!("streams:");
for name in [
STREAM_INVENTORY,
STREAM_RESULTS,
STREAM_DEPLOY,
STREAM_AUDIT,
] {
match js.get_stream(name).await {
Ok(mut stream) => match stream.info().await {
Ok(info) => println!(
" {name}: messages={}, bytes={}",
info.state.messages, info.state.bytes
),
Err(e) => println!(" {name}: info error: {e}"),
},
Err(_) => println!(" {name}: NOT FOUND"),
}
}
println!("KV buckets:");
for bucket in [
BUCKET_SCRIPT_CURRENT,
BUCKET_SCRIPT_STATUS,
BUCKET_AGENTS_STATE,
BUCKET_AGENT_CONFIG,
] {
match js.get_key_value(bucket).await {
Ok(_) => println!(" {bucket}: OK"),
Err(_) => println!(" {bucket}: NOT FOUND"),
}
}
println!("object stores:");
for name in [OBJECT_AGENT_RELEASES] {
match js.get_object_store(name).await {
Ok(_) => println!(" {name}: OK"),
Err(_) => println!(" {name}: NOT FOUND"),
}
}
Ok(())
}