kanade_shared/bootstrap.rs
1//! Idempotent JetStream bootstrap (Sprint 6.x follow-up).
2//!
//! Lists every NATS JetStream resource the kanade fleet expects —
//! streams, KV buckets, Object Stores — and asks the broker to
//! create-or-update them. v0.25.0 switched streams and KV buckets from
//! `create_*` to `create_or_update_*`: the old form returned error 10058
//! ("name already in use with a different configuration") when a
//! release widened a stream's subjects or changed its retention policy
//! on a broker that still held the older config. With the new form the
//! broker reconciles its definition to the one in this file, so version
//! bumps no longer require operator-side data wipes. Object Stores are
//! the exception: async-nats has no create-or-update form for them, so
//! an existing store whose config drifted is kept as-is (with a
//! warning) rather than reconciled.
12//!
13//! Centralising the list here means a future "we added a new
14//! bucket" change touches one place and both the operator CLI +
15//! the auto-bootstrap path pick it up.
16
17use std::time::Duration;
18
19use anyhow::{Context, Result};
20use async_nats::jetstream::{
21 self,
22 kv::Config as KvConfig,
23 object_store::Config as ObjectStoreConfig,
24 stream::{Config as StreamConfig, DiscardPolicy},
25};
26use tracing::{info, warn};
27
28use crate::kv::{
29 BUCKET_AGENT_CONFIG, BUCKET_AGENT_GROUPS, BUCKET_AGENTS_STATE, BUCKET_FLEET_CONFIG,
30 BUCKET_JOBS, BUCKET_JOBS_YAML, BUCKET_NOTIFICATIONS_READ, BUCKET_SCHEDULES,
31 BUCKET_SCHEDULES_YAML, BUCKET_SCRIPT_CURRENT, BUCKET_SCRIPT_STATUS, OBJECT_AGENT_RELEASES,
32 OBJECT_APP_PACKAGES, OBJECT_RESULT_OUTPUT, OBJECT_SCRIPTS, STREAM_AUDIT, STREAM_EVENTS,
33 STREAM_EXEC, STREAM_INVENTORY, STREAM_NOTIFICATIONS, STREAM_OBS_EVENTS, STREAM_RESULTS,
34};
35
36/// Create-or-update an Object Store, but never let it wedge backend
37/// startup. `create_object_store` neither reconciles an existing
38/// store's config nor has a `create_or_update` form in async-nats
39/// 0.49, so a store whose desired config drifted — e.g. the #518
40/// `max_bytes` cap added after the bucket was first created uncapped,
41/// which the broker then rejects with error 10058 ("stream name
42/// already in use with a different configuration") — would otherwise
43/// fail `ensure_jetstream_resources` and crash the backend on boot
44/// (production outage 2026-06-11). Fall back to the existing store
45/// (uncapped, as it already was) and warn. #506 tracks real
46/// reconciliation of object-store config.
47async fn ensure_object_store(js: &jetstream::Context, cfg: ObjectStoreConfig) -> Result<()> {
48 let name = cfg.bucket.clone();
49 if let Err(e) = js.create_object_store(cfg).await {
50 // The fallback is deliberately broad — any create error is
51 // tolerated AS LONG AS the store already exists, because the
52 // alternative is a wedged backend and "never crash on boot"
53 // wins over "surface this specific error". The expected error
54 // is 10058 (config drift, the incident), but auth/network
55 // blips on an already-bootstrapped broker take this path too;
56 // they remain visible via the `warn!`. Only a genuine
57 // "can't create AND doesn't exist" is fatal.
58 if js.get_object_store(&name).await.is_err() {
59 return Err(e).with_context(|| {
60 format!("create_object_store {name} (and no existing store to fall back to)")
61 });
62 }
63 warn!(
64 store = %name, error = %e,
65 "object store exists with a different config; using it as-is (cap not reconciled)",
66 );
67 }
68 info!(store = %name, "ready");
69 Ok(())
70}
71
72/// Idempotently create every NATS JetStream resource the kanade
73/// fleet relies on. Calling repeatedly is safe — `create_*` returns
74/// the existing resource if it's already configured.
75///
76/// Returns once every resource is in place. The function is async
77/// so backends can `await` it as part of their startup sequence
78/// (one round-trip per resource — ~10 RTTs total).
79pub async fn ensure_jetstream_resources(js: &jetstream::Context) -> Result<()> {
80 // ── Streams ──────────────────────────────────────────────────
81 // #518: every stream carries a `max_bytes` cap with
82 // `Discard::Old` on top of its `max_age` window. Within their
83 // age windows the streams used to be unbounded by size, and
84 // JetStream's file store shares a disk with SQLite on the
85 // backend host — one job printing 200 KB per run fleet-wide
86 // could exhaust the store, at which point EVERY publish fails
87 // (results, obs, audit, KV puts). With the caps, worst-case
88 // degradation is "shorter history on the offending stream"
89 // instead of "broker down".
90 //
91 // Sizing: JetStream RESERVES each `max_bytes` against its
92 // available storage (min of max_file_store and free disk) at
93 // create/update time and fails with error 10047 when the sum
94 // doesn't fit, so these must stay small enough for modest
95 // hosts. That's fine: every stream here is a transport +
96 // replay buffer — the durable record is the backend's SQLite
97 // (results/inventory/obs/audit are all projected within
98 // seconds) — so the caps are runaway-output backstops, not
99 // history budgets. Total reservation ≈ 5.3 GiB including the
100 // result_output object store below.
101 const MIB: i64 = 1024 * 1024;
102 const GIB: i64 = 1024 * MIB;
103
104 // INVENTORY — 90-day rolling history (spec §2.3.1).
105 js.create_or_update_stream(StreamConfig {
106 name: STREAM_INVENTORY.into(),
107 subjects: vec!["inventory.>".into()],
108 max_age: Duration::from_secs(90 * 24 * 60 * 60),
109 max_bytes: GIB,
110 discard: DiscardPolicy::Old,
111 ..Default::default()
112 })
113 .await
114 .with_context(|| format!("create_or_update_stream {STREAM_INVENTORY}"))?;
115 info!(stream = STREAM_INVENTORY, "ready");
116
117 // RESULTS — 30-day rolling history. The biggest producer by
118 // far (every job run on every PC, with up to 256 KB of inline
119 // stdout/stderr per message), so it gets the largest slice of
120 // the disk budget.
121 js.create_or_update_stream(StreamConfig {
122 name: STREAM_RESULTS.into(),
123 subjects: vec!["results.>".into()],
124 max_age: Duration::from_secs(30 * 24 * 60 * 60),
125 max_bytes: 2 * GIB,
126 discard: DiscardPolicy::Old,
127 ..Default::default()
128 })
129 .await
130 .with_context(|| format!("create_or_update_stream {STREAM_RESULTS}"))?;
131 info!(stream = STREAM_RESULTS, "ready");
132
133 // EXEC — latest-per-subject only (spec §2.6 Layer 1). v0.22.1:
134 // catch the existing `commands.{all,group.X,pc.Y}` subjects so a
135 // single backend publish lands in BOTH the agent's live core
136 // subscription AND the stream's retention store. Reconnecting
137 // agents catch up via a durable consumer with
138 // `DeliverPolicy::LastPerSubject` — they receive the most
139 // recent Command per subject they care about, no matter how
140 // long they were offline (within `max_age`).
141 js.create_or_update_stream(StreamConfig {
142 name: STREAM_EXEC.into(),
143 subjects: vec!["commands.>".into()],
144 max_messages_per_subject: 1,
145 max_age: Duration::from_secs(7 * 24 * 60 * 60),
146 // Latest-per-subject keeps this tiny (one Command per
147 // group/pc subject); the cap is a backstop against subject
148 // cardinality bugs, not a working budget.
149 max_bytes: 64 * MIB,
150 discard: DiscardPolicy::Old,
151 ..Default::default()
152 })
153 .await
154 .with_context(|| format!("create_or_update_stream {STREAM_EXEC}"))?;
155 info!(stream = STREAM_EXEC, "ready");
156
157 // EVENTS — short-lived broadcast bus for kill / revoke / etc.
158 // 7-day window matches the EXEC spec window.
159 js.create_or_update_stream(StreamConfig {
160 name: STREAM_EVENTS.into(),
161 subjects: vec!["events.>".into()],
162 max_age: Duration::from_secs(7 * 24 * 60 * 60),
163 max_bytes: 256 * MIB,
164 discard: DiscardPolicy::Old,
165 ..Default::default()
166 })
167 .await
168 .with_context(|| format!("create_or_update_stream {STREAM_EVENTS}"))?;
169 info!(stream = STREAM_EVENTS, "ready");
170
171 // AUDIT — operator-action record (spec §2.3.1). The DURABLE
172 // copy is the backend's SQLite `audit_log` table (the projector
173 // INSERTs each message, idempotently since #501; 365-day
174 // retention since #486) — the stream is transport + replay
175 // buffer, not the archive, so it can be bounded like the rest.
176 // 90 days / 512 MiB is far more than the projector ever lags;
177 // previously this stream had NO limits at all, making it an
178 // unbounded disk leak on the broker host.
179 js.create_or_update_stream(StreamConfig {
180 name: STREAM_AUDIT.into(),
181 subjects: vec!["audit.>".into()],
182 max_age: Duration::from_secs(90 * 24 * 60 * 60),
183 max_bytes: 512 * MIB,
184 discard: DiscardPolicy::Old,
185 ..Default::default()
186 })
187 .await
188 .with_context(|| format!("create_or_update_stream {STREAM_AUDIT}"))?;
189 info!(stream = STREAM_AUDIT, "ready");
190
191 // OBS_EVENTS — per-PC observability timeline (Issue #246). The
192 // 90-day window matches `obs_events` table retention so a
193 // backend bootstrapping after long downtime can catch up but
194 // doesn't carry data the table will discard anyway. Subject
195 // filter `obs.>` catches every PC without a per-PC subscription.
196 //
197 // Days-to-seconds is spelt out once instead of `90 * 24 * 60 *
198 // 60` open-coded across bootstrap + cleanup; the matching prune
199 // window in `kanade-backend::cleanup` quotes the same number
200 // separately (SQLite-relative string syntax there, not a
201 // duration), so it can't share a constant — but a single
202 // arithmetic spell-out here makes the relationship grep-able.
203 const SECS_PER_DAY: u64 = 24 * 60 * 60;
204 const OBS_EVENTS_RETENTION_DAYS: u64 = 90;
205 js.create_or_update_stream(StreamConfig {
206 name: STREAM_OBS_EVENTS.into(),
207 subjects: vec!["obs.>".into()],
208 max_age: Duration::from_secs(OBS_EVENTS_RETENTION_DAYS * SECS_PER_DAY),
209 max_bytes: 512 * MIB,
210 discard: DiscardPolicy::Old,
211 ..Default::default()
212 })
213 .await
214 .with_context(|| format!("create_or_update_stream {STREAM_OBS_EVENTS}"))?;
215 info!(stream = STREAM_OBS_EVENTS, "ready");
216
217 // NOTIFICATIONS — end-user notification history (SPEC §2.3.1 /
218 // Phase E). 90-day window matches INVENTORY: a Client App that
219 // connects after a notification was sent fetches the missed ones
220 // via KLP `notifications.list`. Subject filter `notifications.>`
221 // catches every fan-out target (`all` / `group.X` / `pc.Y`) with
222 // one stream. Retains all messages per subject — each notification
223 // is its own history entry, not a latest-only state like EXEC.
224 // #518: 512 MiB cap + DiscardPolicy::Old, matching the other
225 // 90-day streams (AUDIT / OBS_EVENTS) — notification payloads are
226 // small, so this is generous headroom while still bounding the
227 // broker's disk lease.
228 js.create_or_update_stream(StreamConfig {
229 name: STREAM_NOTIFICATIONS.into(),
230 subjects: vec!["notifications.>".into()],
231 max_age: Duration::from_secs(90 * 24 * 60 * 60),
232 max_bytes: 512 * MIB,
233 discard: DiscardPolicy::Old,
234 ..Default::default()
235 })
236 .await
237 .with_context(|| format!("create_or_update_stream {STREAM_NOTIFICATIONS}"))?;
238 info!(stream = STREAM_NOTIFICATIONS, "ready");
239
240 // ── KV buckets ───────────────────────────────────────────────
241 // script_current — cmd_id → version (spec §2.6 Layer 2).
242 js.create_or_update_key_value(KvConfig {
243 bucket: BUCKET_SCRIPT_CURRENT.into(),
244 history: 5,
245 ..Default::default()
246 })
247 .await
248 .with_context(|| format!("create_or_update_key_value {BUCKET_SCRIPT_CURRENT}"))?;
249 info!(bucket = BUCKET_SCRIPT_CURRENT, "ready");
250
251 // script_status — cmd_id → ACTIVE / REVOKED.
252 js.create_or_update_key_value(KvConfig {
253 bucket: BUCKET_SCRIPT_STATUS.into(),
254 history: 5,
255 ..Default::default()
256 })
257 .await
258 .with_context(|| format!("create_or_update_key_value {BUCKET_SCRIPT_STATUS}"))?;
259 info!(bucket = BUCKET_SCRIPT_STATUS, "ready");
260
261 // agents_state — pc_id → latest hw snapshot (history=1).
262 js.create_or_update_key_value(KvConfig {
263 bucket: BUCKET_AGENTS_STATE.into(),
264 history: 1,
265 ..Default::default()
266 })
267 .await
268 .with_context(|| format!("create_or_update_key_value {BUCKET_AGENTS_STATE}"))?;
269 info!(bucket = BUCKET_AGENTS_STATE, "ready");
270
271 // agent_config — Sprint 6 layered scopes (global / groups.* /
272 // pcs.*) plus the legacy target_version key.
273 js.create_or_update_key_value(KvConfig {
274 bucket: BUCKET_AGENT_CONFIG.into(),
275 history: 5,
276 ..Default::default()
277 })
278 .await
279 .with_context(|| format!("create_or_update_key_value {BUCKET_AGENT_CONFIG}"))?;
280 info!(bucket = BUCKET_AGENT_CONFIG, "ready");
281
282 // agent_groups — Sprint 5 per-pc group membership.
283 js.create_or_update_key_value(KvConfig {
284 bucket: BUCKET_AGENT_GROUPS.into(),
285 history: 5,
286 ..Default::default()
287 })
288 .await
289 .with_context(|| format!("create_or_update_key_value {BUCKET_AGENT_GROUPS}"))?;
290 info!(bucket = BUCKET_AGENT_GROUPS, "ready");
291
292 // schedules — admin-API CRUD'd cron table (spec §2.5.3).
293 // Backend's scheduler.rs also creates this on startup; calling
294 // twice is harmless.
295 js.create_or_update_key_value(KvConfig {
296 bucket: BUCKET_SCHEDULES.into(),
297 history: 5,
298 ..Default::default()
299 })
300 .await
301 .with_context(|| format!("create_or_update_key_value {BUCKET_SCHEDULES}"))?;
302 info!(bucket = BUCKET_SCHEDULES, "ready");
303
304 // jobs — v0.15 operator-registered Manifest catalog. Schedules
305 // reference rows here by id; editing a job rewrites what future
306 // schedule fires exec.
307 js.create_or_update_key_value(KvConfig {
308 bucket: BUCKET_JOBS.into(),
309 history: 5,
310 ..Default::default()
311 })
312 .await
313 .with_context(|| format!("create_or_update_key_value {BUCKET_JOBS}"))?;
314 info!(bucket = BUCKET_JOBS, "ready");
315
316 // fleet_config — #418 Phase 5 fleet-wide singletons (the global
317 // change-freeze under KEY_FREEZE). history: 1 — only the current
318 // state matters; both schedulers watch it.
319 js.create_or_update_key_value(KvConfig {
320 bucket: BUCKET_FLEET_CONFIG.into(),
321 history: 1,
322 ..Default::default()
323 })
324 .await
325 .with_context(|| format!("create_or_update_key_value {BUCKET_FLEET_CONFIG}"))?;
326 info!(bucket = BUCKET_FLEET_CONFIG, "ready");
327
328 // notifications_read — per-(pc, user, notification) read/ack state
329 // (SPEC §2.3.2 / Phase E). The agent writes here on KLP
330 // `notifications.ack`; `notifications.list` reads it back to filter
331 // the unread bucket. history: 1 — only the latest ack per key
332 // matters.
333 js.create_or_update_key_value(KvConfig {
334 bucket: BUCKET_NOTIFICATIONS_READ.into(),
335 history: 1,
336 ..Default::default()
337 })
338 .await
339 .with_context(|| format!("create_or_update_key_value {BUCKET_NOTIFICATIONS_READ}"))?;
340 info!(bucket = BUCKET_NOTIFICATIONS_READ, "ready");
341
342 // jobs_yaml / schedules_yaml — operator source-of-truth YAML
343 // alongside the JSON catalogs above. Same key shape (manifest id
344 // / schedule id), but the value is the raw YAML bytes so the
345 // SPA's YAML editor preserves comments + script block-scalar
346 // indentation across edits. Agents/scheduler don't read these.
347 js.create_or_update_key_value(KvConfig {
348 bucket: BUCKET_JOBS_YAML.into(),
349 history: 5,
350 ..Default::default()
351 })
352 .await
353 .with_context(|| format!("create_or_update_key_value {BUCKET_JOBS_YAML}"))?;
354 info!(bucket = BUCKET_JOBS_YAML, "ready");
355
356 js.create_or_update_key_value(KvConfig {
357 bucket: BUCKET_SCHEDULES_YAML.into(),
358 history: 5,
359 ..Default::default()
360 })
361 .await
362 .with_context(|| format!("create_or_update_key_value {BUCKET_SCHEDULES_YAML}"))?;
363 info!(bucket = BUCKET_SCHEDULES_YAML, "ready");
364
365 // ── Object Store ─────────────────────────────────────────────
366 // agent_releases — one object per version, raw exe bytes.
367 ensure_object_store(
368 js,
369 ObjectStoreConfig {
370 bucket: OBJECT_AGENT_RELEASES.into(),
371 ..Default::default()
372 },
373 )
374 .await?;
375
376 // app_packages — generic operator-uploaded binary distribution
377 // (kanade-client today; third-party installers like Webex /
378 // Teams once those flows land). Object keys are
379 // `<name>/<version>`; see `kanade-shared::kv::OBJECT_APP_PACKAGES`
380 // for the full rationale.
381 ensure_object_store(
382 js,
383 ObjectStoreConfig {
384 bucket: OBJECT_APP_PACKAGES.into(),
385 ..Default::default()
386 },
387 )
388 .await?;
389
390 // scripts — manifest script bodies referenced by
391 // `Execute::script_object` (SPEC §2.4.1). Sibling of
392 // `app_packages`; see `kanade-shared::kv::OBJECT_SCRIPTS` for
393 // the bucket-split rationale (smaller payloads + manifest-
394 // coupled lifecycle vs operator-curated installers).
395 ensure_object_store(
396 js,
397 ObjectStoreConfig {
398 bucket: OBJECT_SCRIPTS.into(),
399 ..Default::default()
400 },
401 )
402 .await?;
403
404 // result_output — overflow stdout / stderr blobs for the
405 // `ExecResult` wire kind (#227). Anything larger than the agent's
406 // 256 KB inline threshold gets uploaded here under
407 // `<request_id>/{stdout,stderr}`; the backend's results
408 // projector derefs the pointer fields before INSERT so SQLite
409 // + the SPA see the full text inline. 30-day max_age matches
410 // STREAM_RESULTS so the lifetimes stay in lockstep — a row still
411 // resolvable in execution_results never points at a missing
412 // blob.
413 // #518: capped like the streams — a job whose output overflows
414 // the inline threshold writes blobs HERE instead of
415 // STREAM_RESULTS, so without its own cap this store bypasses
416 // the stream budget entirely and can still fill the file store.
417 // The projector derefs blobs within seconds of publish, so
418 // eviction only ever hits already-projected (or expired)
419 // output.
420 ensure_object_store(
421 js,
422 ObjectStoreConfig {
423 bucket: OBJECT_RESULT_OUTPUT.into(),
424 max_age: Duration::from_secs(SECS_PER_DAY * 30),
425 max_bytes: GIB,
426 ..Default::default()
427 },
428 )
429 .await?;
430
431 Ok(())
432}
433
#[cfg(test)]
mod tests {
    use super::*;
    use std::process::Stdio;

