1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
//! `kanade app` — manage the generic app-package Object Store
//! (`OBJECT_APP_PACKAGES`, #207).
//!
//! Sibling of `kanade agent publish`: same NATS-direct shape (no
//! backend HTTP hop), different bucket. `agent publish` covers the
//! agent's own self-update binary, this covers everything else
//! operators install on endpoints — kanade-client, kanade-backend,
//! Webex / Teams / vendor MSIs, etc.
//!
//! See `kanade-shared::kv::OBJECT_APP_PACKAGES` for the bucket-
//! level design notes. Object key shape is `<name>/<version>`;
//! operator picks `<name>` once per package family and `<version>`
//! per release.
use std::path::PathBuf;
use anyhow::{Context, Result, bail};
use clap::{Args, Subcommand};
use futures::StreamExt;
use kanade_shared::kv::OBJECT_APP_PACKAGES;
use tracing::info;
use super::validate_segment;
// Argument container for the `kanade app` command. It carries nothing but
// the selected subcommand, which `execute` matches on.
//
// NOTE(review): plain `//` comments are used here deliberately. clap's derive
// macros read `///` doc comments as help/about text, so adding doc comments
// could alter `--help` output — confirm before converting these.
#[derive(Args, Debug)]
pub struct AppArgs {
// Which `kanade app` operation to run (see `AppSub`).
#[command(subcommand)]
pub sub: AppSub,
}
// Operations available under `kanade app`. `execute` forwards each variant to
// the async function of the same (lower-cased) name in this module.
//
// NOTE(review): the `///` comments below double as clap help text, so they
// are part of the CLI's user-visible output. Reviewer notes in this enum use
// plain `//` so they stay out of `--help`.
#[derive(Subcommand, Debug)]
pub enum AppSub {
/// Upload a binary / installer to the app_packages Object Store
/// under `<name>/<version>`. Mirrors `kanade agent publish` —
/// goes straight at NATS, no backend HTTP round-trip.
///
/// Operators pick `<name>` once per package family
/// (e.g. `kanade-client`, `kanade-backend`, `webex-meetings`).
/// `<version>` defaults to the binary's embedded VERSIONINFO
/// (same pelite extraction as `kanade agent publish`) — pass
/// `--version` to override (vendor MSIs / non-PE binaries need
/// the explicit label).
Publish {
/// Package family name. Slash-free, ASCII-printable; see
/// `kanade-backend::api::app_packages::validate_segment`
/// for the full set of restrictions the HTTP side enforces.
name: String,
/// Path to the binary to upload.
binary: PathBuf,
/// Version label. When omitted, extracted from the binary's
/// embedded VERSIONINFO (Windows PE built with `winres` —
/// every kanade-* binary qualifies). Required for binaries
/// without VERSIONINFO (most vendor installers) — fails fast
/// rather than silently uploading under an empty version.
#[arg(long)]
version: Option<String>,
},
/// List every `<name>/<version>` row in the bucket — size +
/// digest + last-modified.
List,
// `name` and `version` are both run through `validate_segment` and then
// joined as `<name>/<version>` to form the object key that gets removed.
/// Delete a single package version. No-op + clear message when
/// the key isn't present (idempotent re-runs are fine).
Delete { name: String, version: String },
}
pub async fn execute(client: async_nats::Client, args: AppArgs) -> Result<()> {
match args.sub {
AppSub::Publish {
name,
binary,
version,
} => publish(client, name, binary, version).await,
AppSub::List => list(client).await,
AppSub::Delete { name, version } => delete(client, name, version).await,
}
}
/// Uploads the file at `binary` to the app-package Object Store under the
/// key `<name>/<version>`.
///
/// The steps run strictly in this order:
/// 1. validate `name`, settle on a version label (see
///    `resolve_publish_version`) and validate that label;
/// 2. open the file and pass it to the store's `put` as a reader;
/// 3. run the read-back verification;
/// 4. print a summary and record an `app_package_publish` audit entry.
///
/// # Errors
///
/// Fails when `validate_segment` rejects either segment, when no version
/// label can be determined, when the file cannot be read or opened, when the
/// Object Store handle cannot be obtained, or when the upload or the
/// read-back verification fails.
async fn publish(
    client: async_nats::Client,
    name: String,
    binary: PathBuf,
    version: Option<String>,
) -> Result<()> {
    validate_segment("name", &name)?;

    // Shadow the optional CLI value with the label we will actually publish
    // under; it is validated no matter where it came from.
    let version = resolve_publish_version(version, &binary).await?;
    validate_segment("version", &version)?;

    // Gemini #222 MED: upload from an open file handle rather than an
    // in-memory buffer. Packages can reach 256 MB, and `put` accepts a
    // reader, so there is no reason to hold the whole payload in RAM here.
    let mut file = tokio::fs::File::open(&binary)
        .await
        .with_context(|| format!("open {binary:?}"))?;
    info!(name, version, "uploading app package");

    let jetstream = async_nats::jetstream::new(client.clone());
    let store = jetstream
        .get_object_store(OBJECT_APP_PACKAGES)
        .await
        .with_context(|| {
            format!("object store '{OBJECT_APP_PACKAGES}' missing — run `kanade jetstream setup`")
        })?;

    let key = format!("{name}/{version}");
    let meta = store
        .put(key.as_str(), &mut file)
        .await
        .context("object_store.put")?;
    info!(name, version, size = meta.size, digest = ?meta.digest, "app package uploaded");

    // #277: right after `put` returns, a reader doing `store.get(key)` (for
    // example the backend serving /api/app-packages/...) has been seen to
    // get stale or partial bytes for roughly 30-180 s on single-node
    // JetStream. Hold the "published" message back until a read-back check
    // passes; otherwise an install kicked off immediately after publishing
    // fails about half the time with a sha mismatch on the agent.
    super::publish_verify::verify_readback(&store, key.as_str(), meta.digest.as_deref(), meta.size)
        .await
        .context("publish read-back verify")?;

    println!("published: {key}");
    println!(" object_store : {OBJECT_APP_PACKAGES}/{key}");
    println!(" size : {} bytes", meta.size);
    if let Some(d) = meta.digest.as_deref() {
        println!(" digest : {d}");
    }

    let audit_details = serde_json::json!({ "size": meta.size, "digest": meta.digest });
    crate::audit::record(&client, "app_package_publish", Some(&key), audit_details).await;
    Ok(())
}

