kanade_shared/bootstrap.rs
//! Idempotent JetStream bootstrap (Sprint 6.x follow-up).
//!
//! Lists every NATS JetStream resource the kanade fleet expects —
//! streams, KV buckets, Object Stores — and asks the broker to
//! create-or-update them. v0.25.0 switched from `create_*` to
//! `create_or_update_*`: the old form returned error 10058 ("name
//! already in use with a different configuration") when a release
//! widened a stream's subjects or changed its retention policy on
//! a broker that still held the older config. With the new form the
//! broker reconciles its definition to the one in this file, so
//! version bumps no longer require operator-side data wipes.
//!
//! Caveat: Object Stores are still created through
//! `create_object_store` (see `ensure_jetstream_resources`), so the
//! reconcile guarantee above is only established in this file for
//! streams and KV buckets.
//!
//! Centralising the list here means a future "we added a new
//! bucket" change touches one place, and both the operator CLI and
//! the auto-bootstrap path pick it up.
16
17use std::time::Duration;
18
19use anyhow::{Context, Result};
20use async_nats::jetstream::{
21 self,
22 kv::Config as KvConfig,
23 object_store::Config as ObjectStoreConfig,
24 stream::{Config as StreamConfig, DiscardPolicy},
25};
26use tracing::info;
27
28use crate::kv::{
29 BUCKET_AGENT_CONFIG, BUCKET_AGENT_GROUPS, BUCKET_AGENTS_STATE, BUCKET_FLEET_CONFIG,
30 BUCKET_JOBS, BUCKET_JOBS_YAML, BUCKET_SCHEDULES, BUCKET_SCHEDULES_YAML, BUCKET_SCRIPT_CURRENT,
31 BUCKET_SCRIPT_STATUS, OBJECT_AGENT_RELEASES, OBJECT_APP_PACKAGES, OBJECT_RESULT_OUTPUT,
32 OBJECT_SCRIPTS, STREAM_AUDIT, STREAM_EVENTS, STREAM_EXEC, STREAM_INVENTORY, STREAM_OBS_EVENTS,
33 STREAM_RESULTS,
34};
35
36/// Idempotently create every NATS JetStream resource the kanade
37/// fleet relies on. Calling repeatedly is safe — `create_*` returns
38/// the existing resource if it's already configured.
39///
40/// Returns once every resource is in place. The function is async
41/// so backends can `await` it as part of their startup sequence
42/// (one round-trip per resource — ~10 RTTs total).
43pub async fn ensure_jetstream_resources(js: &jetstream::Context) -> Result<()> {
44 // ── Streams ──────────────────────────────────────────────────
45 // #518: every stream carries a `max_bytes` cap with
46 // `Discard::Old` on top of its `max_age` window. Within their
47 // age windows the streams used to be unbounded by size, and
48 // JetStream's file store shares a disk with SQLite on the
49 // backend host — one job printing 200 KB per run fleet-wide
50 // could exhaust the store, at which point EVERY publish fails
51 // (results, obs, audit, KV puts). With the caps, worst-case
52 // degradation is "shorter history on the offending stream"
53 // instead of "broker down".
54 //
55 // Sizing: JetStream RESERVES each `max_bytes` against its
56 // available storage (min of max_file_store and free disk) at
57 // create/update time and fails with error 10047 when the sum
58 // doesn't fit, so these must stay small enough for modest
59 // hosts. That's fine: every stream here is a transport +
60 // replay buffer — the durable record is the backend's SQLite
61 // (results/inventory/obs/audit are all projected within
62 // seconds) — so the caps are runaway-output backstops, not
63 // history budgets. Total reservation ≈ 5.3 GiB including the
64 // result_output object store below.
65 const MIB: i64 = 1024 * 1024;
66 const GIB: i64 = 1024 * MIB;
67
68 // INVENTORY — 90-day rolling history (spec §2.3.1).
69 js.create_or_update_stream(StreamConfig {
70 name: STREAM_INVENTORY.into(),
71 subjects: vec!["inventory.>".into()],
72 max_age: Duration::from_secs(90 * 24 * 60 * 60),
73 max_bytes: GIB,
74 discard: DiscardPolicy::Old,
75 ..Default::default()
76 })
77 .await
78 .with_context(|| format!("create_or_update_stream {STREAM_INVENTORY}"))?;
79 info!(stream = STREAM_INVENTORY, "ready");
80
81 // RESULTS — 30-day rolling history. The biggest producer by
82 // far (every job run on every PC, with up to 256 KB of inline
83 // stdout/stderr per message), so it gets the largest slice of
84 // the disk budget.
85 js.create_or_update_stream(StreamConfig {
86 name: STREAM_RESULTS.into(),
87 subjects: vec!["results.>".into()],
88 max_age: Duration::from_secs(30 * 24 * 60 * 60),
89 max_bytes: 2 * GIB,
90 discard: DiscardPolicy::Old,
91 ..Default::default()
92 })
93 .await
94 .with_context(|| format!("create_or_update_stream {STREAM_RESULTS}"))?;
95 info!(stream = STREAM_RESULTS, "ready");
96
97 // EXEC — latest-per-subject only (spec §2.6 Layer 1). v0.22.1:
98 // catch the existing `commands.{all,group.X,pc.Y}` subjects so a
99 // single backend publish lands in BOTH the agent's live core
100 // subscription AND the stream's retention store. Reconnecting
101 // agents catch up via a durable consumer with
102 // `DeliverPolicy::LastPerSubject` — they receive the most
103 // recent Command per subject they care about, no matter how
104 // long they were offline (within `max_age`).
105 js.create_or_update_stream(StreamConfig {
106 name: STREAM_EXEC.into(),
107 subjects: vec!["commands.>".into()],
108 max_messages_per_subject: 1,
109 max_age: Duration::from_secs(7 * 24 * 60 * 60),
110 // Latest-per-subject keeps this tiny (one Command per
111 // group/pc subject); the cap is a backstop against subject
112 // cardinality bugs, not a working budget.
113 max_bytes: 64 * MIB,
114 discard: DiscardPolicy::Old,
115 ..Default::default()
116 })
117 .await
118 .with_context(|| format!("create_or_update_stream {STREAM_EXEC}"))?;
119 info!(stream = STREAM_EXEC, "ready");
120
121 // EVENTS — short-lived broadcast bus for kill / revoke / etc.
122 // 7-day window matches the EXEC spec window.
123 js.create_or_update_stream(StreamConfig {
124 name: STREAM_EVENTS.into(),
125 subjects: vec!["events.>".into()],
126 max_age: Duration::from_secs(7 * 24 * 60 * 60),
127 max_bytes: 256 * MIB,
128 discard: DiscardPolicy::Old,
129 ..Default::default()
130 })
131 .await
132 .with_context(|| format!("create_or_update_stream {STREAM_EVENTS}"))?;
133 info!(stream = STREAM_EVENTS, "ready");
134
135 // AUDIT — operator-action record (spec §2.3.1). The DURABLE
136 // copy is the backend's SQLite `audit_log` table (the projector
137 // INSERTs each message, idempotently since #501; 365-day
138 // retention since #486) — the stream is transport + replay
139 // buffer, not the archive, so it can be bounded like the rest.
140 // 90 days / 512 MiB is far more than the projector ever lags;
141 // previously this stream had NO limits at all, making it an
142 // unbounded disk leak on the broker host.
143 js.create_or_update_stream(StreamConfig {
144 name: STREAM_AUDIT.into(),
145 subjects: vec!["audit.>".into()],
146 max_age: Duration::from_secs(90 * 24 * 60 * 60),
147 max_bytes: 512 * MIB,
148 discard: DiscardPolicy::Old,
149 ..Default::default()
150 })
151 .await
152 .with_context(|| format!("create_or_update_stream {STREAM_AUDIT}"))?;
153 info!(stream = STREAM_AUDIT, "ready");
154
155 // OBS_EVENTS — per-PC observability timeline (Issue #246). The
156 // 90-day window matches `obs_events` table retention so a
157 // backend bootstrapping after long downtime can catch up but
158 // doesn't carry data the table will discard anyway. Subject
159 // filter `obs.>` catches every PC without a per-PC subscription.
160 //
161 // Days-to-seconds is spelt out once instead of `90 * 24 * 60 *
162 // 60` open-coded across bootstrap + cleanup; the matching prune
163 // window in `kanade-backend::cleanup` quotes the same number
164 // separately (SQLite-relative string syntax there, not a
165 // duration), so it can't share a constant — but a single
166 // arithmetic spell-out here makes the relationship grep-able.
167 const SECS_PER_DAY: u64 = 24 * 60 * 60;
168 const OBS_EVENTS_RETENTION_DAYS: u64 = 90;
169 js.create_or_update_stream(StreamConfig {
170 name: STREAM_OBS_EVENTS.into(),
171 subjects: vec!["obs.>".into()],
172 max_age: Duration::from_secs(OBS_EVENTS_RETENTION_DAYS * SECS_PER_DAY),
173 max_bytes: 512 * MIB,
174 discard: DiscardPolicy::Old,
175 ..Default::default()
176 })
177 .await
178 .with_context(|| format!("create_or_update_stream {STREAM_OBS_EVENTS}"))?;
179 info!(stream = STREAM_OBS_EVENTS, "ready");
180
181 // ── KV buckets ───────────────────────────────────────────────
182 // script_current — cmd_id → version (spec §2.6 Layer 2).
183 js.create_or_update_key_value(KvConfig {
184 bucket: BUCKET_SCRIPT_CURRENT.into(),
185 history: 5,
186 ..Default::default()
187 })
188 .await
189 .with_context(|| format!("create_or_update_key_value {BUCKET_SCRIPT_CURRENT}"))?;
190 info!(bucket = BUCKET_SCRIPT_CURRENT, "ready");
191
192 // script_status — cmd_id → ACTIVE / REVOKED.
193 js.create_or_update_key_value(KvConfig {
194 bucket: BUCKET_SCRIPT_STATUS.into(),
195 history: 5,
196 ..Default::default()
197 })
198 .await
199 .with_context(|| format!("create_or_update_key_value {BUCKET_SCRIPT_STATUS}"))?;
200 info!(bucket = BUCKET_SCRIPT_STATUS, "ready");
201
202 // agents_state — pc_id → latest hw snapshot (history=1).
203 js.create_or_update_key_value(KvConfig {
204 bucket: BUCKET_AGENTS_STATE.into(),
205 history: 1,
206 ..Default::default()
207 })
208 .await
209 .with_context(|| format!("create_or_update_key_value {BUCKET_AGENTS_STATE}"))?;
210 info!(bucket = BUCKET_AGENTS_STATE, "ready");
211
212 // agent_config — Sprint 6 layered scopes (global / groups.* /
213 // pcs.*) plus the legacy target_version key.
214 js.create_or_update_key_value(KvConfig {
215 bucket: BUCKET_AGENT_CONFIG.into(),
216 history: 5,
217 ..Default::default()
218 })
219 .await
220 .with_context(|| format!("create_or_update_key_value {BUCKET_AGENT_CONFIG}"))?;
221 info!(bucket = BUCKET_AGENT_CONFIG, "ready");
222
223 // agent_groups — Sprint 5 per-pc group membership.
224 js.create_or_update_key_value(KvConfig {
225 bucket: BUCKET_AGENT_GROUPS.into(),
226 history: 5,
227 ..Default::default()
228 })
229 .await
230 .with_context(|| format!("create_or_update_key_value {BUCKET_AGENT_GROUPS}"))?;
231 info!(bucket = BUCKET_AGENT_GROUPS, "ready");
232
233 // schedules — admin-API CRUD'd cron table (spec §2.5.3).
234 // Backend's scheduler.rs also creates this on startup; calling
235 // twice is harmless.
236 js.create_or_update_key_value(KvConfig {
237 bucket: BUCKET_SCHEDULES.into(),
238 history: 5,
239 ..Default::default()
240 })
241 .await
242 .with_context(|| format!("create_or_update_key_value {BUCKET_SCHEDULES}"))?;
243 info!(bucket = BUCKET_SCHEDULES, "ready");
244
245 // jobs — v0.15 operator-registered Manifest catalog. Schedules
246 // reference rows here by id; editing a job rewrites what future
247 // schedule fires exec.
248 js.create_or_update_key_value(KvConfig {
249 bucket: BUCKET_JOBS.into(),
250 history: 5,
251 ..Default::default()
252 })
253 .await
254 .with_context(|| format!("create_or_update_key_value {BUCKET_JOBS}"))?;
255 info!(bucket = BUCKET_JOBS, "ready");
256
257 // fleet_config — #418 Phase 5 fleet-wide singletons (the global
258 // change-freeze under KEY_FREEZE). history: 1 — only the current
259 // state matters; both schedulers watch it.
260 js.create_or_update_key_value(KvConfig {
261 bucket: BUCKET_FLEET_CONFIG.into(),
262 history: 1,
263 ..Default::default()
264 })
265 .await
266 .with_context(|| format!("create_or_update_key_value {BUCKET_FLEET_CONFIG}"))?;
267 info!(bucket = BUCKET_FLEET_CONFIG, "ready");
268
269 // jobs_yaml / schedules_yaml — operator source-of-truth YAML
270 // alongside the JSON catalogs above. Same key shape (manifest id
271 // / schedule id), but the value is the raw YAML bytes so the
272 // SPA's YAML editor preserves comments + script block-scalar
273 // indentation across edits. Agents/scheduler don't read these.
274 js.create_or_update_key_value(KvConfig {
275 bucket: BUCKET_JOBS_YAML.into(),
276 history: 5,
277 ..Default::default()
278 })
279 .await
280 .with_context(|| format!("create_or_update_key_value {BUCKET_JOBS_YAML}"))?;
281 info!(bucket = BUCKET_JOBS_YAML, "ready");
282
283 js.create_or_update_key_value(KvConfig {
284 bucket: BUCKET_SCHEDULES_YAML.into(),
285 history: 5,
286 ..Default::default()
287 })
288 .await
289 .with_context(|| format!("create_or_update_key_value {BUCKET_SCHEDULES_YAML}"))?;
290 info!(bucket = BUCKET_SCHEDULES_YAML, "ready");
291
292 // ── Object Store ─────────────────────────────────────────────
293 // agent_releases — one object per version, raw exe bytes.
294 js.create_object_store(ObjectStoreConfig {
295 bucket: OBJECT_AGENT_RELEASES.into(),
296 ..Default::default()
297 })
298 .await
299 .with_context(|| format!("create_object_store {OBJECT_AGENT_RELEASES}"))?;
300 info!(store = OBJECT_AGENT_RELEASES, "ready");
301
302 // app_packages — generic operator-uploaded binary distribution
303 // (kanade-client today; third-party installers like Webex /
304 // Teams once those flows land). Object keys are
305 // `<name>/<version>`; see `kanade-shared::kv::OBJECT_APP_PACKAGES`
306 // for the full rationale.
307 js.create_object_store(ObjectStoreConfig {
308 bucket: OBJECT_APP_PACKAGES.into(),
309 ..Default::default()
310 })
311 .await
312 .with_context(|| format!("create_object_store {OBJECT_APP_PACKAGES}"))?;
313 info!(store = OBJECT_APP_PACKAGES, "ready");
314
315 // scripts — manifest script bodies referenced by
316 // `Execute::script_object` (SPEC §2.4.1). Sibling of
317 // `app_packages`; see `kanade-shared::kv::OBJECT_SCRIPTS` for
318 // the bucket-split rationale (smaller payloads + manifest-
319 // coupled lifecycle vs operator-curated installers).
320 js.create_object_store(ObjectStoreConfig {
321 bucket: OBJECT_SCRIPTS.into(),
322 ..Default::default()
323 })
324 .await
325 .with_context(|| format!("create_object_store {OBJECT_SCRIPTS}"))?;
326 info!(store = OBJECT_SCRIPTS, "ready");
327
328 // result_output — overflow stdout / stderr blobs for the
329 // `ExecResult` wire kind (#227). Anything larger than the agent's
330 // 256 KB inline threshold gets uploaded here under
331 // `<request_id>/{stdout,stderr}`; the backend's results
332 // projector derefs the pointer fields before INSERT so SQLite
333 // + the SPA see the full text inline. 30-day max_age matches
334 // STREAM_RESULTS so the lifetimes stay in lockstep — a row still
335 // resolvable in execution_results never points at a missing
336 // blob.
337 // #518: capped like the streams — a job whose output overflows
338 // the inline threshold writes blobs HERE instead of
339 // STREAM_RESULTS, so without its own cap this store bypasses
340 // the stream budget entirely and can still fill the file store.
341 // The projector derefs blobs within seconds of publish, so
342 // eviction only ever hits already-projected (or expired)
343 // output.
344 js.create_object_store(ObjectStoreConfig {
345 bucket: OBJECT_RESULT_OUTPUT.into(),
346 max_age: Duration::from_secs(SECS_PER_DAY * 30),
347 max_bytes: GIB,
348 ..Default::default()
349 })
350 .await
351 .with_context(|| format!("create_object_store {OBJECT_RESULT_OUTPUT}"))?;
352 info!(store = OBJECT_RESULT_OUTPUT, "ready");
353
354 Ok(())
355}