Skip to main content

kanade_shared/
bootstrap.rs

//! Idempotent JetStream bootstrap (Sprint 6.x follow-up).
//!
//! Lists every NATS JetStream resource the kanade fleet expects —
//! streams, KV buckets, Object Stores — and asks the broker to
//! provision them. v0.25.0 switched streams and KV buckets from
//! `create_*` to `create_or_update_*`: the old form returned error
//! 10058 ("name already in use with a different configuration") when
//! a release widened a stream's subjects or changed its retention
//! policy on a broker that still held the older config. With the new
//! form the broker reconciles its definition to the one in this file,
//! so version bumps no longer require operator-side data wipes.
//!
//! Object Stores are still provisioned with the create-only
//! `create_object_store`, so changing an existing bucket's config here
//! (e.g. its `max_age`) is not reconciled the same way.
//!
//! Centralising the list here means a future "we added a new
//! bucket" change touches one place, and both the operator CLI and
//! the auto-bootstrap path pick it up.
16
17use std::time::Duration;
18
19use anyhow::{Context, Result};
20use async_nats::jetstream::{
21    self,
22    kv::Config as KvConfig,
23    object_store::Config as ObjectStoreConfig,
24    stream::{Config as StreamConfig, DiscardPolicy},
25};
26use tracing::info;
27
28use crate::kv::{
29    BUCKET_AGENT_CONFIG, BUCKET_AGENT_GROUPS, BUCKET_AGENTS_STATE, BUCKET_FLEET_CONFIG,
30    BUCKET_JOBS, BUCKET_JOBS_YAML, BUCKET_SCHEDULES, BUCKET_SCHEDULES_YAML, BUCKET_SCRIPT_CURRENT,
31    BUCKET_SCRIPT_STATUS, OBJECT_AGENT_RELEASES, OBJECT_APP_PACKAGES, OBJECT_RESULT_OUTPUT,
32    OBJECT_SCRIPTS, STREAM_AUDIT, STREAM_EVENTS, STREAM_EXEC, STREAM_INVENTORY, STREAM_OBS_EVENTS,
33    STREAM_RESULTS,
34};
35
36/// Idempotently create every NATS JetStream resource the kanade
37/// fleet relies on. Calling repeatedly is safe — `create_*` returns
38/// the existing resource if it's already configured.
39///
40/// Returns once every resource is in place. The function is async
41/// so backends can `await` it as part of their startup sequence
42/// (one round-trip per resource — ~10 RTTs total).
43pub async fn ensure_jetstream_resources(js: &jetstream::Context) -> Result<()> {
44    // ── Streams ──────────────────────────────────────────────────
45    // INVENTORY — 90-day rolling history (spec §2.3.1).
46    js.create_or_update_stream(StreamConfig {
47        name: STREAM_INVENTORY.into(),
48        subjects: vec!["inventory.>".into()],
49        max_age: Duration::from_secs(90 * 24 * 60 * 60),
50        ..Default::default()
51    })
52    .await
53    .with_context(|| format!("create_or_update_stream {STREAM_INVENTORY}"))?;
54    info!(stream = STREAM_INVENTORY, "ready");
55
56    // RESULTS — 30-day rolling history.
57    js.create_or_update_stream(StreamConfig {
58        name: STREAM_RESULTS.into(),
59        subjects: vec!["results.>".into()],
60        max_age: Duration::from_secs(30 * 24 * 60 * 60),
61        ..Default::default()
62    })
63    .await
64    .with_context(|| format!("create_or_update_stream {STREAM_RESULTS}"))?;
65    info!(stream = STREAM_RESULTS, "ready");
66
67    // EXEC — latest-per-subject only (spec §2.6 Layer 1). v0.22.1:
68    // catch the existing `commands.{all,group.X,pc.Y}` subjects so a
69    // single backend publish lands in BOTH the agent's live core
70    // subscription AND the stream's retention store. Reconnecting
71    // agents catch up via a durable consumer with
72    // `DeliverPolicy::LastPerSubject` — they receive the most
73    // recent Command per subject they care about, no matter how
74    // long they were offline (within `max_age`).
75    js.create_or_update_stream(StreamConfig {
76        name: STREAM_EXEC.into(),
77        subjects: vec!["commands.>".into()],
78        max_messages_per_subject: 1,
79        discard: DiscardPolicy::Old,
80        max_age: Duration::from_secs(7 * 24 * 60 * 60),
81        ..Default::default()
82    })
83    .await
84    .with_context(|| format!("create_or_update_stream {STREAM_EXEC}"))?;
85    info!(stream = STREAM_EXEC, "ready");
86
87    // EVENTS — short-lived broadcast bus for kill / revoke / etc.
88    // 7-day window matches the EXEC spec window.
89    js.create_or_update_stream(StreamConfig {
90        name: STREAM_EVENTS.into(),
91        subjects: vec!["events.>".into()],
92        max_age: Duration::from_secs(7 * 24 * 60 * 60),
93        ..Default::default()
94    })
95    .await
96    .with_context(|| format!("create_or_update_stream {STREAM_EVENTS}"))?;
97    info!(stream = STREAM_EVENTS, "ready");
98
99    // AUDIT — permanent record of operator actions (spec §2.3.1).
100    js.create_or_update_stream(StreamConfig {
101        name: STREAM_AUDIT.into(),
102        subjects: vec!["audit.>".into()],
103        ..Default::default()
104    })
105    .await
106    .with_context(|| format!("create_or_update_stream {STREAM_AUDIT}"))?;
107    info!(stream = STREAM_AUDIT, "ready");
108
109    // OBS_EVENTS — per-PC observability timeline (Issue #246). The
110    // 90-day window matches `obs_events` table retention so a
111    // backend bootstrapping after long downtime can catch up but
112    // doesn't carry data the table will discard anyway. Subject
113    // filter `obs.>` catches every PC without a per-PC subscription.
114    //
115    // Days-to-seconds is spelt out once instead of `90 * 24 * 60 *
116    // 60` open-coded across bootstrap + cleanup; the matching prune
117    // window in `kanade-backend::cleanup` quotes the same number
118    // separately (SQLite-relative string syntax there, not a
119    // duration), so it can't share a constant — but a single
120    // arithmetic spell-out here makes the relationship grep-able.
121    const SECS_PER_DAY: u64 = 24 * 60 * 60;
122    const OBS_EVENTS_RETENTION_DAYS: u64 = 90;
123    js.create_or_update_stream(StreamConfig {
124        name: STREAM_OBS_EVENTS.into(),
125        subjects: vec!["obs.>".into()],
126        max_age: Duration::from_secs(OBS_EVENTS_RETENTION_DAYS * SECS_PER_DAY),
127        ..Default::default()
128    })
129    .await
130    .with_context(|| format!("create_or_update_stream {STREAM_OBS_EVENTS}"))?;
131    info!(stream = STREAM_OBS_EVENTS, "ready");
132
133    // ── KV buckets ───────────────────────────────────────────────
134    // script_current — cmd_id → version (spec §2.6 Layer 2).
135    js.create_or_update_key_value(KvConfig {
136        bucket: BUCKET_SCRIPT_CURRENT.into(),
137        history: 5,
138        ..Default::default()
139    })
140    .await
141    .with_context(|| format!("create_or_update_key_value {BUCKET_SCRIPT_CURRENT}"))?;
142    info!(bucket = BUCKET_SCRIPT_CURRENT, "ready");
143
144    // script_status — cmd_id → ACTIVE / REVOKED.
145    js.create_or_update_key_value(KvConfig {
146        bucket: BUCKET_SCRIPT_STATUS.into(),
147        history: 5,
148        ..Default::default()
149    })
150    .await
151    .with_context(|| format!("create_or_update_key_value {BUCKET_SCRIPT_STATUS}"))?;
152    info!(bucket = BUCKET_SCRIPT_STATUS, "ready");
153
154    // agents_state — pc_id → latest hw snapshot (history=1).
155    js.create_or_update_key_value(KvConfig {
156        bucket: BUCKET_AGENTS_STATE.into(),
157        history: 1,
158        ..Default::default()
159    })
160    .await
161    .with_context(|| format!("create_or_update_key_value {BUCKET_AGENTS_STATE}"))?;
162    info!(bucket = BUCKET_AGENTS_STATE, "ready");
163
164    // agent_config — Sprint 6 layered scopes (global / groups.* /
165    // pcs.*) plus the legacy target_version key.
166    js.create_or_update_key_value(KvConfig {
167        bucket: BUCKET_AGENT_CONFIG.into(),
168        history: 5,
169        ..Default::default()
170    })
171    .await
172    .with_context(|| format!("create_or_update_key_value {BUCKET_AGENT_CONFIG}"))?;
173    info!(bucket = BUCKET_AGENT_CONFIG, "ready");
174
175    // agent_groups — Sprint 5 per-pc group membership.
176    js.create_or_update_key_value(KvConfig {
177        bucket: BUCKET_AGENT_GROUPS.into(),
178        history: 5,
179        ..Default::default()
180    })
181    .await
182    .with_context(|| format!("create_or_update_key_value {BUCKET_AGENT_GROUPS}"))?;
183    info!(bucket = BUCKET_AGENT_GROUPS, "ready");
184
185    // schedules — admin-API CRUD'd cron table (spec §2.5.3).
186    // Backend's scheduler.rs also creates this on startup; calling
187    // twice is harmless.
188    js.create_or_update_key_value(KvConfig {
189        bucket: BUCKET_SCHEDULES.into(),
190        history: 5,
191        ..Default::default()
192    })
193    .await
194    .with_context(|| format!("create_or_update_key_value {BUCKET_SCHEDULES}"))?;
195    info!(bucket = BUCKET_SCHEDULES, "ready");
196
197    // jobs — v0.15 operator-registered Manifest catalog. Schedules
198    // reference rows here by id; editing a job rewrites what future
199    // schedule fires exec.
200    js.create_or_update_key_value(KvConfig {
201        bucket: BUCKET_JOBS.into(),
202        history: 5,
203        ..Default::default()
204    })
205    .await
206    .with_context(|| format!("create_or_update_key_value {BUCKET_JOBS}"))?;
207    info!(bucket = BUCKET_JOBS, "ready");
208
209    // fleet_config — #418 Phase 5 fleet-wide singletons (the global
210    // change-freeze under KEY_FREEZE). history: 1 — only the current
211    // state matters; both schedulers watch it.
212    js.create_or_update_key_value(KvConfig {
213        bucket: BUCKET_FLEET_CONFIG.into(),
214        history: 1,
215        ..Default::default()
216    })
217    .await
218    .with_context(|| format!("create_or_update_key_value {BUCKET_FLEET_CONFIG}"))?;
219    info!(bucket = BUCKET_FLEET_CONFIG, "ready");
220
221    // jobs_yaml / schedules_yaml — operator source-of-truth YAML
222    // alongside the JSON catalogs above. Same key shape (manifest id
223    // / schedule id), but the value is the raw YAML bytes so the
224    // SPA's YAML editor preserves comments + script block-scalar
225    // indentation across edits. Agents/scheduler don't read these.
226    js.create_or_update_key_value(KvConfig {
227        bucket: BUCKET_JOBS_YAML.into(),
228        history: 5,
229        ..Default::default()
230    })
231    .await
232    .with_context(|| format!("create_or_update_key_value {BUCKET_JOBS_YAML}"))?;
233    info!(bucket = BUCKET_JOBS_YAML, "ready");
234
235    js.create_or_update_key_value(KvConfig {
236        bucket: BUCKET_SCHEDULES_YAML.into(),
237        history: 5,
238        ..Default::default()
239    })
240    .await
241    .with_context(|| format!("create_or_update_key_value {BUCKET_SCHEDULES_YAML}"))?;
242    info!(bucket = BUCKET_SCHEDULES_YAML, "ready");
243
244    // ── Object Store ─────────────────────────────────────────────
245    // agent_releases — one object per version, raw exe bytes.
246    js.create_object_store(ObjectStoreConfig {
247        bucket: OBJECT_AGENT_RELEASES.into(),
248        ..Default::default()
249    })
250    .await
251    .with_context(|| format!("create_object_store {OBJECT_AGENT_RELEASES}"))?;
252    info!(store = OBJECT_AGENT_RELEASES, "ready");
253
254    // app_packages — generic operator-uploaded binary distribution
255    // (kanade-client today; third-party installers like Webex /
256    // Teams once those flows land). Object keys are
257    // `<name>/<version>`; see `kanade-shared::kv::OBJECT_APP_PACKAGES`
258    // for the full rationale.
259    js.create_object_store(ObjectStoreConfig {
260        bucket: OBJECT_APP_PACKAGES.into(),
261        ..Default::default()
262    })
263    .await
264    .with_context(|| format!("create_object_store {OBJECT_APP_PACKAGES}"))?;
265    info!(store = OBJECT_APP_PACKAGES, "ready");
266
267    // scripts — manifest script bodies referenced by
268    // `Execute::script_object` (SPEC §2.4.1). Sibling of
269    // `app_packages`; see `kanade-shared::kv::OBJECT_SCRIPTS` for
270    // the bucket-split rationale (smaller payloads + manifest-
271    // coupled lifecycle vs operator-curated installers).
272    js.create_object_store(ObjectStoreConfig {
273        bucket: OBJECT_SCRIPTS.into(),
274        ..Default::default()
275    })
276    .await
277    .with_context(|| format!("create_object_store {OBJECT_SCRIPTS}"))?;
278    info!(store = OBJECT_SCRIPTS, "ready");
279
280    // result_output — overflow stdout / stderr blobs for the
281    // `ExecResult` wire kind (#227). Anything larger than the agent's
282    // 256 KB inline threshold gets uploaded here under
283    // `<request_id>/{stdout,stderr}`; the backend's results
284    // projector derefs the pointer fields before INSERT so SQLite
285    // + the SPA see the full text inline. 30-day max_age matches
286    // STREAM_RESULTS so the lifetimes stay in lockstep — a row still
287    // resolvable in execution_results never points at a missing
288    // blob.
289    js.create_object_store(ObjectStoreConfig {
290        bucket: OBJECT_RESULT_OUTPUT.into(),
291        max_age: Duration::from_secs(SECS_PER_DAY * 30),
292        ..Default::default()
293    })
294    .await
295    .with_context(|| format!("create_object_store {OBJECT_RESULT_OUTPUT}"))?;
296    info!(store = OBJECT_RESULT_OUTPUT, "ready");
297
298    Ok(())
299}