/// Decides which version label a publish should use.
///
/// Order of precedence:
/// 1. the explicit `--version` value, returned without touching the file;
/// 2. the version that `extract_pe_version` pulls out of the file's bytes
///    (#261 — saves typing the version again for kanade-* binaries);
/// 3. whatever `prompt_version_if_interactive` yields (#270);
/// 4. otherwise an error telling the operator to pass `--version`.
///
/// # Errors
///
/// Fails when the file cannot be read, when the prompt helper itself fails,
/// or when none of the sources above produced a label.
async fn resolve_publish_version(
    explicit: Option<String>,
    binary: &std::path::Path,
) -> Result<String> {
    if let Some(label) = explicit {
        return Ok(label);
    }

    // The extractor works on a byte slice, so the whole file is read into
    // memory on this path, and that memory cost is real. It is only paid
    // when `--version` was omitted; large vendor installers are expected to
    // be published with an explicit label, which returns above before any
    // read. If a package family ever turns up that is both large and
    // VERSIONINFO-tagged, switch this to memmap2 (Gemini #263 MED).
    let contents = tokio::fs::read(binary)
        .await
        .with_context(|| format!("read {binary:?}"))?;
    if let Some(embedded) = kanade_shared::exe_version::extract_pe_version(&contents) {
        return Ok(embedded);
    }

    // #270: nothing embedded. Give the prompt helper a chance to supply a
    // label; if it returns `None`, fail with the original guidance.
    match super::prompt_version_if_interactive(binary.to_path_buf()).await? {
        Some(typed) => Ok(typed),
        None => bail!(
            "no --version given and couldn't extract VERSIONINFO from {binary:?} \
             (Windows PE built with `winres`? otherwise pass `--version <label>`)"
        ),
    }
}
/// Prints every object in the app-package Object Store, one per line,
/// ordered by key.
///
/// Each line is tab-separated: `<key>\t<size>\t<modified>\t<digest>`. A
/// missing modified time or digest is printed as `—`. When the store holds
/// nothing, a single `(no app packages)` line is printed instead.
///
/// # Errors
///
/// Fails when the Object Store handle cannot be obtained, when the listing
/// cannot be started, or on the first listing entry that comes back as an
/// error (entries after it are not read).
async fn list(client: async_nats::Client) -> Result<()> {
    let jetstream = async_nats::jetstream::new(client);
    let store = jetstream
        .get_object_store(OBJECT_APP_PACKAGES)
        .await
        .with_context(|| {
            format!("object store '{OBJECT_APP_PACKAGES}' missing — run `kanade jetstream setup`")
        })?;

    let mut entries = store.list().await.context("object_store.list")?;
    let mut rows: Vec<Row> = Vec::new();
    while let Some(entry) = entries.next().await {
        let info = entry.context("list app packages")?;
        // Render the timestamp as RFC 3339 text; it stays `None` when the
        // entry has no modified time or the conversion yields nothing.
        let modified = info.modified.and_then(|stamp| {
            chrono::DateTime::from_timestamp(stamp.unix_timestamp(), stamp.nanosecond())
                .map(|utc| utc.to_rfc3339())
        });
        rows.push(Row {
            key: info.name,
            size: info.size,
            digest: info.digest,
            modified,
        });
    }

    if rows.is_empty() {
        println!("(no app packages)");
    } else {
        rows.sort_by(|lhs, rhs| lhs.key.cmp(&rhs.key));
        for Row {
            key,
            size,
            digest,
            modified,
        } in rows
        {
            let dgst = digest.as_deref().unwrap_or("—");
            let modt = modified.as_deref().unwrap_or("—");
            // TSV keeps the output easy to cut/awk in a shell pipeline; the
            // SPA Apps page (#218) shows the same four fields.
            println!("{}\t{}\t{}\t{}", key, size, modt, dgst);
        }
    }
    Ok(())
}
/// One line of `list` output: the fields of a single Object Store entry,
/// already converted to the form in which they are printed.
struct Row {
/// Object name as reported by the store listing; `list` sorts rows by it.
key: String,
/// Object size as reported by the store listing.
size: usize,
/// Digest as reported by the store listing, if any; printed as `—` when absent.
digest: Option<String>,
/// Modified time formatted as RFC 3339, if available; printed as `—` when absent.
modified: Option<String>,
}
/// Removes the object `<name>/<version>` from the app-package Object Store.
///
/// On success it prints `deleted: <key>` and records an `app_package_delete`
/// audit entry. If the store's error text contains `not found` or
/// `no objects`, the key is treated as already absent: a `not present`
/// message is printed, no audit entry is written, and the call succeeds.
///
/// # Errors
///
/// Fails when `validate_segment` rejects either segment, when the Object
/// Store handle cannot be obtained, or when the delete fails for any reason
/// other than the key being absent.
async fn delete(client: async_nats::Client, name: String, version: String) -> Result<()> {
    validate_segment("name", &name)?;
    validate_segment("version", &version)?;

    let jetstream = async_nats::jetstream::new(client.clone());
    let store = jetstream
        .get_object_store(OBJECT_APP_PACKAGES)
        .await
        .with_context(|| {
            format!("object store '{OBJECT_APP_PACKAGES}' missing — run `kanade jetstream setup`")
        })?;
    let key = format!("{name}/{version}");

    // Deal with the failure case first so the success path reads straight
    // through below.
    if let Err(err) = store.delete(key.as_str()).await {
        // Absence is recognised from the error's text, so these substrings
        // must keep matching what the store reports.
        let text = err.to_string();
        let already_gone = ["not found", "no objects"]
            .iter()
            .any(|needle| text.contains(*needle));
        return if already_gone {
            println!("not present: {key} (idempotent no-op)");
            Ok(())
        } else {
            Err(err).with_context(|| format!("object_store.delete {key}"))
        };
    }

    info!(%key, "app package deleted");
    println!("deleted: {key}");
    crate::audit::record(
        &client,
        "app_package_delete",
        Some(&key),
        serde_json::json!({}),
    )
    .await;
    Ok(())
}