1use std::time::Duration;
9
10use chrono::Utc;
11use sqlx::types::Json as SqlxJson;
12use sqlx::SqlitePool;
13use uuid::Uuid;
14
15use crate::auth;
16use crate::error::{AppError, AppResult};
17use crate::modules::{ModuleRegisterRequest, ModuleRegistration, NavEntry};
18
/// Path appended to a module's normalized `base_url` to build the URL of the
/// webhook subscription created for it during registration.
pub const WEBHOOK_EVENTS_PATH: &str = "/heldar/events";
/// Path appended to a module's `base_url` when the periodic health sweep
/// probes it.
pub const HEALTH_PATH: &str = "/heldar/health";
/// Prefix passed to `auth::random_token` when generating a module's webhook
/// secret.
const WEBHOOK_SECRET_PREFIX: &str = "whsec_";
24
25fn validate_plugin_role(role: &str) -> AppResult<&'static str> {
28 match role {
29 "viewer" => Ok("viewer"),
30 "integration" => Ok("integration"),
31 _ => Err(AppError::BadRequest(
32 "`role` must be viewer|integration".into(),
33 )),
34 }
35}
36
37fn validate_id(id: &str) -> AppResult<()> {
38 let ok = !id.is_empty()
39 && id.len() <= 64
40 && id
41 .chars()
42 .all(|c| c.is_ascii_alphanumeric() || c == '-' || c == '_');
43 if !ok {
44 return Err(AppError::BadRequest(
45 "`id` must be a slug of [A-Za-z0-9_-], 1..=64 chars".into(),
46 ));
47 }
48 Ok(())
49}
50
51fn normalize_base_url(url: &str) -> AppResult<String> {
53 let u = url.trim().trim_end_matches('/');
54 if !(u.starts_with("http://") || u.starts_with("https://")) {
55 return Err(AppError::BadRequest(
56 "`base_url` must be an http(s) URL".into(),
57 ));
58 }
59 let after_scheme = u.split_once("://").map(|(_, rest)| rest).unwrap_or("");
61 if after_scheme.is_empty() || after_scheme.starts_with('/') {
62 return Err(AppError::BadRequest(
63 "`base_url` must include a host".into(),
64 ));
65 }
66 Ok(u.to_string())
67}
68
69pub async fn register(
72 pool: &SqlitePool,
73 req: ModuleRegisterRequest,
74 reserved_ids: &[String],
75) -> AppResult<(ModuleRegistration, String, String)> {
76 validate_id(&req.id)?;
77 if reserved_ids.iter().any(|r| r == &req.id) {
78 return Err(AppError::Conflict(format!(
79 "module id `{}` is reserved by a built-in module",
80 req.id
81 )));
82 }
83 if get_registered(pool, &req.id).await?.is_some() {
84 return Err(AppError::Conflict(format!(
85 "module `{}` is already registered",
86 req.id
87 )));
88 }
89 let name = req.name.trim();
90 if name.is_empty() {
91 return Err(AppError::BadRequest("`name` is required".into()));
92 }
93 let base_url = normalize_base_url(&req.base_url)?;
94 let role = validate_plugin_role(req.role.as_deref().unwrap_or("integration").trim())?;
95
96 let nav = if req.nav.is_empty() {
98 vec![NavEntry::new(&format!("/{}", req.id), name, &req.id)]
99 } else {
100 req.nav.clone()
101 };
102 let subscribes: Vec<String> = req
103 .subscribes
104 .clone()
105 .map(|s| {
106 s.into_iter()
107 .map(|t| t.trim().to_string())
108 .filter(|t| !t.is_empty())
109 .collect::<Vec<_>>()
110 })
111 .filter(|s| !s.is_empty())
112 .unwrap_or_else(|| vec!["*".to_string()]);
113
114 let api_key = auth::random_token(auth::APIKEY_PREFIX);
116 let key_prefix: String = api_key.chars().take(12).collect();
117 let api_key_id = format!("key_{}", Uuid::new_v4().simple());
118 let webhook_secret = auth::random_token(WEBHOOK_SECRET_PREFIX);
120 let webhook_id = format!("whs_{}", Uuid::new_v4().simple());
121 let webhook_url = format!("{base_url}{WEBHOOK_EVENTS_PATH}");
122 let now = Utc::now();
123
124 let mut tx = pool.begin().await?;
125 sqlx::query(
126 "INSERT INTO api_keys (id, name, key_hash, key_prefix, role, active, created_at)
127 VALUES (?,?,?,?,?,1,?)",
128 )
129 .bind(&api_key_id)
130 .bind(format!("module:{}", req.id))
131 .bind(auth::token_hash(&api_key))
132 .bind(&key_prefix)
133 .bind(role)
134 .bind(now)
135 .execute(&mut *tx)
136 .await?;
137 sqlx::query(
138 "INSERT INTO webhook_subscriptions
139 (id, name, url, event_types, min_severity, secret, enabled, cursor_at, created_at, updated_at)
140 VALUES (?,?,?,?,?,?,1,?,?,?)",
141 )
142 .bind(&webhook_id)
143 .bind(format!("module:{}", req.id))
144 .bind(&webhook_url)
145 .bind(SqlxJson(&subscribes))
146 .bind("info")
147 .bind(&webhook_secret)
148 .bind(now)
149 .bind(now)
150 .bind(now)
151 .execute(&mut *tx)
152 .await?;
153 sqlx::query(
154 "INSERT INTO module_registrations
155 (id, name, version, publisher, description, base_url, nav, subscribes, role,
156 api_key_id, webhook_id, health, created_at, updated_at)
157 VALUES (?,?,?,?,?,?,?,?,?,?,?,?,?,?)",
158 )
159 .bind(&req.id)
160 .bind(name)
161 .bind(req.version.trim())
162 .bind(req.publisher.trim())
163 .bind(req.description.trim())
164 .bind(&base_url)
165 .bind(SqlxJson(&nav))
166 .bind(SqlxJson(&subscribes))
167 .bind(role)
168 .bind(&api_key_id)
169 .bind(&webhook_id)
170 .bind("unknown")
171 .bind(now)
172 .bind(now)
173 .execute(&mut *tx)
174 .await?;
175 tx.commit().await?;
176
177 let row = get_registered(pool, &req.id)
178 .await?
179 .ok_or_else(|| AppError::Other(anyhow::anyhow!("module row missing after insert")))?;
180 Ok((row, api_key, webhook_secret))
181}
182
183pub async fn unregister(pool: &SqlitePool, id: &str) -> AppResult<()> {
186 let row = get_registered(pool, id)
187 .await?
188 .ok_or_else(|| AppError::NotFound(format!("module `{id}` not found")))?;
189 let mut tx = pool.begin().await?;
190 sqlx::query("DELETE FROM module_registrations WHERE id = ?")
191 .bind(id)
192 .execute(&mut *tx)
193 .await?;
194 if let Some(key_id) = &row.api_key_id {
195 sqlx::query("DELETE FROM api_keys WHERE id = ?")
196 .bind(key_id)
197 .execute(&mut *tx)
198 .await?;
199 }
200 if let Some(webhook_id) = &row.webhook_id {
201 sqlx::query("DELETE FROM webhook_subscriptions WHERE id = ?")
202 .bind(webhook_id)
203 .execute(&mut *tx)
204 .await?;
205 }
206 tx.commit().await?;
207 Ok(())
208}
209
210pub async fn list_registered(pool: &SqlitePool) -> AppResult<Vec<ModuleRegistration>> {
211 Ok(sqlx::query_as::<_, ModuleRegistration>(
212 "SELECT * FROM module_registrations ORDER BY created_at ASC",
213 )
214 .fetch_all(pool)
215 .await?)
216}
217
218pub async fn get_registered(pool: &SqlitePool, id: &str) -> AppResult<Option<ModuleRegistration>> {
219 Ok(
220 sqlx::query_as::<_, ModuleRegistration>("SELECT * FROM module_registrations WHERE id = ?")
221 .bind(id)
222 .fetch_all(pool)
223 .await?
224 .into_iter()
225 .next(),
226 )
227}
228
229pub async fn run(pool: SqlitePool) {
232 let client = reqwest::Client::builder()
233 .timeout(Duration::from_secs(5))
234 .build()
235 .unwrap_or_default();
236 let mut tick = tokio::time::interval(Duration::from_secs(30));
237 loop {
238 tick.tick().await;
239 let mods = match list_registered(&pool).await {
240 Ok(m) => m,
241 Err(e) => {
242 tracing::warn!(error = %e, "modules: health sweep failed to list registrations");
243 continue;
244 }
245 };
246 for m in mods {
247 let url = format!("{}{}", m.base_url, HEALTH_PATH);
248 let health = match client.get(&url).send().await {
249 Ok(r) if r.status().is_success() => "healthy",
250 _ => "unreachable",
251 };
252 if let Err(e) = sqlx::query(
253 "UPDATE module_registrations SET health = ?, health_checked_at = ? WHERE id = ?",
254 )
255 .bind(health)
256 .bind(Utc::now())
257 .bind(&m.id)
258 .execute(&pool)
259 .await
260 {
261 tracing::warn!(module = %m.id, error = %e, "modules: failed to record health");
262 }
263 }
264 }
265}