Skip to main content

kanade_shared/
bootstrap.rs

1//! Idempotent JetStream bootstrap (Sprint 6.x follow-up).
2//!
3//! Lists every NATS JetStream resource the kanade fleet expects —
4//! streams, KV buckets, Object Stores — and asks the broker to
5//! create-or-update them. v0.25.0 switched from `create_*` to
6//! `create_or_update_*`: the old form returned error 10058 ("name
7//! already in use with a different configuration") when a release
8//! widened a stream's subjects or changed its retention policy on
9//! a broker that still held the older config. With the new form the
10//! broker reconciles its definition to the one in this file, so
11//! version bumps no longer require operator-side data wipes.
12//!
13//! Centralising the list here means a future "we added a new
14//! bucket" change touches one place and both the operator CLI +
15//! the auto-bootstrap path pick it up.
16
17use std::time::Duration;
18
19use anyhow::{Context, Result};
20use async_nats::jetstream::{
21    self,
22    kv::Config as KvConfig,
23    object_store::Config as ObjectStoreConfig,
24    stream::{Config as StreamConfig, DiscardPolicy},
25};
26use tracing::info;
27
28use crate::kv::{
29    BUCKET_AGENT_CONFIG, BUCKET_AGENT_GROUPS, BUCKET_AGENTS_STATE, BUCKET_JOBS, BUCKET_SCHEDULES,
30    BUCKET_SCRIPT_CURRENT, BUCKET_SCRIPT_STATUS, OBJECT_AGENT_RELEASES, STREAM_AUDIT,
31    STREAM_EVENTS, STREAM_EXEC, STREAM_INVENTORY, STREAM_RESULTS,
32};
33
34/// Idempotently create every NATS JetStream resource the kanade
35/// fleet relies on. Calling repeatedly is safe — `create_*` returns
36/// the existing resource if it's already configured.
37///
38/// Returns once every resource is in place. The function is async
39/// so backends can `await` it as part of their startup sequence
40/// (one round-trip per resource — ~10 RTTs total).
41pub async fn ensure_jetstream_resources(js: &jetstream::Context) -> Result<()> {
42    // ── Streams ──────────────────────────────────────────────────
43    // INVENTORY — 90-day rolling history (spec §2.3.1).
44    js.create_or_update_stream(StreamConfig {
45        name: STREAM_INVENTORY.into(),
46        subjects: vec!["inventory.>".into()],
47        max_age: Duration::from_secs(90 * 24 * 60 * 60),
48        ..Default::default()
49    })
50    .await
51    .with_context(|| format!("create_or_update_stream {STREAM_INVENTORY}"))?;
52    info!(stream = STREAM_INVENTORY, "ready");
53
54    // RESULTS — 30-day rolling history.
55    js.create_or_update_stream(StreamConfig {
56        name: STREAM_RESULTS.into(),
57        subjects: vec!["results.>".into()],
58        max_age: Duration::from_secs(30 * 24 * 60 * 60),
59        ..Default::default()
60    })
61    .await
62    .with_context(|| format!("create_or_update_stream {STREAM_RESULTS}"))?;
63    info!(stream = STREAM_RESULTS, "ready");
64
65    // EXEC — latest-per-subject only (spec §2.6 Layer 1). v0.22.1:
66    // catch the existing `commands.{all,group.X,pc.Y}` subjects so a
67    // single backend publish lands in BOTH the agent's live core
68    // subscription AND the stream's retention store. Reconnecting
69    // agents catch up via a durable consumer with
70    // `DeliverPolicy::LastPerSubject` — they receive the most
71    // recent Command per subject they care about, no matter how
72    // long they were offline (within `max_age`).
73    js.create_or_update_stream(StreamConfig {
74        name: STREAM_EXEC.into(),
75        subjects: vec!["commands.>".into()],
76        max_messages_per_subject: 1,
77        discard: DiscardPolicy::Old,
78        max_age: Duration::from_secs(7 * 24 * 60 * 60),
79        ..Default::default()
80    })
81    .await
82    .with_context(|| format!("create_or_update_stream {STREAM_EXEC}"))?;
83    info!(stream = STREAM_EXEC, "ready");
84
85    // EVENTS — short-lived broadcast bus for kill / revoke / etc.
86    // 7-day window matches the EXEC spec window.
87    js.create_or_update_stream(StreamConfig {
88        name: STREAM_EVENTS.into(),
89        subjects: vec!["events.>".into()],
90        max_age: Duration::from_secs(7 * 24 * 60 * 60),
91        ..Default::default()
92    })
93    .await
94    .with_context(|| format!("create_or_update_stream {STREAM_EVENTS}"))?;
95    info!(stream = STREAM_EVENTS, "ready");
96
97    // AUDIT — permanent record of operator actions (spec §2.3.1).
98    js.create_or_update_stream(StreamConfig {
99        name: STREAM_AUDIT.into(),
100        subjects: vec!["audit.>".into()],
101        ..Default::default()
102    })
103    .await
104    .with_context(|| format!("create_or_update_stream {STREAM_AUDIT}"))?;
105    info!(stream = STREAM_AUDIT, "ready");
106
107    // ── KV buckets ───────────────────────────────────────────────
108    // script_current — cmd_id → version (spec §2.6 Layer 2).
109    js.create_or_update_key_value(KvConfig {
110        bucket: BUCKET_SCRIPT_CURRENT.into(),
111        history: 5,
112        ..Default::default()
113    })
114    .await
115    .with_context(|| format!("create_or_update_key_value {BUCKET_SCRIPT_CURRENT}"))?;
116    info!(bucket = BUCKET_SCRIPT_CURRENT, "ready");
117
118    // script_status — cmd_id → ACTIVE / REVOKED.
119    js.create_or_update_key_value(KvConfig {
120        bucket: BUCKET_SCRIPT_STATUS.into(),
121        history: 5,
122        ..Default::default()
123    })
124    .await
125    .with_context(|| format!("create_or_update_key_value {BUCKET_SCRIPT_STATUS}"))?;
126    info!(bucket = BUCKET_SCRIPT_STATUS, "ready");
127
128    // agents_state — pc_id → latest hw snapshot (history=1).
129    js.create_or_update_key_value(KvConfig {
130        bucket: BUCKET_AGENTS_STATE.into(),
131        history: 1,
132        ..Default::default()
133    })
134    .await
135    .with_context(|| format!("create_or_update_key_value {BUCKET_AGENTS_STATE}"))?;
136    info!(bucket = BUCKET_AGENTS_STATE, "ready");
137
138    // agent_config — Sprint 6 layered scopes (global / groups.* /
139    // pcs.*) plus the legacy target_version key.
140    js.create_or_update_key_value(KvConfig {
141        bucket: BUCKET_AGENT_CONFIG.into(),
142        history: 5,
143        ..Default::default()
144    })
145    .await
146    .with_context(|| format!("create_or_update_key_value {BUCKET_AGENT_CONFIG}"))?;
147    info!(bucket = BUCKET_AGENT_CONFIG, "ready");
148
149    // agent_groups — Sprint 5 per-pc group membership.
150    js.create_or_update_key_value(KvConfig {
151        bucket: BUCKET_AGENT_GROUPS.into(),
152        history: 5,
153        ..Default::default()
154    })
155    .await
156    .with_context(|| format!("create_or_update_key_value {BUCKET_AGENT_GROUPS}"))?;
157    info!(bucket = BUCKET_AGENT_GROUPS, "ready");
158
159    // schedules — admin-API CRUD'd cron table (spec §2.5.3).
160    // Backend's scheduler.rs also creates this on startup; calling
161    // twice is harmless.
162    js.create_or_update_key_value(KvConfig {
163        bucket: BUCKET_SCHEDULES.into(),
164        history: 5,
165        ..Default::default()
166    })
167    .await
168    .with_context(|| format!("create_or_update_key_value {BUCKET_SCHEDULES}"))?;
169    info!(bucket = BUCKET_SCHEDULES, "ready");
170
171    // jobs — v0.15 operator-registered Manifest catalog. Schedules
172    // reference rows here by id; editing a job rewrites what future
173    // schedule fires exec.
174    js.create_or_update_key_value(KvConfig {
175        bucket: BUCKET_JOBS.into(),
176        history: 5,
177        ..Default::default()
178    })
179    .await
180    .with_context(|| format!("create_or_update_key_value {BUCKET_JOBS}"))?;
181    info!(bucket = BUCKET_JOBS, "ready");
182
183    // ── Object Store ─────────────────────────────────────────────
184    // agent_releases — one object per version, raw exe bytes.
185    js.create_object_store(ObjectStoreConfig {
186        bucket: OBJECT_AGENT_RELEASES.into(),
187        ..Default::default()
188    })
189    .await
190    .with_context(|| format!("create_object_store {OBJECT_AGENT_RELEASES}"))?;
191    info!(store = OBJECT_AGENT_RELEASES, "ready");
192
193    Ok(())
194}