Skip to main content

kanade_shared/
bootstrap.rs

//! Idempotent JetStream bootstrap (Sprint 6.x follow-up).
//!
//! Lists every NATS JetStream resource the kanade fleet expects —
//! streams, KV buckets, Object Stores — and asks the broker to
//! create-or-update them. v0.25.0 switched from `create_*` to
//! `create_or_update_*`: the old form returned error 10058 ("name
//! already in use with a different configuration") when a release
//! widened a stream's subjects or changed its retention policy on
//! a broker that still held the older config. With the new form the
//! broker reconciles its definition to the one in this file, so
//! version bumps no longer require operator-side data wipes. The
//! Object Store is the exception: it is still created with plain
//! `create_object_store`, so a change to its config here may hit the
//! same 10058 error.
//!
//! Centralising the list here means a future "we added a new
//! bucket" change touches one place, and both the operator CLI and
//! the auto-bootstrap path pick it up.
16
17use std::time::Duration;
18
19use anyhow::{Context, Result};
20use async_nats::jetstream::{
21    self,
22    kv::Config as KvConfig,
23    object_store::Config as ObjectStoreConfig,
24    stream::{Config as StreamConfig, DiscardPolicy},
25};
26use tracing::info;
27
28use crate::kv::{
29    BUCKET_AGENT_CONFIG, BUCKET_AGENT_GROUPS, BUCKET_AGENTS_STATE, BUCKET_JOBS, BUCKET_JOBS_YAML,
30    BUCKET_SCHEDULES, BUCKET_SCHEDULES_YAML, BUCKET_SCRIPT_CURRENT, BUCKET_SCRIPT_STATUS,
31    OBJECT_AGENT_RELEASES, STREAM_AUDIT, STREAM_EVENTS, STREAM_EXEC, STREAM_INVENTORY,
32    STREAM_RESULTS,
33};
34
35/// Idempotently create every NATS JetStream resource the kanade
36/// fleet relies on. Calling repeatedly is safe — `create_*` returns
37/// the existing resource if it's already configured.
38///
39/// Returns once every resource is in place. The function is async
40/// so backends can `await` it as part of their startup sequence
41/// (one round-trip per resource — ~10 RTTs total).
42pub async fn ensure_jetstream_resources(js: &jetstream::Context) -> Result<()> {
43    // ── Streams ──────────────────────────────────────────────────
44    // INVENTORY — 90-day rolling history (spec §2.3.1).
45    js.create_or_update_stream(StreamConfig {
46        name: STREAM_INVENTORY.into(),
47        subjects: vec!["inventory.>".into()],
48        max_age: Duration::from_secs(90 * 24 * 60 * 60),
49        ..Default::default()
50    })
51    .await
52    .with_context(|| format!("create_or_update_stream {STREAM_INVENTORY}"))?;
53    info!(stream = STREAM_INVENTORY, "ready");
54
55    // RESULTS — 30-day rolling history.
56    js.create_or_update_stream(StreamConfig {
57        name: STREAM_RESULTS.into(),
58        subjects: vec!["results.>".into()],
59        max_age: Duration::from_secs(30 * 24 * 60 * 60),
60        ..Default::default()
61    })
62    .await
63    .with_context(|| format!("create_or_update_stream {STREAM_RESULTS}"))?;
64    info!(stream = STREAM_RESULTS, "ready");
65
66    // EXEC — latest-per-subject only (spec §2.6 Layer 1). v0.22.1:
67    // catch the existing `commands.{all,group.X,pc.Y}` subjects so a
68    // single backend publish lands in BOTH the agent's live core
69    // subscription AND the stream's retention store. Reconnecting
70    // agents catch up via a durable consumer with
71    // `DeliverPolicy::LastPerSubject` — they receive the most
72    // recent Command per subject they care about, no matter how
73    // long they were offline (within `max_age`).
74    js.create_or_update_stream(StreamConfig {
75        name: STREAM_EXEC.into(),
76        subjects: vec!["commands.>".into()],
77        max_messages_per_subject: 1,
78        discard: DiscardPolicy::Old,
79        max_age: Duration::from_secs(7 * 24 * 60 * 60),
80        ..Default::default()
81    })
82    .await
83    .with_context(|| format!("create_or_update_stream {STREAM_EXEC}"))?;
84    info!(stream = STREAM_EXEC, "ready");
85
86    // EVENTS — short-lived broadcast bus for kill / revoke / etc.
87    // 7-day window matches the EXEC spec window.
88    js.create_or_update_stream(StreamConfig {
89        name: STREAM_EVENTS.into(),
90        subjects: vec!["events.>".into()],
91        max_age: Duration::from_secs(7 * 24 * 60 * 60),
92        ..Default::default()
93    })
94    .await
95    .with_context(|| format!("create_or_update_stream {STREAM_EVENTS}"))?;
96    info!(stream = STREAM_EVENTS, "ready");
97
98    // AUDIT — permanent record of operator actions (spec §2.3.1).
99    js.create_or_update_stream(StreamConfig {
100        name: STREAM_AUDIT.into(),
101        subjects: vec!["audit.>".into()],
102        ..Default::default()
103    })
104    .await
105    .with_context(|| format!("create_or_update_stream {STREAM_AUDIT}"))?;
106    info!(stream = STREAM_AUDIT, "ready");
107
108    // ── KV buckets ───────────────────────────────────────────────
109    // script_current — cmd_id → version (spec §2.6 Layer 2).
110    js.create_or_update_key_value(KvConfig {
111        bucket: BUCKET_SCRIPT_CURRENT.into(),
112        history: 5,
113        ..Default::default()
114    })
115    .await
116    .with_context(|| format!("create_or_update_key_value {BUCKET_SCRIPT_CURRENT}"))?;
117    info!(bucket = BUCKET_SCRIPT_CURRENT, "ready");
118
119    // script_status — cmd_id → ACTIVE / REVOKED.
120    js.create_or_update_key_value(KvConfig {
121        bucket: BUCKET_SCRIPT_STATUS.into(),
122        history: 5,
123        ..Default::default()
124    })
125    .await
126    .with_context(|| format!("create_or_update_key_value {BUCKET_SCRIPT_STATUS}"))?;
127    info!(bucket = BUCKET_SCRIPT_STATUS, "ready");
128
129    // agents_state — pc_id → latest hw snapshot (history=1).
130    js.create_or_update_key_value(KvConfig {
131        bucket: BUCKET_AGENTS_STATE.into(),
132        history: 1,
133        ..Default::default()
134    })
135    .await
136    .with_context(|| format!("create_or_update_key_value {BUCKET_AGENTS_STATE}"))?;
137    info!(bucket = BUCKET_AGENTS_STATE, "ready");
138
139    // agent_config — Sprint 6 layered scopes (global / groups.* /
140    // pcs.*) plus the legacy target_version key.
141    js.create_or_update_key_value(KvConfig {
142        bucket: BUCKET_AGENT_CONFIG.into(),
143        history: 5,
144        ..Default::default()
145    })
146    .await
147    .with_context(|| format!("create_or_update_key_value {BUCKET_AGENT_CONFIG}"))?;
148    info!(bucket = BUCKET_AGENT_CONFIG, "ready");
149
150    // agent_groups — Sprint 5 per-pc group membership.
151    js.create_or_update_key_value(KvConfig {
152        bucket: BUCKET_AGENT_GROUPS.into(),
153        history: 5,
154        ..Default::default()
155    })
156    .await
157    .with_context(|| format!("create_or_update_key_value {BUCKET_AGENT_GROUPS}"))?;
158    info!(bucket = BUCKET_AGENT_GROUPS, "ready");
159
160    // schedules — admin-API CRUD'd cron table (spec §2.5.3).
161    // Backend's scheduler.rs also creates this on startup; calling
162    // twice is harmless.
163    js.create_or_update_key_value(KvConfig {
164        bucket: BUCKET_SCHEDULES.into(),
165        history: 5,
166        ..Default::default()
167    })
168    .await
169    .with_context(|| format!("create_or_update_key_value {BUCKET_SCHEDULES}"))?;
170    info!(bucket = BUCKET_SCHEDULES, "ready");
171
172    // jobs — v0.15 operator-registered Manifest catalog. Schedules
173    // reference rows here by id; editing a job rewrites what future
174    // schedule fires exec.
175    js.create_or_update_key_value(KvConfig {
176        bucket: BUCKET_JOBS.into(),
177        history: 5,
178        ..Default::default()
179    })
180    .await
181    .with_context(|| format!("create_or_update_key_value {BUCKET_JOBS}"))?;
182    info!(bucket = BUCKET_JOBS, "ready");
183
184    // jobs_yaml / schedules_yaml — operator source-of-truth YAML
185    // alongside the JSON catalogs above. Same key shape (manifest id
186    // / schedule id), but the value is the raw YAML bytes so the
187    // SPA's YAML editor preserves comments + script block-scalar
188    // indentation across edits. Agents/scheduler don't read these.
189    js.create_or_update_key_value(KvConfig {
190        bucket: BUCKET_JOBS_YAML.into(),
191        history: 5,
192        ..Default::default()
193    })
194    .await
195    .with_context(|| format!("create_or_update_key_value {BUCKET_JOBS_YAML}"))?;
196    info!(bucket = BUCKET_JOBS_YAML, "ready");
197
198    js.create_or_update_key_value(KvConfig {
199        bucket: BUCKET_SCHEDULES_YAML.into(),
200        history: 5,
201        ..Default::default()
202    })
203    .await
204    .with_context(|| format!("create_or_update_key_value {BUCKET_SCHEDULES_YAML}"))?;
205    info!(bucket = BUCKET_SCHEDULES_YAML, "ready");
206
207    // ── Object Store ─────────────────────────────────────────────
208    // agent_releases — one object per version, raw exe bytes.
209    js.create_object_store(ObjectStoreConfig {
210        bucket: OBJECT_AGENT_RELEASES.into(),
211        ..Default::default()
212    })
213    .await
214    .with_context(|| format!("create_object_store {OBJECT_AGENT_RELEASES}"))?;
215    info!(store = OBJECT_AGENT_RELEASES, "ready");
216
217    Ok(())
218}