Skip to main content

kanade_shared/
bootstrap.rs

1//! Idempotent JetStream bootstrap (Sprint 6.x follow-up).
2//!
//! Lists every NATS JetStream resource the kanade fleet expects —
//! streams, KV buckets, Object Stores — and asks the broker to
//! create them, or to update them where the client API allows it.
//! v0.25.0 switched streams and KV buckets from `create_*` to
//! `create_or_update_*`: the old form returned error 10058 ("name
//! already in use with a different configuration") when a release
//! widened a stream's subjects or changed its retention policy on
//! a broker that still held the older config. With the new form the
//! broker reconciles its definition to the one in this file, so
//! version bumps no longer require operator-side data wipes. Object
//! Stores are still created with `create_object_store`, so a config
//! change to one of them is not reconciled the same way.
12//!
13//! Centralising the list here means a future "we added a new
14//! bucket" change touches one place and both the operator CLI +
15//! the auto-bootstrap path pick it up.
16
17use std::time::Duration;
18
19use anyhow::{Context, Result};
20use async_nats::jetstream::{
21    self,
22    kv::Config as KvConfig,
23    object_store::Config as ObjectStoreConfig,
24    stream::{Config as StreamConfig, DiscardPolicy},
25};
26use tracing::info;
27
28use crate::kv::{
29    BUCKET_AGENT_CONFIG, BUCKET_AGENT_GROUPS, BUCKET_AGENTS_STATE, BUCKET_FLEET_CONFIG,
30    BUCKET_JOBS, BUCKET_JOBS_YAML, BUCKET_SCHEDULES, BUCKET_SCHEDULES_YAML, BUCKET_SCRIPT_CURRENT,
31    BUCKET_SCRIPT_STATUS, OBJECT_AGENT_RELEASES, OBJECT_APP_PACKAGES, OBJECT_RESULT_OUTPUT,
32    OBJECT_SCRIPTS, STREAM_AUDIT, STREAM_EVENTS, STREAM_EXEC, STREAM_INVENTORY, STREAM_OBS_EVENTS,
33    STREAM_RESULTS,
34};
35
36/// Idempotently create every NATS JetStream resource the kanade
37/// fleet relies on. Calling repeatedly is safe — `create_*` returns
38/// the existing resource if it's already configured.
39///
40/// Returns once every resource is in place. The function is async
41/// so backends can `await` it as part of their startup sequence
42/// (one round-trip per resource — ~10 RTTs total).
43pub async fn ensure_jetstream_resources(js: &jetstream::Context) -> Result<()> {
44    // ── Streams ──────────────────────────────────────────────────
45    // #518: every stream carries a `max_bytes` cap with
46    // `Discard::Old` on top of its `max_age` window. Within their
47    // age windows the streams used to be unbounded by size, and
48    // JetStream's file store shares a disk with SQLite on the
49    // backend host — one job printing 200 KB per run fleet-wide
50    // could exhaust the store, at which point EVERY publish fails
51    // (results, obs, audit, KV puts). With the caps, worst-case
52    // degradation is "shorter history on the offending stream"
53    // instead of "broker down".
54    //
55    // Sizing: JetStream RESERVES each `max_bytes` against its
56    // available storage (min of max_file_store and free disk) at
57    // create/update time and fails with error 10047 when the sum
58    // doesn't fit, so these must stay small enough for modest
59    // hosts. That's fine: every stream here is a transport +
60    // replay buffer — the durable record is the backend's SQLite
61    // (results/inventory/obs/audit are all projected within
62    // seconds) — so the caps are runaway-output backstops, not
63    // history budgets. Total reservation ≈ 5.3 GiB including the
64    // result_output object store below.
65    const MIB: i64 = 1024 * 1024;
66    const GIB: i64 = 1024 * MIB;
67
68    // INVENTORY — 90-day rolling history (spec §2.3.1).
69    js.create_or_update_stream(StreamConfig {
70        name: STREAM_INVENTORY.into(),
71        subjects: vec!["inventory.>".into()],
72        max_age: Duration::from_secs(90 * 24 * 60 * 60),
73        max_bytes: GIB,
74        discard: DiscardPolicy::Old,
75        ..Default::default()
76    })
77    .await
78    .with_context(|| format!("create_or_update_stream {STREAM_INVENTORY}"))?;
79    info!(stream = STREAM_INVENTORY, "ready");
80
81    // RESULTS — 30-day rolling history. The biggest producer by
82    // far (every job run on every PC, with up to 256 KB of inline
83    // stdout/stderr per message), so it gets the largest slice of
84    // the disk budget.
85    js.create_or_update_stream(StreamConfig {
86        name: STREAM_RESULTS.into(),
87        subjects: vec!["results.>".into()],
88        max_age: Duration::from_secs(30 * 24 * 60 * 60),
89        max_bytes: 2 * GIB,
90        discard: DiscardPolicy::Old,
91        ..Default::default()
92    })
93    .await
94    .with_context(|| format!("create_or_update_stream {STREAM_RESULTS}"))?;
95    info!(stream = STREAM_RESULTS, "ready");
96
97    // EXEC — latest-per-subject only (spec §2.6 Layer 1). v0.22.1:
98    // catch the existing `commands.{all,group.X,pc.Y}` subjects so a
99    // single backend publish lands in BOTH the agent's live core
100    // subscription AND the stream's retention store. Reconnecting
101    // agents catch up via a durable consumer with
102    // `DeliverPolicy::LastPerSubject` — they receive the most
103    // recent Command per subject they care about, no matter how
104    // long they were offline (within `max_age`).
105    js.create_or_update_stream(StreamConfig {
106        name: STREAM_EXEC.into(),
107        subjects: vec!["commands.>".into()],
108        max_messages_per_subject: 1,
109        max_age: Duration::from_secs(7 * 24 * 60 * 60),
110        // Latest-per-subject keeps this tiny (one Command per
111        // group/pc subject); the cap is a backstop against subject
112        // cardinality bugs, not a working budget.
113        max_bytes: 64 * MIB,
114        discard: DiscardPolicy::Old,
115        ..Default::default()
116    })
117    .await
118    .with_context(|| format!("create_or_update_stream {STREAM_EXEC}"))?;
119    info!(stream = STREAM_EXEC, "ready");
120
121    // EVENTS — short-lived broadcast bus for kill / revoke / etc.
122    // 7-day window matches the EXEC spec window.
123    js.create_or_update_stream(StreamConfig {
124        name: STREAM_EVENTS.into(),
125        subjects: vec!["events.>".into()],
126        max_age: Duration::from_secs(7 * 24 * 60 * 60),
127        max_bytes: 256 * MIB,
128        discard: DiscardPolicy::Old,
129        ..Default::default()
130    })
131    .await
132    .with_context(|| format!("create_or_update_stream {STREAM_EVENTS}"))?;
133    info!(stream = STREAM_EVENTS, "ready");
134
135    // AUDIT — operator-action record (spec §2.3.1). The DURABLE
136    // copy is the backend's SQLite `audit_log` table (the projector
137    // INSERTs each message, idempotently since #501; 365-day
138    // retention since #486) — the stream is transport + replay
139    // buffer, not the archive, so it can be bounded like the rest.
140    // 90 days / 512 MiB is far more than the projector ever lags;
141    // previously this stream had NO limits at all, making it an
142    // unbounded disk leak on the broker host.
143    js.create_or_update_stream(StreamConfig {
144        name: STREAM_AUDIT.into(),
145        subjects: vec!["audit.>".into()],
146        max_age: Duration::from_secs(90 * 24 * 60 * 60),
147        max_bytes: 512 * MIB,
148        discard: DiscardPolicy::Old,
149        ..Default::default()
150    })
151    .await
152    .with_context(|| format!("create_or_update_stream {STREAM_AUDIT}"))?;
153    info!(stream = STREAM_AUDIT, "ready");
154
155    // OBS_EVENTS — per-PC observability timeline (Issue #246). The
156    // 90-day window matches `obs_events` table retention so a
157    // backend bootstrapping after long downtime can catch up but
158    // doesn't carry data the table will discard anyway. Subject
159    // filter `obs.>` catches every PC without a per-PC subscription.
160    //
161    // Days-to-seconds is spelt out once instead of `90 * 24 * 60 *
162    // 60` open-coded across bootstrap + cleanup; the matching prune
163    // window in `kanade-backend::cleanup` quotes the same number
164    // separately (SQLite-relative string syntax there, not a
165    // duration), so it can't share a constant — but a single
166    // arithmetic spell-out here makes the relationship grep-able.
167    const SECS_PER_DAY: u64 = 24 * 60 * 60;
168    const OBS_EVENTS_RETENTION_DAYS: u64 = 90;
169    js.create_or_update_stream(StreamConfig {
170        name: STREAM_OBS_EVENTS.into(),
171        subjects: vec!["obs.>".into()],
172        max_age: Duration::from_secs(OBS_EVENTS_RETENTION_DAYS * SECS_PER_DAY),
173        max_bytes: 512 * MIB,
174        discard: DiscardPolicy::Old,
175        ..Default::default()
176    })
177    .await
178    .with_context(|| format!("create_or_update_stream {STREAM_OBS_EVENTS}"))?;
179    info!(stream = STREAM_OBS_EVENTS, "ready");
180
181    // ── KV buckets ───────────────────────────────────────────────
182    // script_current — cmd_id → version (spec §2.6 Layer 2).
183    js.create_or_update_key_value(KvConfig {
184        bucket: BUCKET_SCRIPT_CURRENT.into(),
185        history: 5,
186        ..Default::default()
187    })
188    .await
189    .with_context(|| format!("create_or_update_key_value {BUCKET_SCRIPT_CURRENT}"))?;
190    info!(bucket = BUCKET_SCRIPT_CURRENT, "ready");
191
192    // script_status — cmd_id → ACTIVE / REVOKED.
193    js.create_or_update_key_value(KvConfig {
194        bucket: BUCKET_SCRIPT_STATUS.into(),
195        history: 5,
196        ..Default::default()
197    })
198    .await
199    .with_context(|| format!("create_or_update_key_value {BUCKET_SCRIPT_STATUS}"))?;
200    info!(bucket = BUCKET_SCRIPT_STATUS, "ready");
201
202    // agents_state — pc_id → latest hw snapshot (history=1).
203    js.create_or_update_key_value(KvConfig {
204        bucket: BUCKET_AGENTS_STATE.into(),
205        history: 1,
206        ..Default::default()
207    })
208    .await
209    .with_context(|| format!("create_or_update_key_value {BUCKET_AGENTS_STATE}"))?;
210    info!(bucket = BUCKET_AGENTS_STATE, "ready");
211
212    // agent_config — Sprint 6 layered scopes (global / groups.* /
213    // pcs.*) plus the legacy target_version key.
214    js.create_or_update_key_value(KvConfig {
215        bucket: BUCKET_AGENT_CONFIG.into(),
216        history: 5,
217        ..Default::default()
218    })
219    .await
220    .with_context(|| format!("create_or_update_key_value {BUCKET_AGENT_CONFIG}"))?;
221    info!(bucket = BUCKET_AGENT_CONFIG, "ready");
222
223    // agent_groups — Sprint 5 per-pc group membership.
224    js.create_or_update_key_value(KvConfig {
225        bucket: BUCKET_AGENT_GROUPS.into(),
226        history: 5,
227        ..Default::default()
228    })
229    .await
230    .with_context(|| format!("create_or_update_key_value {BUCKET_AGENT_GROUPS}"))?;
231    info!(bucket = BUCKET_AGENT_GROUPS, "ready");
232
233    // schedules — admin-API CRUD'd cron table (spec §2.5.3).
234    // Backend's scheduler.rs also creates this on startup; calling
235    // twice is harmless.
236    js.create_or_update_key_value(KvConfig {
237        bucket: BUCKET_SCHEDULES.into(),
238        history: 5,
239        ..Default::default()
240    })
241    .await
242    .with_context(|| format!("create_or_update_key_value {BUCKET_SCHEDULES}"))?;
243    info!(bucket = BUCKET_SCHEDULES, "ready");
244
245    // jobs — v0.15 operator-registered Manifest catalog. Schedules
246    // reference rows here by id; editing a job rewrites what future
247    // schedule fires exec.
248    js.create_or_update_key_value(KvConfig {
249        bucket: BUCKET_JOBS.into(),
250        history: 5,
251        ..Default::default()
252    })
253    .await
254    .with_context(|| format!("create_or_update_key_value {BUCKET_JOBS}"))?;
255    info!(bucket = BUCKET_JOBS, "ready");
256
257    // fleet_config — #418 Phase 5 fleet-wide singletons (the global
258    // change-freeze under KEY_FREEZE). history: 1 — only the current
259    // state matters; both schedulers watch it.
260    js.create_or_update_key_value(KvConfig {
261        bucket: BUCKET_FLEET_CONFIG.into(),
262        history: 1,
263        ..Default::default()
264    })
265    .await
266    .with_context(|| format!("create_or_update_key_value {BUCKET_FLEET_CONFIG}"))?;
267    info!(bucket = BUCKET_FLEET_CONFIG, "ready");
268
269    // jobs_yaml / schedules_yaml — operator source-of-truth YAML
270    // alongside the JSON catalogs above. Same key shape (manifest id
271    // / schedule id), but the value is the raw YAML bytes so the
272    // SPA's YAML editor preserves comments + script block-scalar
273    // indentation across edits. Agents/scheduler don't read these.
274    js.create_or_update_key_value(KvConfig {
275        bucket: BUCKET_JOBS_YAML.into(),
276        history: 5,
277        ..Default::default()
278    })
279    .await
280    .with_context(|| format!("create_or_update_key_value {BUCKET_JOBS_YAML}"))?;
281    info!(bucket = BUCKET_JOBS_YAML, "ready");
282
283    js.create_or_update_key_value(KvConfig {
284        bucket: BUCKET_SCHEDULES_YAML.into(),
285        history: 5,
286        ..Default::default()
287    })
288    .await
289    .with_context(|| format!("create_or_update_key_value {BUCKET_SCHEDULES_YAML}"))?;
290    info!(bucket = BUCKET_SCHEDULES_YAML, "ready");
291
292    // ── Object Store ─────────────────────────────────────────────
293    // agent_releases — one object per version, raw exe bytes.
294    js.create_object_store(ObjectStoreConfig {
295        bucket: OBJECT_AGENT_RELEASES.into(),
296        ..Default::default()
297    })
298    .await
299    .with_context(|| format!("create_object_store {OBJECT_AGENT_RELEASES}"))?;
300    info!(store = OBJECT_AGENT_RELEASES, "ready");
301
302    // app_packages — generic operator-uploaded binary distribution
303    // (kanade-client today; third-party installers like Webex /
304    // Teams once those flows land). Object keys are
305    // `<name>/<version>`; see `kanade-shared::kv::OBJECT_APP_PACKAGES`
306    // for the full rationale.
307    js.create_object_store(ObjectStoreConfig {
308        bucket: OBJECT_APP_PACKAGES.into(),
309        ..Default::default()
310    })
311    .await
312    .with_context(|| format!("create_object_store {OBJECT_APP_PACKAGES}"))?;
313    info!(store = OBJECT_APP_PACKAGES, "ready");
314
315    // scripts — manifest script bodies referenced by
316    // `Execute::script_object` (SPEC §2.4.1). Sibling of
317    // `app_packages`; see `kanade-shared::kv::OBJECT_SCRIPTS` for
318    // the bucket-split rationale (smaller payloads + manifest-
319    // coupled lifecycle vs operator-curated installers).
320    js.create_object_store(ObjectStoreConfig {
321        bucket: OBJECT_SCRIPTS.into(),
322        ..Default::default()
323    })
324    .await
325    .with_context(|| format!("create_object_store {OBJECT_SCRIPTS}"))?;
326    info!(store = OBJECT_SCRIPTS, "ready");
327
328    // result_output — overflow stdout / stderr blobs for the
329    // `ExecResult` wire kind (#227). Anything larger than the agent's
330    // 256 KB inline threshold gets uploaded here under
331    // `<request_id>/{stdout,stderr}`; the backend's results
332    // projector derefs the pointer fields before INSERT so SQLite
333    // + the SPA see the full text inline. 30-day max_age matches
334    // STREAM_RESULTS so the lifetimes stay in lockstep — a row still
335    // resolvable in execution_results never points at a missing
336    // blob.
337    // #518: capped like the streams — a job whose output overflows
338    // the inline threshold writes blobs HERE instead of
339    // STREAM_RESULTS, so without its own cap this store bypasses
340    // the stream budget entirely and can still fill the file store.
341    // The projector derefs blobs within seconds of publish, so
342    // eviction only ever hits already-projected (or expired)
343    // output.
344    js.create_object_store(ObjectStoreConfig {
345        bucket: OBJECT_RESULT_OUTPUT.into(),
346        max_age: Duration::from_secs(SECS_PER_DAY * 30),
347        max_bytes: GIB,
348        ..Default::default()
349    })
350    .await
351    .with_context(|| format!("create_object_store {OBJECT_RESULT_OUTPUT}"))?;
352    info!(store = OBJECT_RESULT_OUTPUT, "ready");
353
354    Ok(())
355}