    /// A disposable `nats-server -js` listening on a random local port,
    /// modelled on the kv_cas_live / offline_boot harnesses. Only the
    /// `#[ignore]`d tests below use it.
    ///
    /// Fields drop in declaration order: the JetStream context first,
    /// then the server process (spawned with `kill_on_drop`), and the
    /// storage directory last.
    struct Broker {
        js: jetstream::Context,
        _server: tokio::process::Child,
        _storage: tempfile::TempDir,
    }

    /// Try to connect to `url` up to 50 times, sleeping 100 ms after
    /// each failed attempt (~5 s total). Panics if no attempt succeeds.
    async fn connect_with_retry(url: &str) -> async_nats::Client {
        for _ in 0..50 {
            match async_nats::connect(url).await {
                Ok(client) => return client,
                Err(_) => tokio::time::sleep(Duration::from_millis(100)).await,
            }
        }
        panic!("nats-server did not come up in 5s");
    }

    /// Start a JetStream-enabled server backed by a fresh temp dir and
    /// return a connected JetStream context for it.
    async fn spawn_broker() -> Broker {
        let port = portpicker::pick_unused_port().expect("pick port");
        let storage = tempfile::TempDir::new().expect("storage tempdir");
        let port_arg = port.to_string();
        let server = tokio::process::Command::new("nats-server")
            .args(["-js", "-p", port_arg.as_str(), "-sd"])
            .arg(storage.path())
            .stdout(Stdio::null())
            .stderr(Stdio::null())
            .kill_on_drop(true)
            .spawn()
            .expect("spawn nats-server (is it in PATH?)");
        let client = connect_with_retry(&format!("nats://127.0.0.1:{port}")).await;
        Broker {
            js: jetstream::new(client),
            _server: server,
            _storage: storage,
        }
    }

