Skip to main content

kanade_shared/
bootstrap.rs

1//! Idempotent JetStream bootstrap (Sprint 6.x follow-up).
2//!
3//! Lists every NATS JetStream resource the kanade fleet expects —
4//! streams, KV buckets, Object Stores — and asks the broker to
5//! create-or-update them. v0.25.0 switched from `create_*` to
6//! `create_or_update_*`: the old form returned error 10058 ("name
7//! already in use with a different configuration") when a release
8//! widened a stream's subjects or changed its retention policy on
9//! a broker that still held the older config. With the new form the
10//! broker reconciles its definition to the one in this file, so
11//! version bumps no longer require operator-side data wipes.
12//!
13//! Centralising the list here means a future "we added a new
14//! bucket" change touches one place and both the operator CLI +
15//! the auto-bootstrap path pick it up.
16
17use std::time::Duration;
18
19use anyhow::{Context, Result};
20use async_nats::jetstream::{
21    self,
22    kv::Config as KvConfig,
23    object_store::Config as ObjectStoreConfig,
24    stream::{Config as StreamConfig, DiscardPolicy},
25};
26use tracing::{info, warn};
27
28use crate::kv::{
29    BUCKET_AGENT_CONFIG, BUCKET_AGENT_GROUPS, BUCKET_AGENTS_STATE, BUCKET_FLEET_CONFIG,
30    BUCKET_JOBS, BUCKET_JOBS_YAML, BUCKET_SCHEDULES, BUCKET_SCHEDULES_YAML, BUCKET_SCRIPT_CURRENT,
31    BUCKET_SCRIPT_STATUS, OBJECT_AGENT_RELEASES, OBJECT_APP_PACKAGES, OBJECT_RESULT_OUTPUT,
32    OBJECT_SCRIPTS, STREAM_AUDIT, STREAM_EVENTS, STREAM_EXEC, STREAM_INVENTORY, STREAM_OBS_EVENTS,
33    STREAM_RESULTS,
34};
35
36/// Create-or-update an Object Store, but never let it wedge backend
37/// startup. `create_object_store` neither reconciles an existing
38/// store's config nor has a `create_or_update` form in async-nats
39/// 0.49, so a store whose desired config drifted — e.g. the #518
40/// `max_bytes` cap added after the bucket was first created uncapped,
41/// which the broker then rejects with error 10058 ("stream name
42/// already in use with a different configuration") — would otherwise
43/// fail `ensure_jetstream_resources` and crash the backend on boot
44/// (production outage 2026-06-11). Fall back to the existing store
45/// (uncapped, as it already was) and warn. #506 tracks real
46/// reconciliation of object-store config.
47async fn ensure_object_store(js: &jetstream::Context, cfg: ObjectStoreConfig) -> Result<()> {
48    let name = cfg.bucket.clone();
49    if let Err(e) = js.create_object_store(cfg).await {
50        // The fallback is deliberately broad — any create error is
51        // tolerated AS LONG AS the store already exists, because the
52        // alternative is a wedged backend and "never crash on boot"
53        // wins over "surface this specific error". The expected error
54        // is 10058 (config drift, the incident), but auth/network
55        // blips on an already-bootstrapped broker take this path too;
56        // they remain visible via the `warn!`. Only a genuine
57        // "can't create AND doesn't exist" is fatal.
58        if js.get_object_store(&name).await.is_err() {
59            return Err(e).with_context(|| {
60                format!("create_object_store {name} (and no existing store to fall back to)")
61            });
62        }
63        warn!(
64            store = %name, error = %e,
65            "object store exists with a different config; using it as-is (cap not reconciled)",
66        );
67    }
68    info!(store = %name, "ready");
69    Ok(())
70}
71
72/// Idempotently create every NATS JetStream resource the kanade
73/// fleet relies on. Calling repeatedly is safe — `create_*` returns
74/// the existing resource if it's already configured.
75///
76/// Returns once every resource is in place. The function is async
77/// so backends can `await` it as part of their startup sequence
78/// (one round-trip per resource — ~10 RTTs total).
79pub async fn ensure_jetstream_resources(js: &jetstream::Context) -> Result<()> {
80    // ── Streams ──────────────────────────────────────────────────
81    // #518: every stream carries a `max_bytes` cap with
82    // `Discard::Old` on top of its `max_age` window. Within their
83    // age windows the streams used to be unbounded by size, and
84    // JetStream's file store shares a disk with SQLite on the
85    // backend host — one job printing 200 KB per run fleet-wide
86    // could exhaust the store, at which point EVERY publish fails
87    // (results, obs, audit, KV puts). With the caps, worst-case
88    // degradation is "shorter history on the offending stream"
89    // instead of "broker down".
90    //
91    // Sizing: JetStream RESERVES each `max_bytes` against its
92    // available storage (min of max_file_store and free disk) at
93    // create/update time and fails with error 10047 when the sum
94    // doesn't fit, so these must stay small enough for modest
95    // hosts. That's fine: every stream here is a transport +
96    // replay buffer — the durable record is the backend's SQLite
97    // (results/inventory/obs/audit are all projected within
98    // seconds) — so the caps are runaway-output backstops, not
99    // history budgets. Total reservation ≈ 5.3 GiB including the
100    // result_output object store below.
101    const MIB: i64 = 1024 * 1024;
102    const GIB: i64 = 1024 * MIB;
103
104    // INVENTORY — 90-day rolling history (spec §2.3.1).
105    js.create_or_update_stream(StreamConfig {
106        name: STREAM_INVENTORY.into(),
107        subjects: vec!["inventory.>".into()],
108        max_age: Duration::from_secs(90 * 24 * 60 * 60),
109        max_bytes: GIB,
110        discard: DiscardPolicy::Old,
111        ..Default::default()
112    })
113    .await
114    .with_context(|| format!("create_or_update_stream {STREAM_INVENTORY}"))?;
115    info!(stream = STREAM_INVENTORY, "ready");
116
117    // RESULTS — 30-day rolling history. The biggest producer by
118    // far (every job run on every PC, with up to 256 KB of inline
119    // stdout/stderr per message), so it gets the largest slice of
120    // the disk budget.
121    js.create_or_update_stream(StreamConfig {
122        name: STREAM_RESULTS.into(),
123        subjects: vec!["results.>".into()],
124        max_age: Duration::from_secs(30 * 24 * 60 * 60),
125        max_bytes: 2 * GIB,
126        discard: DiscardPolicy::Old,
127        ..Default::default()
128    })
129    .await
130    .with_context(|| format!("create_or_update_stream {STREAM_RESULTS}"))?;
131    info!(stream = STREAM_RESULTS, "ready");
132
133    // EXEC — latest-per-subject only (spec §2.6 Layer 1). v0.22.1:
134    // catch the existing `commands.{all,group.X,pc.Y}` subjects so a
135    // single backend publish lands in BOTH the agent's live core
136    // subscription AND the stream's retention store. Reconnecting
137    // agents catch up via a durable consumer with
138    // `DeliverPolicy::LastPerSubject` — they receive the most
139    // recent Command per subject they care about, no matter how
140    // long they were offline (within `max_age`).
141    js.create_or_update_stream(StreamConfig {
142        name: STREAM_EXEC.into(),
143        subjects: vec!["commands.>".into()],
144        max_messages_per_subject: 1,
145        max_age: Duration::from_secs(7 * 24 * 60 * 60),
146        // Latest-per-subject keeps this tiny (one Command per
147        // group/pc subject); the cap is a backstop against subject
148        // cardinality bugs, not a working budget.
149        max_bytes: 64 * MIB,
150        discard: DiscardPolicy::Old,
151        ..Default::default()
152    })
153    .await
154    .with_context(|| format!("create_or_update_stream {STREAM_EXEC}"))?;
155    info!(stream = STREAM_EXEC, "ready");
156
157    // EVENTS — short-lived broadcast bus for kill / revoke / etc.
158    // 7-day window matches the EXEC spec window.
159    js.create_or_update_stream(StreamConfig {
160        name: STREAM_EVENTS.into(),
161        subjects: vec!["events.>".into()],
162        max_age: Duration::from_secs(7 * 24 * 60 * 60),
163        max_bytes: 256 * MIB,
164        discard: DiscardPolicy::Old,
165        ..Default::default()
166    })
167    .await
168    .with_context(|| format!("create_or_update_stream {STREAM_EVENTS}"))?;
169    info!(stream = STREAM_EVENTS, "ready");
170
171    // AUDIT — operator-action record (spec §2.3.1). The DURABLE
172    // copy is the backend's SQLite `audit_log` table (the projector
173    // INSERTs each message, idempotently since #501; 365-day
174    // retention since #486) — the stream is transport + replay
175    // buffer, not the archive, so it can be bounded like the rest.
176    // 90 days / 512 MiB is far more than the projector ever lags;
177    // previously this stream had NO limits at all, making it an
178    // unbounded disk leak on the broker host.
179    js.create_or_update_stream(StreamConfig {
180        name: STREAM_AUDIT.into(),
181        subjects: vec!["audit.>".into()],
182        max_age: Duration::from_secs(90 * 24 * 60 * 60),
183        max_bytes: 512 * MIB,
184        discard: DiscardPolicy::Old,
185        ..Default::default()
186    })
187    .await
188    .with_context(|| format!("create_or_update_stream {STREAM_AUDIT}"))?;
189    info!(stream = STREAM_AUDIT, "ready");
190
191    // OBS_EVENTS — per-PC observability timeline (Issue #246). The
192    // 90-day window matches `obs_events` table retention so a
193    // backend bootstrapping after long downtime can catch up but
194    // doesn't carry data the table will discard anyway. Subject
195    // filter `obs.>` catches every PC without a per-PC subscription.
196    //
197    // Days-to-seconds is spelt out once instead of `90 * 24 * 60 *
198    // 60` open-coded across bootstrap + cleanup; the matching prune
199    // window in `kanade-backend::cleanup` quotes the same number
200    // separately (SQLite-relative string syntax there, not a
201    // duration), so it can't share a constant — but a single
202    // arithmetic spell-out here makes the relationship grep-able.
203    const SECS_PER_DAY: u64 = 24 * 60 * 60;
204    const OBS_EVENTS_RETENTION_DAYS: u64 = 90;
205    js.create_or_update_stream(StreamConfig {
206        name: STREAM_OBS_EVENTS.into(),
207        subjects: vec!["obs.>".into()],
208        max_age: Duration::from_secs(OBS_EVENTS_RETENTION_DAYS * SECS_PER_DAY),
209        max_bytes: 512 * MIB,
210        discard: DiscardPolicy::Old,
211        ..Default::default()
212    })
213    .await
214    .with_context(|| format!("create_or_update_stream {STREAM_OBS_EVENTS}"))?;
215    info!(stream = STREAM_OBS_EVENTS, "ready");
216
217    // ── KV buckets ───────────────────────────────────────────────
218    // script_current — cmd_id → version (spec §2.6 Layer 2).
219    js.create_or_update_key_value(KvConfig {
220        bucket: BUCKET_SCRIPT_CURRENT.into(),
221        history: 5,
222        ..Default::default()
223    })
224    .await
225    .with_context(|| format!("create_or_update_key_value {BUCKET_SCRIPT_CURRENT}"))?;
226    info!(bucket = BUCKET_SCRIPT_CURRENT, "ready");
227
228    // script_status — cmd_id → ACTIVE / REVOKED.
229    js.create_or_update_key_value(KvConfig {
230        bucket: BUCKET_SCRIPT_STATUS.into(),
231        history: 5,
232        ..Default::default()
233    })
234    .await
235    .with_context(|| format!("create_or_update_key_value {BUCKET_SCRIPT_STATUS}"))?;
236    info!(bucket = BUCKET_SCRIPT_STATUS, "ready");
237
238    // agents_state — pc_id → latest hw snapshot (history=1).
239    js.create_or_update_key_value(KvConfig {
240        bucket: BUCKET_AGENTS_STATE.into(),
241        history: 1,
242        ..Default::default()
243    })
244    .await
245    .with_context(|| format!("create_or_update_key_value {BUCKET_AGENTS_STATE}"))?;
246    info!(bucket = BUCKET_AGENTS_STATE, "ready");
247
248    // agent_config — Sprint 6 layered scopes (global / groups.* /
249    // pcs.*) plus the legacy target_version key.
250    js.create_or_update_key_value(KvConfig {
251        bucket: BUCKET_AGENT_CONFIG.into(),
252        history: 5,
253        ..Default::default()
254    })
255    .await
256    .with_context(|| format!("create_or_update_key_value {BUCKET_AGENT_CONFIG}"))?;
257    info!(bucket = BUCKET_AGENT_CONFIG, "ready");
258
259    // agent_groups — Sprint 5 per-pc group membership.
260    js.create_or_update_key_value(KvConfig {
261        bucket: BUCKET_AGENT_GROUPS.into(),
262        history: 5,
263        ..Default::default()
264    })
265    .await
266    .with_context(|| format!("create_or_update_key_value {BUCKET_AGENT_GROUPS}"))?;
267    info!(bucket = BUCKET_AGENT_GROUPS, "ready");
268
269    // schedules — admin-API CRUD'd cron table (spec §2.5.3).
270    // Backend's scheduler.rs also creates this on startup; calling
271    // twice is harmless.
272    js.create_or_update_key_value(KvConfig {
273        bucket: BUCKET_SCHEDULES.into(),
274        history: 5,
275        ..Default::default()
276    })
277    .await
278    .with_context(|| format!("create_or_update_key_value {BUCKET_SCHEDULES}"))?;
279    info!(bucket = BUCKET_SCHEDULES, "ready");
280
281    // jobs — v0.15 operator-registered Manifest catalog. Schedules
282    // reference rows here by id; editing a job rewrites what future
283    // schedule fires exec.
284    js.create_or_update_key_value(KvConfig {
285        bucket: BUCKET_JOBS.into(),
286        history: 5,
287        ..Default::default()
288    })
289    .await
290    .with_context(|| format!("create_or_update_key_value {BUCKET_JOBS}"))?;
291    info!(bucket = BUCKET_JOBS, "ready");
292
293    // fleet_config — #418 Phase 5 fleet-wide singletons (the global
294    // change-freeze under KEY_FREEZE). history: 1 — only the current
295    // state matters; both schedulers watch it.
296    js.create_or_update_key_value(KvConfig {
297        bucket: BUCKET_FLEET_CONFIG.into(),
298        history: 1,
299        ..Default::default()
300    })
301    .await
302    .with_context(|| format!("create_or_update_key_value {BUCKET_FLEET_CONFIG}"))?;
303    info!(bucket = BUCKET_FLEET_CONFIG, "ready");
304
305    // jobs_yaml / schedules_yaml — operator source-of-truth YAML
306    // alongside the JSON catalogs above. Same key shape (manifest id
307    // / schedule id), but the value is the raw YAML bytes so the
308    // SPA's YAML editor preserves comments + script block-scalar
309    // indentation across edits. Agents/scheduler don't read these.
310    js.create_or_update_key_value(KvConfig {
311        bucket: BUCKET_JOBS_YAML.into(),
312        history: 5,
313        ..Default::default()
314    })
315    .await
316    .with_context(|| format!("create_or_update_key_value {BUCKET_JOBS_YAML}"))?;
317    info!(bucket = BUCKET_JOBS_YAML, "ready");
318
319    js.create_or_update_key_value(KvConfig {
320        bucket: BUCKET_SCHEDULES_YAML.into(),
321        history: 5,
322        ..Default::default()
323    })
324    .await
325    .with_context(|| format!("create_or_update_key_value {BUCKET_SCHEDULES_YAML}"))?;
326    info!(bucket = BUCKET_SCHEDULES_YAML, "ready");
327
328    // ── Object Store ─────────────────────────────────────────────
329    // agent_releases — one object per version, raw exe bytes.
330    ensure_object_store(
331        js,
332        ObjectStoreConfig {
333            bucket: OBJECT_AGENT_RELEASES.into(),
334            ..Default::default()
335        },
336    )
337    .await?;
338
339    // app_packages — generic operator-uploaded binary distribution
340    // (kanade-client today; third-party installers like Webex /
341    // Teams once those flows land). Object keys are
342    // `<name>/<version>`; see `kanade-shared::kv::OBJECT_APP_PACKAGES`
343    // for the full rationale.
344    ensure_object_store(
345        js,
346        ObjectStoreConfig {
347            bucket: OBJECT_APP_PACKAGES.into(),
348            ..Default::default()
349        },
350    )
351    .await?;
352
353    // scripts — manifest script bodies referenced by
354    // `Execute::script_object` (SPEC §2.4.1). Sibling of
355    // `app_packages`; see `kanade-shared::kv::OBJECT_SCRIPTS` for
356    // the bucket-split rationale (smaller payloads + manifest-
357    // coupled lifecycle vs operator-curated installers).
358    ensure_object_store(
359        js,
360        ObjectStoreConfig {
361            bucket: OBJECT_SCRIPTS.into(),
362            ..Default::default()
363        },
364    )
365    .await?;
366
367    // result_output — overflow stdout / stderr blobs for the
368    // `ExecResult` wire kind (#227). Anything larger than the agent's
369    // 256 KB inline threshold gets uploaded here under
370    // `<request_id>/{stdout,stderr}`; the backend's results
371    // projector derefs the pointer fields before INSERT so SQLite
372    // + the SPA see the full text inline. 30-day max_age matches
373    // STREAM_RESULTS so the lifetimes stay in lockstep — a row still
374    // resolvable in execution_results never points at a missing
375    // blob.
376    // #518: capped like the streams — a job whose output overflows
377    // the inline threshold writes blobs HERE instead of
378    // STREAM_RESULTS, so without its own cap this store bypasses
379    // the stream budget entirely and can still fill the file store.
380    // The projector derefs blobs within seconds of publish, so
381    // eviction only ever hits already-projected (or expired)
382    // output.
383    ensure_object_store(
384        js,
385        ObjectStoreConfig {
386            bucket: OBJECT_RESULT_OUTPUT.into(),
387            max_age: Duration::from_secs(SECS_PER_DAY * 30),
388            max_bytes: GIB,
389            ..Default::default()
390        },
391    )
392    .await?;
393
394    Ok(())
395}
396
#[cfg(test)]
mod tests {
    use super::*;
    use std::process::Stdio;

