Skip to main content

kanade_shared/
bootstrap.rs

1//! Idempotent JetStream bootstrap (Sprint 6.x follow-up).
2//!
3//! Lists every NATS JetStream resource the kanade fleet expects —
4//! streams, KV buckets, Object Stores — and asks the broker to
5//! create-or-update them. v0.25.0 switched from `create_*` to
6//! `create_or_update_*`: the old form returned error 10058 ("name
7//! already in use with a different configuration") when a release
8//! widened a stream's subjects or changed its retention policy on
9//! a broker that still held the older config. With the new form the
10//! broker reconciles its definition to the one in this file, so
11//! version bumps no longer require operator-side data wipes.
12//!
13//! Centralising the list here means a future "we added a new
14//! bucket" change touches one place and both the operator CLI +
15//! the auto-bootstrap path pick it up.
16
17use std::time::Duration;
18
19use anyhow::{Context, Result};
20use async_nats::jetstream::{
21    self,
22    kv::Config as KvConfig,
23    object_store::Config as ObjectStoreConfig,
24    stream::{Config as StreamConfig, DiscardPolicy},
25};
26use tracing::info;
27
28use crate::kv::{
29    BUCKET_AGENT_CONFIG, BUCKET_AGENT_GROUPS, BUCKET_AGENTS_STATE, BUCKET_JOBS, BUCKET_JOBS_YAML,
30    BUCKET_SCHEDULES, BUCKET_SCHEDULES_YAML, BUCKET_SCRIPT_CURRENT, BUCKET_SCRIPT_STATUS,
31    OBJECT_AGENT_RELEASES, OBJECT_APP_PACKAGES, OBJECT_RESULT_OUTPUT, OBJECT_SCRIPTS, STREAM_AUDIT,
32    STREAM_EVENTS, STREAM_EXEC, STREAM_INVENTORY, STREAM_OBS_EVENTS, STREAM_RESULTS,
33};
34
35/// Idempotently create every NATS JetStream resource the kanade
36/// fleet relies on. Calling repeatedly is safe — `create_*` returns
37/// the existing resource if it's already configured.
38///
39/// Returns once every resource is in place. The function is async
40/// so backends can `await` it as part of their startup sequence
41/// (one round-trip per resource — ~10 RTTs total).
42pub async fn ensure_jetstream_resources(js: &jetstream::Context) -> Result<()> {
43    // ── Streams ──────────────────────────────────────────────────
44    // INVENTORY — 90-day rolling history (spec §2.3.1).
45    js.create_or_update_stream(StreamConfig {
46        name: STREAM_INVENTORY.into(),
47        subjects: vec!["inventory.>".into()],
48        max_age: Duration::from_secs(90 * 24 * 60 * 60),
49        ..Default::default()
50    })
51    .await
52    .with_context(|| format!("create_or_update_stream {STREAM_INVENTORY}"))?;
53    info!(stream = STREAM_INVENTORY, "ready");
54
55    // RESULTS — 30-day rolling history.
56    js.create_or_update_stream(StreamConfig {
57        name: STREAM_RESULTS.into(),
58        subjects: vec!["results.>".into()],
59        max_age: Duration::from_secs(30 * 24 * 60 * 60),
60        ..Default::default()
61    })
62    .await
63    .with_context(|| format!("create_or_update_stream {STREAM_RESULTS}"))?;
64    info!(stream = STREAM_RESULTS, "ready");
65
66    // EXEC — latest-per-subject only (spec §2.6 Layer 1). v0.22.1:
67    // catch the existing `commands.{all,group.X,pc.Y}` subjects so a
68    // single backend publish lands in BOTH the agent's live core
69    // subscription AND the stream's retention store. Reconnecting
70    // agents catch up via a durable consumer with
71    // `DeliverPolicy::LastPerSubject` — they receive the most
72    // recent Command per subject they care about, no matter how
73    // long they were offline (within `max_age`).
74    js.create_or_update_stream(StreamConfig {
75        name: STREAM_EXEC.into(),
76        subjects: vec!["commands.>".into()],
77        max_messages_per_subject: 1,
78        discard: DiscardPolicy::Old,
79        max_age: Duration::from_secs(7 * 24 * 60 * 60),
80        ..Default::default()
81    })
82    .await
83    .with_context(|| format!("create_or_update_stream {STREAM_EXEC}"))?;
84    info!(stream = STREAM_EXEC, "ready");
85
86    // EVENTS — short-lived broadcast bus for kill / revoke / etc.
87    // 7-day window matches the EXEC spec window.
88    js.create_or_update_stream(StreamConfig {
89        name: STREAM_EVENTS.into(),
90        subjects: vec!["events.>".into()],
91        max_age: Duration::from_secs(7 * 24 * 60 * 60),
92        ..Default::default()
93    })
94    .await
95    .with_context(|| format!("create_or_update_stream {STREAM_EVENTS}"))?;
96    info!(stream = STREAM_EVENTS, "ready");
97
98    // AUDIT — permanent record of operator actions (spec §2.3.1).
99    js.create_or_update_stream(StreamConfig {
100        name: STREAM_AUDIT.into(),
101        subjects: vec!["audit.>".into()],
102        ..Default::default()
103    })
104    .await
105    .with_context(|| format!("create_or_update_stream {STREAM_AUDIT}"))?;
106    info!(stream = STREAM_AUDIT, "ready");
107
108    // OBS_EVENTS — per-PC observability timeline (Issue #246). The
109    // 90-day window matches `obs_events` table retention so a
110    // backend bootstrapping after long downtime can catch up but
111    // doesn't carry data the table will discard anyway. Subject
112    // filter `obs.>` catches every PC without a per-PC subscription.
113    //
114    // Days-to-seconds is spelt out once instead of `90 * 24 * 60 *
115    // 60` open-coded across bootstrap + cleanup; the matching prune
116    // window in `kanade-backend::cleanup` quotes the same number
117    // separately (SQLite-relative string syntax there, not a
118    // duration), so it can't share a constant — but a single
119    // arithmetic spell-out here makes the relationship grep-able.
120    const SECS_PER_DAY: u64 = 24 * 60 * 60;
121    const OBS_EVENTS_RETENTION_DAYS: u64 = 90;
122    js.create_or_update_stream(StreamConfig {
123        name: STREAM_OBS_EVENTS.into(),
124        subjects: vec!["obs.>".into()],
125        max_age: Duration::from_secs(OBS_EVENTS_RETENTION_DAYS * SECS_PER_DAY),
126        ..Default::default()
127    })
128    .await
129    .with_context(|| format!("create_or_update_stream {STREAM_OBS_EVENTS}"))?;
130    info!(stream = STREAM_OBS_EVENTS, "ready");
131
132    // ── KV buckets ───────────────────────────────────────────────
133    // script_current — cmd_id → version (spec §2.6 Layer 2).
134    js.create_or_update_key_value(KvConfig {
135        bucket: BUCKET_SCRIPT_CURRENT.into(),
136        history: 5,
137        ..Default::default()
138    })
139    .await
140    .with_context(|| format!("create_or_update_key_value {BUCKET_SCRIPT_CURRENT}"))?;
141    info!(bucket = BUCKET_SCRIPT_CURRENT, "ready");
142
143    // script_status — cmd_id → ACTIVE / REVOKED.
144    js.create_or_update_key_value(KvConfig {
145        bucket: BUCKET_SCRIPT_STATUS.into(),
146        history: 5,
147        ..Default::default()
148    })
149    .await
150    .with_context(|| format!("create_or_update_key_value {BUCKET_SCRIPT_STATUS}"))?;
151    info!(bucket = BUCKET_SCRIPT_STATUS, "ready");
152
153    // agents_state — pc_id → latest hw snapshot (history=1).
154    js.create_or_update_key_value(KvConfig {
155        bucket: BUCKET_AGENTS_STATE.into(),
156        history: 1,
157        ..Default::default()
158    })
159    .await
160    .with_context(|| format!("create_or_update_key_value {BUCKET_AGENTS_STATE}"))?;
161    info!(bucket = BUCKET_AGENTS_STATE, "ready");
162
163    // agent_config — Sprint 6 layered scopes (global / groups.* /
164    // pcs.*) plus the legacy target_version key.
165    js.create_or_update_key_value(KvConfig {
166        bucket: BUCKET_AGENT_CONFIG.into(),
167        history: 5,
168        ..Default::default()
169    })
170    .await
171    .with_context(|| format!("create_or_update_key_value {BUCKET_AGENT_CONFIG}"))?;
172    info!(bucket = BUCKET_AGENT_CONFIG, "ready");
173
174    // agent_groups — Sprint 5 per-pc group membership.
175    js.create_or_update_key_value(KvConfig {
176        bucket: BUCKET_AGENT_GROUPS.into(),
177        history: 5,
178        ..Default::default()
179    })
180    .await
181    .with_context(|| format!("create_or_update_key_value {BUCKET_AGENT_GROUPS}"))?;
182    info!(bucket = BUCKET_AGENT_GROUPS, "ready");
183
184    // schedules — admin-API CRUD'd cron table (spec §2.5.3).
185    // Backend's scheduler.rs also creates this on startup; calling
186    // twice is harmless.
187    js.create_or_update_key_value(KvConfig {
188        bucket: BUCKET_SCHEDULES.into(),
189        history: 5,
190        ..Default::default()
191    })
192    .await
193    .with_context(|| format!("create_or_update_key_value {BUCKET_SCHEDULES}"))?;
194    info!(bucket = BUCKET_SCHEDULES, "ready");
195
196    // jobs — v0.15 operator-registered Manifest catalog. Schedules
197    // reference rows here by id; editing a job rewrites what future
198    // schedule fires exec.
199    js.create_or_update_key_value(KvConfig {
200        bucket: BUCKET_JOBS.into(),
201        history: 5,
202        ..Default::default()
203    })
204    .await
205    .with_context(|| format!("create_or_update_key_value {BUCKET_JOBS}"))?;
206    info!(bucket = BUCKET_JOBS, "ready");
207
208    // jobs_yaml / schedules_yaml — operator source-of-truth YAML
209    // alongside the JSON catalogs above. Same key shape (manifest id
210    // / schedule id), but the value is the raw YAML bytes so the
211    // SPA's YAML editor preserves comments + script block-scalar
212    // indentation across edits. Agents/scheduler don't read these.
213    js.create_or_update_key_value(KvConfig {
214        bucket: BUCKET_JOBS_YAML.into(),
215        history: 5,
216        ..Default::default()
217    })
218    .await
219    .with_context(|| format!("create_or_update_key_value {BUCKET_JOBS_YAML}"))?;
220    info!(bucket = BUCKET_JOBS_YAML, "ready");
221
222    js.create_or_update_key_value(KvConfig {
223        bucket: BUCKET_SCHEDULES_YAML.into(),
224        history: 5,
225        ..Default::default()
226    })
227    .await
228    .with_context(|| format!("create_or_update_key_value {BUCKET_SCHEDULES_YAML}"))?;
229    info!(bucket = BUCKET_SCHEDULES_YAML, "ready");
230
231    // ── Object Store ─────────────────────────────────────────────
232    // agent_releases — one object per version, raw exe bytes.
233    js.create_object_store(ObjectStoreConfig {
234        bucket: OBJECT_AGENT_RELEASES.into(),
235        ..Default::default()
236    })
237    .await
238    .with_context(|| format!("create_object_store {OBJECT_AGENT_RELEASES}"))?;
239    info!(store = OBJECT_AGENT_RELEASES, "ready");
240
241    // app_packages — generic operator-uploaded binary distribution
242    // (kanade-client today; third-party installers like Webex /
243    // Teams once those flows land). Object keys are
244    // `<name>/<version>`; see `kanade-shared::kv::OBJECT_APP_PACKAGES`
245    // for the full rationale.
246    js.create_object_store(ObjectStoreConfig {
247        bucket: OBJECT_APP_PACKAGES.into(),
248        ..Default::default()
249    })
250    .await
251    .with_context(|| format!("create_object_store {OBJECT_APP_PACKAGES}"))?;
252    info!(store = OBJECT_APP_PACKAGES, "ready");
253
254    // scripts — manifest script bodies referenced by
255    // `Execute::script_object` (SPEC §2.4.1). Sibling of
256    // `app_packages`; see `kanade-shared::kv::OBJECT_SCRIPTS` for
257    // the bucket-split rationale (smaller payloads + manifest-
258    // coupled lifecycle vs operator-curated installers).
259    js.create_object_store(ObjectStoreConfig {
260        bucket: OBJECT_SCRIPTS.into(),
261        ..Default::default()
262    })
263    .await
264    .with_context(|| format!("create_object_store {OBJECT_SCRIPTS}"))?;
265    info!(store = OBJECT_SCRIPTS, "ready");
266
267    // result_output — overflow stdout / stderr blobs for the
268    // `ExecResult` wire kind (#227). Anything larger than the agent's
269    // 256 KB inline threshold gets uploaded here under
270    // `<request_id>/{stdout,stderr}`; the backend's results
271    // projector derefs the pointer fields before INSERT so SQLite
272    // + the SPA see the full text inline. 30-day max_age matches
273    // STREAM_RESULTS so the lifetimes stay in lockstep — a row still
274    // resolvable in execution_results never points at a missing
275    // blob.
276    js.create_object_store(ObjectStoreConfig {
277        bucket: OBJECT_RESULT_OUTPUT.into(),
278        max_age: Duration::from_secs(SECS_PER_DAY * 30),
279        ..Default::default()
280    })
281    .await
282    .with_context(|| format!("create_object_store {OBJECT_RESULT_OUTPUT}"))?;
283    info!(store = OBJECT_RESULT_OUTPUT, "ready");
284
285    Ok(())
286}