Skip to main content

kanade_shared/
bootstrap.rs

1//! Idempotent JetStream bootstrap (Sprint 6.x follow-up).
2//!
3//! Lists every NATS JetStream resource the kanade fleet expects —
4//! streams, KV buckets, Object Stores — and asks the broker to
5//! create them. `create_*` is a no-op when the resource already
6//! exists, so this is safe to call from every backend startup and
7//! from the operator-side `kanade jetstream setup` CLI.
8//!
9//! Centralising the list here means a future "we added a new
10//! bucket" change touches one place and both the operator CLI +
11//! the auto-bootstrap path pick it up.
12
13use std::time::Duration;
14
15use anyhow::{Context, Result};
16use async_nats::jetstream::{
17    self,
18    kv::Config as KvConfig,
19    object_store::Config as ObjectStoreConfig,
20    stream::{Config as StreamConfig, DiscardPolicy},
21};
22use tracing::info;
23
24use crate::kv::{
25    BUCKET_AGENT_CONFIG, BUCKET_AGENT_GROUPS, BUCKET_AGENTS_STATE, BUCKET_JOBS, BUCKET_SCHEDULES,
26    BUCKET_SCRIPT_CURRENT, BUCKET_SCRIPT_STATUS, OBJECT_AGENT_RELEASES, STREAM_AUDIT,
27    STREAM_EVENTS, STREAM_EXEC, STREAM_INVENTORY, STREAM_RESULTS,
28};
29
30/// Idempotently create every NATS JetStream resource the kanade
31/// fleet relies on. Calling repeatedly is safe — `create_*` returns
32/// the existing resource if it's already configured.
33///
34/// Returns once every resource is in place. The function is async
35/// so backends can `await` it as part of their startup sequence
36/// (one round-trip per resource — ~10 RTTs total).
37pub async fn ensure_jetstream_resources(js: &jetstream::Context) -> Result<()> {
38    // ── Streams ──────────────────────────────────────────────────
39    // INVENTORY — 90-day rolling history (spec §2.3.1).
40    js.create_stream(StreamConfig {
41        name: STREAM_INVENTORY.into(),
42        subjects: vec!["inventory.>".into()],
43        max_age: Duration::from_secs(90 * 24 * 60 * 60),
44        ..Default::default()
45    })
46    .await
47    .with_context(|| format!("create_stream {STREAM_INVENTORY}"))?;
48    info!(stream = STREAM_INVENTORY, "ready");
49
50    // RESULTS — 30-day rolling history.
51    js.create_stream(StreamConfig {
52        name: STREAM_RESULTS.into(),
53        subjects: vec!["results.>".into()],
54        max_age: Duration::from_secs(30 * 24 * 60 * 60),
55        ..Default::default()
56    })
57    .await
58    .with_context(|| format!("create_stream {STREAM_RESULTS}"))?;
59    info!(stream = STREAM_RESULTS, "ready");
60
61    // EXEC — latest-per-subject only (spec §2.6 Layer 1). v0.22.1:
62    // catch the existing `commands.{all,group.X,pc.Y}` subjects so a
63    // single backend publish lands in BOTH the agent's live core
64    // subscription AND the stream's retention store. Reconnecting
65    // agents catch up via a durable consumer with
66    // `DeliverPolicy::LastPerSubject` — they receive the most
67    // recent Command per subject they care about, no matter how
68    // long they were offline (within `max_age`).
69    js.create_stream(StreamConfig {
70        name: STREAM_EXEC.into(),
71        subjects: vec!["commands.>".into()],
72        max_messages_per_subject: 1,
73        discard: DiscardPolicy::Old,
74        max_age: Duration::from_secs(7 * 24 * 60 * 60),
75        ..Default::default()
76    })
77    .await
78    .with_context(|| format!("create_stream {STREAM_EXEC}"))?;
79    info!(stream = STREAM_EXEC, "ready");
80
81    // EVENTS — short-lived broadcast bus for kill / revoke / etc.
82    // 7-day window matches the EXEC spec window.
83    js.create_stream(StreamConfig {
84        name: STREAM_EVENTS.into(),
85        subjects: vec!["events.>".into()],
86        max_age: Duration::from_secs(7 * 24 * 60 * 60),
87        ..Default::default()
88    })
89    .await
90    .with_context(|| format!("create_stream {STREAM_EVENTS}"))?;
91    info!(stream = STREAM_EVENTS, "ready");
92
93    // AUDIT — permanent record of operator actions (spec §2.3.1).
94    js.create_stream(StreamConfig {
95        name: STREAM_AUDIT.into(),
96        subjects: vec!["audit.>".into()],
97        ..Default::default()
98    })
99    .await
100    .with_context(|| format!("create_stream {STREAM_AUDIT}"))?;
101    info!(stream = STREAM_AUDIT, "ready");
102
103    // ── KV buckets ───────────────────────────────────────────────
104    // script_current — cmd_id → version (spec §2.6 Layer 2).
105    js.create_key_value(KvConfig {
106        bucket: BUCKET_SCRIPT_CURRENT.into(),
107        history: 5,
108        ..Default::default()
109    })
110    .await
111    .with_context(|| format!("create_key_value {BUCKET_SCRIPT_CURRENT}"))?;
112    info!(bucket = BUCKET_SCRIPT_CURRENT, "ready");
113
114    // script_status — cmd_id → ACTIVE / REVOKED.
115    js.create_key_value(KvConfig {
116        bucket: BUCKET_SCRIPT_STATUS.into(),
117        history: 5,
118        ..Default::default()
119    })
120    .await
121    .with_context(|| format!("create_key_value {BUCKET_SCRIPT_STATUS}"))?;
122    info!(bucket = BUCKET_SCRIPT_STATUS, "ready");
123
124    // agents_state — pc_id → latest hw snapshot (history=1).
125    js.create_key_value(KvConfig {
126        bucket: BUCKET_AGENTS_STATE.into(),
127        history: 1,
128        ..Default::default()
129    })
130    .await
131    .with_context(|| format!("create_key_value {BUCKET_AGENTS_STATE}"))?;
132    info!(bucket = BUCKET_AGENTS_STATE, "ready");
133
134    // agent_config — Sprint 6 layered scopes (global / groups.* /
135    // pcs.*) plus the legacy target_version key.
136    js.create_key_value(KvConfig {
137        bucket: BUCKET_AGENT_CONFIG.into(),
138        history: 5,
139        ..Default::default()
140    })
141    .await
142    .with_context(|| format!("create_key_value {BUCKET_AGENT_CONFIG}"))?;
143    info!(bucket = BUCKET_AGENT_CONFIG, "ready");
144
145    // agent_groups — Sprint 5 per-pc group membership.
146    js.create_key_value(KvConfig {
147        bucket: BUCKET_AGENT_GROUPS.into(),
148        history: 5,
149        ..Default::default()
150    })
151    .await
152    .with_context(|| format!("create_key_value {BUCKET_AGENT_GROUPS}"))?;
153    info!(bucket = BUCKET_AGENT_GROUPS, "ready");
154
155    // schedules — admin-API CRUD'd cron table (spec §2.5.3).
156    // Backend's scheduler.rs also creates this on startup; calling
157    // twice is harmless.
158    js.create_key_value(KvConfig {
159        bucket: BUCKET_SCHEDULES.into(),
160        history: 5,
161        ..Default::default()
162    })
163    .await
164    .with_context(|| format!("create_key_value {BUCKET_SCHEDULES}"))?;
165    info!(bucket = BUCKET_SCHEDULES, "ready");
166
167    // jobs — v0.15 operator-registered Manifest catalog. Schedules
168    // reference rows here by id; editing a job rewrites what future
169    // schedule fires exec.
170    js.create_key_value(KvConfig {
171        bucket: BUCKET_JOBS.into(),
172        history: 5,
173        ..Default::default()
174    })
175    .await
176    .with_context(|| format!("create_key_value {BUCKET_JOBS}"))?;
177    info!(bucket = BUCKET_JOBS, "ready");
178
179    // ── Object Store ─────────────────────────────────────────────
180    // agent_releases — one object per version, raw exe bytes.
181    js.create_object_store(ObjectStoreConfig {
182        bucket: OBJECT_AGENT_RELEASES.into(),
183        ..Default::default()
184    })
185    .await
186    .with_context(|| format!("create_object_store {OBJECT_AGENT_RELEASES}"))?;
187    info!(store = OBJECT_AGENT_RELEASES, "ready");
188
189    Ok(())
190}