Skip to main content

kanade_shared/
bootstrap.rs

//! Idempotent JetStream bootstrap (Sprint 6.x follow-up).
//!
//! Lists every NATS JetStream resource the kanade fleet expects —
//! streams, KV buckets, Object Stores — and asks the broker to
//! create them. `create_*` is a no-op when the resource already
//! exists with the same configuration, so this is safe to call from
//! every backend startup and from the operator-side
//! `kanade jetstream setup` CLI.
//!
//! Centralising the list here means a future "we added a new
//! bucket" change touches one place, and both the operator CLI and
//! the auto-bootstrap path pick it up.
12
13use std::time::Duration;
14
15use anyhow::{Context, Result};
16use async_nats::jetstream::{
17    self,
18    kv::Config as KvConfig,
19    object_store::Config as ObjectStoreConfig,
20    stream::{Config as StreamConfig, DiscardPolicy},
21};
22use tracing::info;
23
24use crate::kv::{
25    BUCKET_AGENT_CONFIG, BUCKET_AGENT_GROUPS, BUCKET_AGENTS_STATE, BUCKET_JOBS, BUCKET_SCHEDULES,
26    BUCKET_SCRIPT_CURRENT, BUCKET_SCRIPT_STATUS, OBJECT_AGENT_RELEASES, STREAM_AUDIT,
27    STREAM_EVENTS, STREAM_EXEC, STREAM_INVENTORY, STREAM_RESULTS,
28};
29
30/// Idempotently create every NATS JetStream resource the kanade
31/// fleet relies on. Calling repeatedly is safe — `create_*` returns
32/// the existing resource if it's already configured.
33///
34/// Returns once every resource is in place. The function is async
35/// so backends can `await` it as part of their startup sequence
36/// (one round-trip per resource — ~10 RTTs total).
37pub async fn ensure_jetstream_resources(js: &jetstream::Context) -> Result<()> {
38    // ── Streams ──────────────────────────────────────────────────
39    // INVENTORY — 90-day rolling history (spec §2.3.1).
40    js.create_stream(StreamConfig {
41        name: STREAM_INVENTORY.into(),
42        subjects: vec!["inventory.>".into()],
43        max_age: Duration::from_secs(90 * 24 * 60 * 60),
44        ..Default::default()
45    })
46    .await
47    .with_context(|| format!("create_stream {STREAM_INVENTORY}"))?;
48    info!(stream = STREAM_INVENTORY, "ready");
49
50    // RESULTS — 30-day rolling history.
51    js.create_stream(StreamConfig {
52        name: STREAM_RESULTS.into(),
53        subjects: vec!["results.>".into()],
54        max_age: Duration::from_secs(30 * 24 * 60 * 60),
55        ..Default::default()
56    })
57    .await
58    .with_context(|| format!("create_stream {STREAM_RESULTS}"))?;
59    info!(stream = STREAM_RESULTS, "ready");
60
61    // EXEC — latest-per-subject only (spec §2.6 Layer 1). Carries the
62    // `commands.exec.<job_id>` fan-out from `kanade exec` /
63    // scheduler fires.
64    js.create_stream(StreamConfig {
65        name: STREAM_EXEC.into(),
66        subjects: vec!["commands.exec.>".into()],
67        max_messages_per_subject: 1,
68        discard: DiscardPolicy::Old,
69        max_age: Duration::from_secs(7 * 24 * 60 * 60),
70        ..Default::default()
71    })
72    .await
73    .with_context(|| format!("create_stream {STREAM_EXEC}"))?;
74    info!(stream = STREAM_EXEC, "ready");
75
76    // EVENTS — short-lived broadcast bus for kill / revoke / etc.
77    // 7-day window matches the EXEC spec window.
78    js.create_stream(StreamConfig {
79        name: STREAM_EVENTS.into(),
80        subjects: vec!["events.>".into()],
81        max_age: Duration::from_secs(7 * 24 * 60 * 60),
82        ..Default::default()
83    })
84    .await
85    .with_context(|| format!("create_stream {STREAM_EVENTS}"))?;
86    info!(stream = STREAM_EVENTS, "ready");
87
88    // AUDIT — permanent record of operator actions (spec §2.3.1).
89    js.create_stream(StreamConfig {
90        name: STREAM_AUDIT.into(),
91        subjects: vec!["audit.>".into()],
92        ..Default::default()
93    })
94    .await
95    .with_context(|| format!("create_stream {STREAM_AUDIT}"))?;
96    info!(stream = STREAM_AUDIT, "ready");
97
98    // ── KV buckets ───────────────────────────────────────────────
99    // script_current — cmd_id → version (spec §2.6 Layer 2).
100    js.create_key_value(KvConfig {
101        bucket: BUCKET_SCRIPT_CURRENT.into(),
102        history: 5,
103        ..Default::default()
104    })
105    .await
106    .with_context(|| format!("create_key_value {BUCKET_SCRIPT_CURRENT}"))?;
107    info!(bucket = BUCKET_SCRIPT_CURRENT, "ready");
108
109    // script_status — cmd_id → ACTIVE / REVOKED.
110    js.create_key_value(KvConfig {
111        bucket: BUCKET_SCRIPT_STATUS.into(),
112        history: 5,
113        ..Default::default()
114    })
115    .await
116    .with_context(|| format!("create_key_value {BUCKET_SCRIPT_STATUS}"))?;
117    info!(bucket = BUCKET_SCRIPT_STATUS, "ready");
118
119    // agents_state — pc_id → latest hw snapshot (history=1).
120    js.create_key_value(KvConfig {
121        bucket: BUCKET_AGENTS_STATE.into(),
122        history: 1,
123        ..Default::default()
124    })
125    .await
126    .with_context(|| format!("create_key_value {BUCKET_AGENTS_STATE}"))?;
127    info!(bucket = BUCKET_AGENTS_STATE, "ready");
128
129    // agent_config — Sprint 6 layered scopes (global / groups.* /
130    // pcs.*) plus the legacy target_version key.
131    js.create_key_value(KvConfig {
132        bucket: BUCKET_AGENT_CONFIG.into(),
133        history: 5,
134        ..Default::default()
135    })
136    .await
137    .with_context(|| format!("create_key_value {BUCKET_AGENT_CONFIG}"))?;
138    info!(bucket = BUCKET_AGENT_CONFIG, "ready");
139
140    // agent_groups — Sprint 5 per-pc group membership.
141    js.create_key_value(KvConfig {
142        bucket: BUCKET_AGENT_GROUPS.into(),
143        history: 5,
144        ..Default::default()
145    })
146    .await
147    .with_context(|| format!("create_key_value {BUCKET_AGENT_GROUPS}"))?;
148    info!(bucket = BUCKET_AGENT_GROUPS, "ready");
149
150    // schedules — admin-API CRUD'd cron table (spec §2.5.3).
151    // Backend's scheduler.rs also creates this on startup; calling
152    // twice is harmless.
153    js.create_key_value(KvConfig {
154        bucket: BUCKET_SCHEDULES.into(),
155        history: 5,
156        ..Default::default()
157    })
158    .await
159    .with_context(|| format!("create_key_value {BUCKET_SCHEDULES}"))?;
160    info!(bucket = BUCKET_SCHEDULES, "ready");
161
162    // jobs — v0.15 operator-registered Manifest catalog. Schedules
163    // reference rows here by id; editing a job rewrites what future
164    // schedule fires exec.
165    js.create_key_value(KvConfig {
166        bucket: BUCKET_JOBS.into(),
167        history: 5,
168        ..Default::default()
169    })
170    .await
171    .with_context(|| format!("create_key_value {BUCKET_JOBS}"))?;
172    info!(bucket = BUCKET_JOBS, "ready");
173
174    // ── Object Store ─────────────────────────────────────────────
175    // agent_releases — one object per version, raw exe bytes.
176    js.create_object_store(ObjectStoreConfig {
177        bucket: OBJECT_AGENT_RELEASES.into(),
178        ..Default::default()
179    })
180    .await
181    .with_context(|| format!("create_object_store {OBJECT_AGENT_RELEASES}"))?;
182    info!(store = OBJECT_AGENT_RELEASES, "ready");
183
184    Ok(())
185}