    /// Regression for #506 / the 2026-06-11 incident.
    ///
    /// Re-running `ensure_object_store` with a conflicting config (the
    /// #518 `max_bytes` cap on a store first created uncapped) used to
    /// surface error 10058 and crash the backend on boot. It must
    /// instead keep the existing store and let startup proceed.
    #[tokio::test]
    #[ignore = "requires nats-server in PATH; cargo test -- --ignored"]
    async fn ensure_object_store_accepts_config_drift() {
        const BUCKET: &str = "result_output";
        let broker = spawn_broker().await;

        // Round 1: uncapped, exactly as the pre-#518 backend created it.
        let uncapped = ObjectStoreConfig {
            bucket: BUCKET.into(),
            ..Default::default()
        };
        ensure_object_store(&broker.js, uncapped)
            .await
            .expect("fresh create");

        // Round 2: the same bucket, now capped. This conflicts with the
        // existing config, yet must not error.
        let capped = ObjectStoreConfig {
            bucket: BUCKET.into(),
            max_bytes: 1024 * 1024 * 1024,
            ..Default::default()
        };
        ensure_object_store(&broker.js, capped)
            .await
            .expect("config drift must not wedge startup");

        // The surviving store still accepts writes.
        let store = broker.js.get_object_store(BUCKET).await.expect("store");
        let mut payload: &[u8] = b"hi";
        store.put("k", &mut payload).await.expect("put after drift");
    }

