Skip to main content

heldar_kernel/services/
modules.rs

//! Sidecar module registration + health (Phase B of the plugin platform).
//!
//! Registering a sidecar atomically mints three reversible things: a scoped API key the sidecar uses
//! to call kernel APIs, a webhook subscription that feeds it the events it subscribes to, and (via the
//! stored row) a reverse-proxy mount at `/m/{id}/*`. Unregistering deletes all three. A background
//! loop probes each sidecar's `/heldar/health` and records reachability.
7
8use std::time::Duration;
9
10use chrono::Utc;
11use sqlx::types::Json as SqlxJson;
12use sqlx::SqlitePool;
13use uuid::Uuid;
14
15use crate::auth;
16use crate::error::{AppError, AppResult};
17use crate::modules::{ModuleRegisterRequest, ModuleRegistration, NavEntry};
18
/// Sidecar-side route for event deliveries. `register` appends it to the module's normalized
/// base URL to form the URL stored on the minted webhook subscription.
pub const WEBHOOK_EVENTS_PATH: &str = "/heldar/events";
/// Sidecar-side route requested by the health loop; a 2xx response is recorded as `healthy`.
pub const HEALTH_PATH: &str = "/heldar/health";
/// Prefix handed to `auth::random_token` when minting a module's webhook secret.
const WEBHOOK_SECRET_PREFIX: &str = "whsec_";
24
25/// Plugin keys are least-privilege: only `viewer` (read) or `integration` (read + ingest) are
26/// grantable. `admin`/`manager`/`guard` are never minted for a sidecar.
27fn validate_plugin_role(role: &str) -> AppResult<&'static str> {
28    match role {
29        "viewer" => Ok("viewer"),
30        "integration" => Ok("integration"),
31        _ => Err(AppError::BadRequest(
32            "`role` must be viewer|integration".into(),
33        )),
34    }
35}
36
37fn validate_id(id: &str) -> AppResult<()> {
38    let ok = !id.is_empty()
39        && id.len() <= 64
40        && id
41            .chars()
42            .all(|c| c.is_ascii_alphanumeric() || c == '-' || c == '_');
43    if !ok {
44        return Err(AppError::BadRequest(
45            "`id` must be a slug of [A-Za-z0-9_-], 1..=64 chars".into(),
46        ));
47    }
48    Ok(())
49}
50
51/// Validate + normalize the sidecar origin (http/https, no trailing slash, no path/query).
52fn normalize_base_url(url: &str) -> AppResult<String> {
53    let u = url.trim().trim_end_matches('/');
54    if !(u.starts_with("http://") || u.starts_with("https://")) {
55        return Err(AppError::BadRequest(
56            "`base_url` must be an http(s) URL".into(),
57        ));
58    }
59    // Reject an obviously malformed origin (scheme with no host).
60    let after_scheme = u.split_once("://").map(|(_, rest)| rest).unwrap_or("");
61    if after_scheme.is_empty() || after_scheme.starts_with('/') {
62        return Err(AppError::BadRequest(
63            "`base_url` must include a host".into(),
64        ));
65    }
66    Ok(u.to_string())
67}
68
69/// Register a sidecar: mint its scoped key + webhook subscription and persist the row, atomically.
70/// Returns the stored row plus the once-only credentials (plaintext key + webhook secret).
71pub async fn register(
72    pool: &SqlitePool,
73    req: ModuleRegisterRequest,
74    reserved_ids: &[String],
75) -> AppResult<(ModuleRegistration, String, String)> {
76    validate_id(&req.id)?;
77    if reserved_ids.iter().any(|r| r == &req.id) {
78        return Err(AppError::Conflict(format!(
79            "module id `{}` is reserved by a built-in module",
80            req.id
81        )));
82    }
83    if get_registered(pool, &req.id).await?.is_some() {
84        return Err(AppError::Conflict(format!(
85            "module `{}` is already registered",
86            req.id
87        )));
88    }
89    let name = req.name.trim();
90    if name.is_empty() {
91        return Err(AppError::BadRequest("`name` is required".into()));
92    }
93    let base_url = normalize_base_url(&req.base_url)?;
94    let role = validate_plugin_role(req.role.as_deref().unwrap_or("integration").trim())?;
95
96    // Default to a single nav entry at /{id} if the plugin declared none.
97    let nav = if req.nav.is_empty() {
98        vec![NavEntry::new(&format!("/{}", req.id), name, &req.id)]
99    } else {
100        req.nav.clone()
101    };
102    let subscribes: Vec<String> = req
103        .subscribes
104        .clone()
105        .map(|s| {
106            s.into_iter()
107                .map(|t| t.trim().to_string())
108                .filter(|t| !t.is_empty())
109                .collect::<Vec<_>>()
110        })
111        .filter(|s| !s.is_empty())
112        .unwrap_or_else(|| vec!["*".to_string()]);
113
114    // Mint the scoped API key (plaintext returned once; only its hash is stored).
115    let api_key = auth::random_token(auth::APIKEY_PREFIX);
116    let key_prefix: String = api_key.chars().take(12).collect();
117    let api_key_id = format!("key_{}", Uuid::new_v4().simple());
118    // Mint the webhook subscription that feeds the sidecar.
119    let webhook_secret = auth::random_token(WEBHOOK_SECRET_PREFIX);
120    let webhook_id = format!("whs_{}", Uuid::new_v4().simple());
121    let webhook_url = format!("{base_url}{WEBHOOK_EVENTS_PATH}");
122    let now = Utc::now();
123
124    let mut tx = pool.begin().await?;
125    sqlx::query(
126        "INSERT INTO api_keys (id, name, key_hash, key_prefix, role, active, created_at)
127         VALUES (?,?,?,?,?,1,?)",
128    )
129    .bind(&api_key_id)
130    .bind(format!("module:{}", req.id))
131    .bind(auth::token_hash(&api_key))
132    .bind(&key_prefix)
133    .bind(role)
134    .bind(now)
135    .execute(&mut *tx)
136    .await?;
137    sqlx::query(
138        "INSERT INTO webhook_subscriptions
139           (id, name, url, event_types, min_severity, secret, enabled, cursor_at, created_at, updated_at)
140         VALUES (?,?,?,?,?,?,1,?,?,?)",
141    )
142    .bind(&webhook_id)
143    .bind(format!("module:{}", req.id))
144    .bind(&webhook_url)
145    .bind(SqlxJson(&subscribes))
146    .bind("info")
147    .bind(&webhook_secret)
148    .bind(now)
149    .bind(now)
150    .bind(now)
151    .execute(&mut *tx)
152    .await?;
153    sqlx::query(
154        "INSERT INTO module_registrations
155           (id, name, version, publisher, description, base_url, nav, subscribes, role,
156            api_key_id, webhook_id, health, created_at, updated_at)
157         VALUES (?,?,?,?,?,?,?,?,?,?,?,?,?,?)",
158    )
159    .bind(&req.id)
160    .bind(name)
161    .bind(req.version.trim())
162    .bind(req.publisher.trim())
163    .bind(req.description.trim())
164    .bind(&base_url)
165    .bind(SqlxJson(&nav))
166    .bind(SqlxJson(&subscribes))
167    .bind(role)
168    .bind(&api_key_id)
169    .bind(&webhook_id)
170    .bind("unknown")
171    .bind(now)
172    .bind(now)
173    .execute(&mut *tx)
174    .await?;
175    tx.commit().await?;
176
177    let row = get_registered(pool, &req.id)
178        .await?
179        .ok_or_else(|| AppError::Other(anyhow::anyhow!("module row missing after insert")))?;
180    Ok((row, api_key, webhook_secret))
181}
182
183/// Unregister a sidecar: delete its row, revoke its API key, and remove its webhook subscription
184/// (its delivery ledger cascades). Idempotent-ish: returns NotFound if the id is unknown.
185pub async fn unregister(pool: &SqlitePool, id: &str) -> AppResult<()> {
186    let row = get_registered(pool, id)
187        .await?
188        .ok_or_else(|| AppError::NotFound(format!("module `{id}` not found")))?;
189    let mut tx = pool.begin().await?;
190    sqlx::query("DELETE FROM module_registrations WHERE id = ?")
191        .bind(id)
192        .execute(&mut *tx)
193        .await?;
194    if let Some(key_id) = &row.api_key_id {
195        sqlx::query("DELETE FROM api_keys WHERE id = ?")
196            .bind(key_id)
197            .execute(&mut *tx)
198            .await?;
199    }
200    if let Some(webhook_id) = &row.webhook_id {
201        sqlx::query("DELETE FROM webhook_subscriptions WHERE id = ?")
202            .bind(webhook_id)
203            .execute(&mut *tx)
204            .await?;
205    }
206    tx.commit().await?;
207    Ok(())
208}
209
210pub async fn list_registered(pool: &SqlitePool) -> AppResult<Vec<ModuleRegistration>> {
211    Ok(sqlx::query_as::<_, ModuleRegistration>(
212        "SELECT * FROM module_registrations ORDER BY created_at ASC",
213    )
214    .fetch_all(pool)
215    .await?)
216}
217
218pub async fn get_registered(pool: &SqlitePool, id: &str) -> AppResult<Option<ModuleRegistration>> {
219    Ok(
220        sqlx::query_as::<_, ModuleRegistration>("SELECT * FROM module_registrations WHERE id = ?")
221            .bind(id)
222            .fetch_all(pool)
223            .await?
224            .into_iter()
225            .next(),
226    )
227}
228
229/// Background loop: probe each registered sidecar's `/heldar/health` and record reachability so the
230/// dashboard can badge healthy/unreachable plugins. Never returns (supervised).
231pub async fn run(pool: SqlitePool) {
232    let client = reqwest::Client::builder()
233        .timeout(Duration::from_secs(5))
234        .build()
235        .unwrap_or_default();
236    let mut tick = tokio::time::interval(Duration::from_secs(30));
237    loop {
238        tick.tick().await;
239        let mods = match list_registered(&pool).await {
240            Ok(m) => m,
241            Err(e) => {
242                tracing::warn!(error = %e, "modules: health sweep failed to list registrations");
243                continue;
244            }
245        };
246        for m in mods {
247            let url = format!("{}{}", m.base_url, HEALTH_PATH);
248            let health = match client.get(&url).send().await {
249                Ok(r) if r.status().is_success() => "healthy",
250                _ => "unreachable",
251            };
252            if let Err(e) = sqlx::query(
253                "UPDATE module_registrations SET health = ?, health_checked_at = ? WHERE id = ?",
254            )
255            .bind(health)
256            .bind(Utc::now())
257            .bind(&m.id)
258            .execute(&pool)
259            .await
260            {
261                tracing::warn!(module = %m.id, error = %e, "modules: failed to record health");
262            }
263        }
264    }
265}