Skip to main content

kanade_shared/
bootstrap.rs

1//! Idempotent JetStream bootstrap (Sprint 6.x follow-up).
2//!
3//! Lists every NATS JetStream resource the kanade fleet expects —
4//! streams, KV buckets, Object Stores — and asks the broker to
5//! create them. `create_*` is a no-op when the resource already
6//! exists, so this is safe to call from every backend startup and
7//! from the operator-side `kanade jetstream setup` CLI.
8//!
9//! Centralising the list here means a future "we added a new
10//! bucket" change touches one place and both the operator CLI +
11//! the auto-bootstrap path pick it up.
12
13use std::time::Duration;
14
15use anyhow::{Context, Result};
16use async_nats::jetstream::{
17    self,
18    kv::Config as KvConfig,
19    object_store::Config as ObjectStoreConfig,
20    stream::{Config as StreamConfig, DiscardPolicy},
21};
22use tracing::info;
23
24use crate::kv::{
25    BUCKET_AGENT_CONFIG, BUCKET_AGENT_GROUPS, BUCKET_AGENTS_STATE, BUCKET_SCHEDULES,
26    BUCKET_SCRIPT_CURRENT, BUCKET_SCRIPT_STATUS, OBJECT_AGENT_RELEASES, STREAM_AUDIT,
27    STREAM_DEPLOY, STREAM_EVENTS, STREAM_INVENTORY, STREAM_RESULTS,
28};
29
30/// Idempotently create every NATS JetStream resource the kanade
31/// fleet relies on. Calling repeatedly is safe — `create_*` returns
32/// the existing resource if it's already configured.
33///
34/// Returns once every resource is in place. The function is async
35/// so backends can `await` it as part of their startup sequence
36/// (one round-trip per resource — ~10 RTTs total).
37pub async fn ensure_jetstream_resources(js: &jetstream::Context) -> Result<()> {
38    // ── Streams ──────────────────────────────────────────────────
39    // INVENTORY — 90-day rolling history (spec §2.3.1).
40    js.create_stream(StreamConfig {
41        name: STREAM_INVENTORY.into(),
42        subjects: vec!["inventory.>".into()],
43        max_age: Duration::from_secs(90 * 24 * 60 * 60),
44        ..Default::default()
45    })
46    .await
47    .with_context(|| format!("create_stream {STREAM_INVENTORY}"))?;
48    info!(stream = STREAM_INVENTORY, "ready");
49
50    // RESULTS — 30-day rolling history.
51    js.create_stream(StreamConfig {
52        name: STREAM_RESULTS.into(),
53        subjects: vec!["results.>".into()],
54        max_age: Duration::from_secs(30 * 24 * 60 * 60),
55        ..Default::default()
56    })
57    .await
58    .with_context(|| format!("create_stream {STREAM_RESULTS}"))?;
59    info!(stream = STREAM_RESULTS, "ready");
60
61    // DEPLOY — latest-per-subject only (spec §2.6 Layer 1).
62    js.create_stream(StreamConfig {
63        name: STREAM_DEPLOY.into(),
64        subjects: vec!["commands.deploy.>".into()],
65        max_messages_per_subject: 1,
66        discard: DiscardPolicy::Old,
67        max_age: Duration::from_secs(7 * 24 * 60 * 60),
68        ..Default::default()
69    })
70    .await
71    .with_context(|| format!("create_stream {STREAM_DEPLOY}"))?;
72    info!(stream = STREAM_DEPLOY, "ready");
73
74    // EVENTS — short-lived broadcast bus for kill / revoke / etc.
75    // 7-day window matches the deploy spec window.
76    js.create_stream(StreamConfig {
77        name: STREAM_EVENTS.into(),
78        subjects: vec!["events.>".into()],
79        max_age: Duration::from_secs(7 * 24 * 60 * 60),
80        ..Default::default()
81    })
82    .await
83    .with_context(|| format!("create_stream {STREAM_EVENTS}"))?;
84    info!(stream = STREAM_EVENTS, "ready");
85
86    // AUDIT — permanent record of operator actions (spec §2.3.1).
87    js.create_stream(StreamConfig {
88        name: STREAM_AUDIT.into(),
89        subjects: vec!["audit.>".into()],
90        ..Default::default()
91    })
92    .await
93    .with_context(|| format!("create_stream {STREAM_AUDIT}"))?;
94    info!(stream = STREAM_AUDIT, "ready");
95
96    // ── KV buckets ───────────────────────────────────────────────
97    // script_current — cmd_id → version (spec §2.6 Layer 2).
98    js.create_key_value(KvConfig {
99        bucket: BUCKET_SCRIPT_CURRENT.into(),
100        history: 5,
101        ..Default::default()
102    })
103    .await
104    .with_context(|| format!("create_key_value {BUCKET_SCRIPT_CURRENT}"))?;
105    info!(bucket = BUCKET_SCRIPT_CURRENT, "ready");
106
107    // script_status — cmd_id → ACTIVE / REVOKED.
108    js.create_key_value(KvConfig {
109        bucket: BUCKET_SCRIPT_STATUS.into(),
110        history: 5,
111        ..Default::default()
112    })
113    .await
114    .with_context(|| format!("create_key_value {BUCKET_SCRIPT_STATUS}"))?;
115    info!(bucket = BUCKET_SCRIPT_STATUS, "ready");
116
117    // agents_state — pc_id → latest hw snapshot (history=1).
118    js.create_key_value(KvConfig {
119        bucket: BUCKET_AGENTS_STATE.into(),
120        history: 1,
121        ..Default::default()
122    })
123    .await
124    .with_context(|| format!("create_key_value {BUCKET_AGENTS_STATE}"))?;
125    info!(bucket = BUCKET_AGENTS_STATE, "ready");
126
127    // agent_config — Sprint 6 layered scopes (global / groups.* /
128    // pcs.*) plus the legacy target_version key.
129    js.create_key_value(KvConfig {
130        bucket: BUCKET_AGENT_CONFIG.into(),
131        history: 5,
132        ..Default::default()
133    })
134    .await
135    .with_context(|| format!("create_key_value {BUCKET_AGENT_CONFIG}"))?;
136    info!(bucket = BUCKET_AGENT_CONFIG, "ready");
137
138    // agent_groups — Sprint 5 per-pc group membership.
139    js.create_key_value(KvConfig {
140        bucket: BUCKET_AGENT_GROUPS.into(),
141        history: 5,
142        ..Default::default()
143    })
144    .await
145    .with_context(|| format!("create_key_value {BUCKET_AGENT_GROUPS}"))?;
146    info!(bucket = BUCKET_AGENT_GROUPS, "ready");
147
148    // schedules — admin-API CRUD'd cron table (spec §2.5.3).
149    // Backend's scheduler.rs also creates this on startup; calling
150    // twice is harmless.
151    js.create_key_value(KvConfig {
152        bucket: BUCKET_SCHEDULES.into(),
153        history: 5,
154        ..Default::default()
155    })
156    .await
157    .with_context(|| format!("create_key_value {BUCKET_SCHEDULES}"))?;
158    info!(bucket = BUCKET_SCHEDULES, "ready");
159
160    // ── Object Store ─────────────────────────────────────────────
161    // agent_releases — one object per version, raw exe bytes.
162    js.create_object_store(ObjectStoreConfig {
163        bucket: OBJECT_AGENT_RELEASES.into(),
164        ..Default::default()
165    })
166    .await
167    .with_context(|| format!("create_object_store {OBJECT_AGENT_RELEASES}"))?;
168    info!(store = OBJECT_AGENT_RELEASES, "ready");
169
170    Ok(())
171}