    /// Handle on a disposable `nats-server -js` listening on a random
    /// port, in the same spirit as the kv_cas_live / offline_boot
    /// harnesses. Only used by `#[ignore]`d tests.
    ///
    /// The server process and its storage directory are kept as
    /// fields purely so they stay alive as long as the handle does.
    struct Broker {
        js: jetstream::Context,
        _server: tokio::process::Child,
        _storage: tempfile::TempDir,
    }

    /// Start a private broker and connect to it, retrying the connect
    /// up to 50 times with a 100 ms pause after each failure. Panics
    /// if the server cannot be spawned or never accepts a connection.
    async fn spawn_broker() -> Broker {
        let port = portpicker::pick_unused_port().expect("pick port");
        let storage = tempfile::TempDir::new().expect("storage tempdir");

        let mut launcher = tokio::process::Command::new("nats-server");
        launcher
            .arg("-js")
            .arg("-p")
            .arg(port.to_string())
            .arg("-sd")
            .arg(storage.path())
            .stdout(Stdio::null())
            .stderr(Stdio::null())
            .kill_on_drop(true);
        let server = launcher
            .spawn()
            .expect("spawn nats-server (is it in PATH?)");

        let url = format!("nats://127.0.0.1:{port}");
        let mut client = None;
        let mut attempts_left = 50;
        while client.is_none() && attempts_left > 0 {
            match async_nats::connect(&url).await {
                Ok(connected) => client = Some(connected),
                Err(_) => tokio::time::sleep(Duration::from_millis(100)).await,
            }
            attempts_left -= 1;
        }

        let client = client.expect("nats-server did not come up in 5s");
        Broker {
            js: jetstream::new(client),
            _server: server,
            _storage: storage,
        }
    }

