use std::time::Duration;
use chrono::Utc;
use sqlx::types::Json as SqlxJson;
use sqlx::SqlitePool;
use uuid::Uuid;
use crate::auth;
use crate::error::{AppError, AppResult};
use crate::modules::{ModuleRegisterRequest, ModuleRegistration, NavEntry};
/// Path appended to a module's normalized `base_url` to form the URL stored for its
/// webhook subscription at registration time.
pub const WEBHOOK_EVENTS_PATH: &str = "/heldar/events";
/// Path appended to a module's `base_url` when the periodic health sweep probes it.
pub const HEALTH_PATH: &str = "/heldar/health";
/// Prefix handed to `auth::random_token` when generating a module's webhook secret.
const WEBHOOK_SECRET_PREFIX: &str = "whsec_";
/// Checks that `role` is one of the roles a registered module may be given.
///
/// On success the matching `'static` role name (`"viewer"` or `"integration"`) is
/// returned, so the result does not borrow from the caller's input.
///
/// # Errors
///
/// Returns [`AppError::BadRequest`] for any other value. The comparison is exact:
/// no trimming or case folding is performed here.
fn validate_plugin_role(role: &str) -> AppResult<&'static str> {
    const ALLOWED_ROLES: [&str; 2] = ["viewer", "integration"];

    ALLOWED_ROLES
        .iter()
        .copied()
        .find(|allowed| *allowed == role)
        .ok_or_else(|| AppError::BadRequest("`role` must be viewer|integration".into()))
}
/// Validates that a module id is a short ASCII slug.
///
/// An id is accepted when it is between 1 and 64 bytes long and every character is an
/// ASCII letter, an ASCII digit, `-` or `_`.
///
/// # Errors
///
/// Returns [`AppError::BadRequest`] when the id is empty, too long, or contains any
/// other character.
fn validate_id(id: &str) -> AppResult<()> {
    let length_ok = (1..=64).contains(&id.len());
    let charset_ok = id
        .chars()
        .all(|ch| matches!(ch, 'a'..='z' | 'A'..='Z' | '0'..='9' | '-' | '_'));

    if length_ok && charset_ok {
        return Ok(());
    }
    Err(AppError::BadRequest(
        "`id` must be a slug of [A-Za-z0-9_-], 1..=64 chars".into(),
    ))
}
/// Normalizes and validates a module's base URL.
///
/// Surrounding whitespace and any trailing `/` characters are removed, so that fixed
/// paths such as [`WEBHOOK_EVENTS_PATH`] and [`HEALTH_PATH`] can be appended directly to
/// the result.
///
/// # Errors
///
/// Returns [`AppError::BadRequest`] when:
/// - the URL does not start with `http://` or `https://`;
/// - the authority has no host (e.g. `http:///x`, `http://?q`, `http://:8080`,
///   `http://user@`);
/// - the URL carries a query string or fragment, because appending a path after
///   either would produce a URL that does not address the intended endpoint.
fn normalize_base_url(url: &str) -> AppResult<String> {
    let u = url.trim().trim_end_matches('/');
    let after_scheme = u
        .strip_prefix("http://")
        .or_else(|| u.strip_prefix("https://"))
        .ok_or_else(|| AppError::BadRequest("`base_url` must be an http(s) URL".into()))?;

    // The authority ends at the first path, query or fragment delimiter.
    let authority = after_scheme
        .split(|c: char| matches!(c, '/' | '?' | '#'))
        .next()
        .unwrap_or("");
    // Ignore optional `userinfo@`; only the host part decides whether a host exists.
    let host_port = authority
        .rsplit_once('@')
        .map_or(authority, |(_, rest)| rest);
    // Bracketed IPv6 literals contain `:` themselves, so take the bracket contents;
    // otherwise the host is everything before an optional `:port`.
    let host = match host_port.strip_prefix('[') {
        Some(bracketed) => bracketed.split_once(']').map_or("", |(addr, _)| addr),
        None => host_port.split(':').next().unwrap_or(""),
    };
    if host.is_empty() {
        return Err(AppError::BadRequest(
            "`base_url` must include a host".into(),
        ));
    }

    if after_scheme.contains(|c: char| matches!(c, '?' | '#')) {
        return Err(AppError::BadRequest(
            "`base_url` must not contain a query string or fragment".into(),
        ));
    }
    Ok(u.to_string())
}
/// Registers an external module and provisions the credentials it needs.
///
/// Steps, in order:
/// 1. Validate the id, reject ids listed in `reserved_ids`, and reject ids that already
///    have a row in `module_registrations`.
/// 2. Validate `name`, `base_url` and `role` (the role defaults to `"integration"`).
/// 3. Derive defaults: a single nav entry pointing at `/{id}` when none were supplied,
///    and the wildcard subscription `"*"` when no non-blank topics were supplied.
/// 4. Inside one transaction, insert an `api_keys` row (storing
///    `auth::token_hash(&api_key)` and the first 12 characters of the key), a
///    `webhook_subscriptions` row targeting `{base_url}{WEBHOOK_EVENTS_PATH}`, and the
///    `module_registrations` row with health `"unknown"`.
/// 5. Re-read the stored registration.
///
/// Returns `(registration, api_key, webhook_secret)`; the key and secret are the
/// freshly generated token strings.
///
/// # Errors
///
/// - [`AppError::BadRequest`] for an invalid id, blank name, invalid base URL or role.
/// - [`AppError::Conflict`] when the id is reserved or already registered.
/// - [`AppError::Other`] if the row cannot be read back after the commit.
/// - Any database error propagated through `?`.
///
/// NOTE(review): the "already registered" lookup runs before the transaction begins, so
/// it does not by itself serialize concurrent registrations of the same id.
pub async fn register(
    pool: &SqlitePool,
    req: ModuleRegisterRequest,
    reserved_ids: &[String],
) -> AppResult<(ModuleRegistration, String, String)> {
    // --- identity checks -------------------------------------------------------------
    validate_id(&req.id)?;
    if reserved_ids.iter().any(|reserved| *reserved == req.id) {
        return Err(AppError::Conflict(format!(
            "module id `{}` is reserved by a built-in module",
            req.id
        )));
    }
    let already_registered = get_registered(pool, &req.id).await?.is_some();
    if already_registered {
        return Err(AppError::Conflict(format!(
            "module `{}` is already registered",
            req.id
        )));
    }

    // --- field validation ------------------------------------------------------------
    let display_name = req.name.trim();
    if display_name.is_empty() {
        return Err(AppError::BadRequest("`name` is required".into()));
    }
    let root_url = normalize_base_url(&req.base_url)?;
    let requested_role = req.role.as_deref().unwrap_or("integration");
    let key_role = validate_plugin_role(requested_role.trim())?;

    // --- defaults --------------------------------------------------------------------
    let nav_entries = if !req.nav.is_empty() {
        req.nav.clone()
    } else {
        vec![NavEntry::new(
            &format!("/{}", req.id),
            display_name,
            &req.id,
        )]
    };

    // Trim every requested topic and drop blanks; fall back to the wildcard when
    // nothing usable remains (or nothing was requested at all).
    let mut topics: Vec<String> = req
        .subscribes
        .iter()
        .flatten()
        .map(|topic| topic.trim())
        .filter(|topic| !topic.is_empty())
        .map(str::to_string)
        .collect();
    if topics.is_empty() {
        topics.push("*".to_string());
    }

    // --- credentials -----------------------------------------------------------------
    let api_key = auth::random_token(auth::APIKEY_PREFIX);
    let key_prefix = api_key.chars().take(12).collect::<String>();
    let api_key_id = format!("key_{}", Uuid::new_v4().simple());
    let webhook_secret = auth::random_token(WEBHOOK_SECRET_PREFIX);
    let webhook_id = format!("whs_{}", Uuid::new_v4().simple());
    let webhook_url = format!("{root_url}{WEBHOOK_EVENTS_PATH}");

    // Both the API key row and the webhook row are named after the owning module.
    let owner_label = format!("module:{}", req.id);
    let timestamp = Utc::now();

    // --- persistence (all-or-nothing) ------------------------------------------------
    let mut txn = pool.begin().await?;

    sqlx::query(
        "INSERT INTO api_keys (id, name, key_hash, key_prefix, role, active, created_at)\n\
         VALUES (?,?,?,?,?,1,?)",
    )
    .bind(&api_key_id)
    .bind(&owner_label)
    .bind(auth::token_hash(&api_key))
    .bind(&key_prefix)
    .bind(key_role)
    .bind(timestamp)
    .execute(&mut *txn)
    .await?;

    sqlx::query(
        "INSERT INTO webhook_subscriptions\n\
         (id, name, url, event_types, min_severity, secret, enabled, cursor_at, created_at, updated_at)\n\
         VALUES (?,?,?,?,?,?,1,?,?,?)",
    )
    .bind(&webhook_id)
    .bind(&owner_label)
    .bind(&webhook_url)
    .bind(SqlxJson(&topics))
    .bind("info")
    .bind(&webhook_secret)
    // cursor_at, created_at and updated_at all start at the registration time.
    .bind(timestamp)
    .bind(timestamp)
    .bind(timestamp)
    .execute(&mut *txn)
    .await?;

    sqlx::query(
        "INSERT INTO module_registrations\n\
         (id, name, version, publisher, description, base_url, nav, subscribes, role,\n\
         api_key_id, webhook_id, health, created_at, updated_at)\n\
         VALUES (?,?,?,?,?,?,?,?,?,?,?,?,?,?)",
    )
    .bind(&req.id)
    .bind(display_name)
    .bind(req.version.trim())
    .bind(req.publisher.trim())
    .bind(req.description.trim())
    .bind(&root_url)
    .bind(SqlxJson(&nav_entries))
    .bind(SqlxJson(&topics))
    .bind(key_role)
    .bind(&api_key_id)
    .bind(&webhook_id)
    .bind("unknown")
    .bind(timestamp)
    .bind(timestamp)
    .execute(&mut *txn)
    .await?;

    txn.commit().await?;

    // Read the row back so the caller receives exactly what was stored.
    match get_registered(pool, &req.id).await? {
        Some(registration) => Ok((registration, api_key, webhook_secret)),
        None => Err(AppError::Other(anyhow::anyhow!(
            "module row missing after insert"
        ))),
    }
}
/// Removes a registered module together with the API key and webhook subscription
/// referenced by its registration row.
///
/// The three deletes run in a single transaction: the registration row first, then the
/// `api_keys` row and the `webhook_subscriptions` row when the registration references
/// them.
///
/// # Errors
///
/// Returns [`AppError::NotFound`] when no module with `id` is registered, or any
/// database error propagated through `?`.
pub async fn unregister(pool: &SqlitePool, id: &str) -> AppResult<()> {
    let registration = match get_registered(pool, id).await? {
        Some(found) => found,
        None => return Err(AppError::NotFound(format!("module `{id}` not found"))),
    };

    let mut txn = pool.begin().await?;

    sqlx::query("DELETE FROM module_registrations WHERE id = ?")
        .bind(id)
        .execute(&mut *txn)
        .await?;

    if let Some(key_id) = registration.api_key_id.as_ref() {
        sqlx::query("DELETE FROM api_keys WHERE id = ?")
            .bind(key_id)
            .execute(&mut *txn)
            .await?;
    }

    if let Some(hook_id) = registration.webhook_id.as_ref() {
        sqlx::query("DELETE FROM webhook_subscriptions WHERE id = ?")
            .bind(hook_id)
            .execute(&mut *txn)
            .await?;
    }

    txn.commit().await?;
    Ok(())
}
/// Returns every row of `module_registrations`, oldest first (ordered by `created_at`).
///
/// # Errors
///
/// Propagates any database error from the query.
pub async fn list_registered(pool: &SqlitePool) -> AppResult<Vec<ModuleRegistration>> {
    let registrations = sqlx::query_as::<_, ModuleRegistration>(
        "SELECT * FROM module_registrations ORDER BY created_at ASC",
    )
    .fetch_all(pool)
    .await?;

    Ok(registrations)
}
/// Looks up a single module registration by id.
///
/// Returns `Ok(None)` when no row matches. Uses `fetch_optional` so at most one row is
/// decoded instead of collecting the whole result set into a `Vec` first.
///
/// # Errors
///
/// Propagates any database error from the query.
pub async fn get_registered(pool: &SqlitePool, id: &str) -> AppResult<Option<ModuleRegistration>> {
    let registration =
        sqlx::query_as::<_, ModuleRegistration>("SELECT * FROM module_registrations WHERE id = ?")
            .bind(id)
            .fetch_optional(pool)
            .await?;

    Ok(registration)
}
/// Background health sweep for registered modules; loops forever and never returns.
///
/// Every 30 seconds the task lists all registrations, issues a `GET` to
/// `{base_url}{HEALTH_PATH}` for each one in turn, and records the outcome in
/// `module_registrations.health` together with the check time. A successful (2xx)
/// response is stored as `"healthy"`; any other response or transport error is stored as
/// `"unreachable"`.
///
/// Failures to list registrations or to persist a result are logged and the sweep
/// carries on; nothing here is fatal.
pub async fn run(pool: SqlitePool) {
    const PROBE_TIMEOUT: Duration = Duration::from_secs(5);
    const SWEEP_PERIOD: Duration = Duration::from_secs(30);

    let client = reqwest::Client::builder()
        .timeout(PROBE_TIMEOUT)
        .build()
        .unwrap_or_else(|e| {
            // Keep running with a default client, but make it visible that the probe
            // timeout configured above is not in effect.
            tracing::warn!(
                error = %e,
                "modules: failed to build health client; falling back to default client"
            );
            reqwest::Client::default()
        });

    let mut tick = tokio::time::interval(SWEEP_PERIOD);
    // Probes run sequentially, so a sweep can outlast the period. The default burst
    // behaviour would then fire the missed ticks back-to-back; delay instead so there
    // is always a full period between the start of consecutive sweeps.
    tick.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);

    loop {
        tick.tick().await;
        let mods = match list_registered(&pool).await {
            Ok(m) => m,
            Err(e) => {
                tracing::warn!(error = %e, "modules: health sweep failed to list registrations");
                continue;
            }
        };
        for m in mods {
            let url = format!("{}{}", m.base_url, HEALTH_PATH);
            let health = match client.get(&url).send().await {
                Ok(r) if r.status().is_success() => "healthy",
                _ => "unreachable",
            };
            if let Err(e) = sqlx::query(
                "UPDATE module_registrations SET health = ?, health_checked_at = ? WHERE id = ?",
            )
            .bind(health)
            .bind(Utc::now())
            .bind(&m.id)
            .execute(&pool)
            .await
            {
                tracing::warn!(module = %m.id, error = %e, "modules: failed to record health");
            }
        }
    }
}