    /// The normal first-boot path: creating a capped store on a broker
    /// with room succeeds and the store is then fetchable.
    #[tokio::test]
    #[ignore = "requires nats-server in PATH; cargo test -- --ignored"]
    async fn ensure_object_store_fresh_create_with_cap() {
        let broker = spawn_broker().await;
        let cfg = ObjectStoreConfig {
            bucket: "fresh".into(),
            max_bytes: 64 * 1024 * 1024,
            ..Default::default()
        };
        ensure_object_store(&broker.js, cfg)
            .await
            .expect("fresh capped create");
        broker.js.get_object_store("fresh").await.expect("exists");
    }

    /// The fatal path: a create failure for a store that ALSO cannot be
    /// found must propagate, since only recoverable errors are swallowed.
    /// An invalid bucket name fails create's charset validation and
    /// never yields a store to fall back to.
    #[tokio::test]
    #[ignore = "requires nats-server in PATH; cargo test -- --ignored"]
    async fn ensure_object_store_propagates_when_no_fallback() {
        let broker = spawn_broker().await;
        // Spaces and '!' break the object-store naming rules, so create
        // fails and the fallback lookup finds nothing either.
        let invalid = ObjectStoreConfig {
            bucket: "bad name!".into(),
            ..Default::default()
        };
        let err = ensure_object_store(&broker.js, invalid)
            .await
            .expect_err("a create failure with no existing store must be fatal");
        let rendered = err.to_string();
        assert!(
            rendered.contains("no existing store to fall back to"),
            "unexpected error: {err:#}",
        );
    }
}