    /// Object-store config for `bucket` with every other setting left
    /// at its default (no size cap).
    fn uncapped(bucket: &str) -> ObjectStoreConfig {
        ObjectStoreConfig {
            bucket: bucket.into(),
            ..Default::default()
        }
    }

    /// Same as [`uncapped`], but with a `max_bytes` cap.
    fn capped(bucket: &str, max_bytes: i64) -> ObjectStoreConfig {
        ObjectStoreConfig {
            max_bytes,
            ..uncapped(bucket)
        }
    }

    /// #506 / 2026-06-11 incident: `create_object_store` neither
    /// reconciles config nor has a create-or-update form, so adding
    /// the #518 `max_bytes` cap to a store first created uncapped made
    /// the broker reject the create (error 10058 "name already in use
    /// with a different configuration") and crashed the backend on
    /// boot. `ensure_object_store` must instead accept the existing
    /// store and let startup proceed.
    #[tokio::test]
    #[ignore = "requires nats-server in PATH; cargo test -- --ignored"]
    async fn ensure_object_store_accepts_config_drift() {
        let broker = spawn_broker().await;

        // Step 1: create the store uncapped, as the pre-#518 backend did.
        ensure_object_store(&broker.js, uncapped("result_output"))
            .await
            .expect("fresh create");

        // Step 2: ask for the same store with a conflicting (capped)
        // config. This must NOT error — the existing store is accepted.
        ensure_object_store(&broker.js, capped("result_output", 1024 * 1024 * 1024))
            .await
            .expect("config drift must not wedge startup");

        // Step 3: the store still takes writes.
        let store = broker
            .js
            .get_object_store("result_output")
            .await
            .expect("store");
        let mut payload: &[u8] = b"hi";
        store
            .put("k", &mut payload)
            .await
            .expect("put after drift");
    }

