use std::path::PathBuf;
use anyhow::{Context, Result, bail};
use clap::{Args, Subcommand};
use futures::StreamExt;
use kanade_shared::kv::OBJECT_APP_PACKAGES;
use tracing::info;
#[derive(Args, Debug)]
pub struct AppArgs {
#[command(subcommand)]
pub sub: AppSub,
}
#[derive(Subcommand, Debug)]
pub enum AppSub {
Publish {
name: String,
binary: PathBuf,
#[arg(long)]
version: Option<String>,
},
List,
Delete { name: String, version: String },
}
pub async fn execute(client: async_nats::Client, args: AppArgs) -> Result<()> {
match args.sub {
AppSub::Publish {
name,
binary,
version,
} => publish(client, name, binary, version).await,
AppSub::List => list(client).await,
AppSub::Delete { name, version } => delete(client, name, version).await,
}
}
fn validate_segment(label: &str, value: &str) -> Result<()> {
if value.is_empty() {
bail!("{label} must be non-empty");
}
if value.contains('/') {
bail!("{label} must not contain '/'");
}
for c in value.chars() {
if !c.is_ascii() {
bail!("{label} must be ASCII-printable (rejected non-ASCII {c:?})");
}
if c.is_ascii_control() {
bail!("{label} must not contain control characters");
}
if c == '"' || c == '\\' {
bail!("{label} must not contain '\"' or '\\\\'");
}
}
Ok(())
}
async fn publish(
client: async_nats::Client,
name: String,
binary: PathBuf,
version: Option<String>,
) -> Result<()> {
validate_segment("name", &name)?;
let resolved_version = match version {
Some(v) => v,
None => {
let bytes = tokio::fs::read(&binary)
.await
.with_context(|| format!("read {binary:?}"))?;
kanade_shared::exe_version::extract_pe_version(&bytes).with_context(|| {
format!(
"no --version given and couldn't extract VERSIONINFO from {binary:?} \
(Windows PE built with `winres`? otherwise pass `--version <label>`)"
)
})?
}
};
validate_segment("version", &resolved_version)?;
let version = resolved_version;
let mut file = tokio::fs::File::open(&binary)
.await
.with_context(|| format!("open {binary:?}"))?;
info!(name, version, "uploading app package");
let js = async_nats::jetstream::new(client);
let store = js
.get_object_store(OBJECT_APP_PACKAGES)
.await
.with_context(|| {
format!("object store '{OBJECT_APP_PACKAGES}' missing — run `kanade jetstream setup`")
})?;
let key = format!("{name}/{version}");
let meta = store
.put(key.as_str(), &mut file)
.await
.context("object_store.put")?;
info!(name, version, size = meta.size, digest = ?meta.digest, "app package uploaded");
super::publish_verify::verify_readback(&store, key.as_str(), meta.digest.as_deref(), meta.size)
.await
.context("publish read-back verify")?;
println!("published: {key}");
println!(" object_store : {OBJECT_APP_PACKAGES}/{key}");
println!(" size : {} bytes", meta.size);
if let Some(d) = meta.digest.as_deref() {
println!(" digest : {d}");
}
Ok(())
}
async fn list(client: async_nats::Client) -> Result<()> {
let js = async_nats::jetstream::new(client);
let store = js
.get_object_store(OBJECT_APP_PACKAGES)
.await
.with_context(|| {
format!("object store '{OBJECT_APP_PACKAGES}' missing — run `kanade jetstream setup`")
})?;
let mut list = store.list().await.context("object_store.list")?;
let mut rows: Vec<Row> = Vec::new();
while let Some(item) = list.next().await {
let meta = item.context("list app packages")?;
rows.push(Row {
key: meta.name,
size: meta.size,
digest: meta.digest,
modified: meta
.modified
.and_then(|t| chrono::DateTime::from_timestamp(t.unix_timestamp(), t.nanosecond()))
.map(|d| d.to_rfc3339()),
});
}
rows.sort_by(|a, b| a.key.cmp(&b.key));
if rows.is_empty() {
println!("(no app packages)");
return Ok(());
}
for row in rows {
let dgst = row.digest.as_deref().unwrap_or("—");
let modt = row.modified.as_deref().unwrap_or("—");
println!("{}\t{}\t{}\t{}", row.key, row.size, modt, dgst);
}
Ok(())
}
struct Row {
key: String,
size: usize,
digest: Option<String>,
modified: Option<String>,
}
async fn delete(client: async_nats::Client, name: String, version: String) -> Result<()> {
validate_segment("name", &name)?;
validate_segment("version", &version)?;
let js = async_nats::jetstream::new(client);
let store = js
.get_object_store(OBJECT_APP_PACKAGES)
.await
.with_context(|| {
format!("object store '{OBJECT_APP_PACKAGES}' missing — run `kanade jetstream setup`")
})?;
let key = format!("{name}/{version}");
match store.delete(key.as_str()).await {
Ok(()) => {
info!(%key, "app package deleted");
println!("deleted: {key}");
Ok(())
}
Err(e) => {
let msg = e.to_string();
if msg.contains("not found") || msg.contains("no objects") {
println!("not present: {key} (idempotent no-op)");
Ok(())
} else {
Err(e).with_context(|| format!("object_store.delete {key}"))
}
}
}
}