    /// The normal first-boot path: creating a capped store on a
    /// broker with room succeeds and the store exists afterwards.
    #[tokio::test]
    #[ignore = "requires nats-server in PATH; cargo test -- --ignored"]
    async fn ensure_object_store_fresh_create_with_cap() {
        let broker = spawn_broker().await;
        ensure_object_store(&broker.js, capped("fresh", 64 * 1024 * 1024))
            .await
            .expect("fresh capped create");
        broker.js.get_object_store("fresh").await.expect("exists");
    }

    /// The fatal path: when create fails for a store that ALSO does
    /// not exist, the error must propagate (we only swallow errors we
    /// can fall back from). An invalid bucket name fails create's
    /// charset validation and never creates a store to fall back to.
    #[tokio::test]
    #[ignore = "requires nats-server in PATH; cargo test -- --ignored"]
    async fn ensure_object_store_propagates_when_no_fallback() {
        let broker = spawn_broker().await;
        // Spaces / '!' are rejected by the object-store name rules, so
        // create fails and get also finds nothing.
        let outcome = ensure_object_store(&broker.js, uncapped("bad name!")).await;
        let err = outcome.expect_err("a create failure with no existing store must be fatal");
        let rendered = err.to_string();
        assert!(
            rendered.contains("no existing store to fall back to"),
            "unexpected error: {err:#}",
        );
    }
}