1use axum::body::{Body, Bytes};
10use axum::extract::{OriginalUri, Path, State};
11use axum::http::{HeaderMap, Method, StatusCode};
12use axum::response::Response;
13use axum::routing::{any, get};
14use axum::{Json, Router};
15use serde_json::json;
16
17use crate::auth::{self, Principal};
18use crate::error::{AppError, AppResult};
19use crate::modules::{ModuleDetail, ModuleManifest, ModuleRegisterRequest, ModuleRegistered};
20use crate::services;
21use crate::state::AppState;
22
23pub fn router() -> Router<AppState> {
24 Router::new()
25 .route("/api/v1/modules", get(list).post(register))
26 .route("/api/v1/modules/{id}", get(detail).delete(unregister))
27 .route("/m/{id}", any(proxy_root))
29 .route("/m/{id}/", any(proxy_root))
30 .route("/m/{id}/{*rest}", any(proxy_sub))
31}
32
33async fn list(
35 State(st): State<AppState>,
36 principal: Principal,
37) -> AppResult<Json<Vec<ModuleManifest>>> {
38 principal.require(principal.can_view(), "list modules")?;
39 let mut out: Vec<ModuleManifest> = st.modules.as_ref().clone();
40 for r in services::modules::list_registered(&st.pool).await? {
41 out.push(r.to_manifest());
42 }
43 Ok(Json(out))
44}
45
46async fn register(
48 State(st): State<AppState>,
49 principal: Principal,
50 Json(req): Json<ModuleRegisterRequest>,
51) -> AppResult<(StatusCode, Json<ModuleRegistered>)> {
52 principal.require(principal.can_admin(), "register a module")?;
53 let reserved: Vec<String> = st.modules.iter().map(|m| m.id.clone()).collect();
54 let (row, api_key, webhook_secret) =
55 services::modules::register(&st.pool, req, &reserved).await?;
56 auth::audit(
57 &st.pool,
58 &principal,
59 "register_module",
60 "module",
61 &row.id,
62 json!({ "name": row.name, "base_url": row.base_url, "role": row.role }),
63 )
64 .await;
65 Ok((
66 StatusCode::CREATED,
67 Json(ModuleRegistered {
68 module: ModuleDetail::from(&row),
69 api_key,
70 webhook_secret,
71 }),
72 ))
73}
74
75async fn detail(
77 State(st): State<AppState>,
78 principal: Principal,
79 Path(id): Path<String>,
80) -> AppResult<Json<ModuleDetail>> {
81 principal.require(principal.can_admin(), "view module detail")?;
82 let row = services::modules::get_registered(&st.pool, &id)
83 .await?
84 .ok_or_else(|| AppError::NotFound(format!("module `{id}` not found")))?;
85 Ok(Json(ModuleDetail::from(&row)))
86}
87
88async fn unregister(
90 State(st): State<AppState>,
91 principal: Principal,
92 Path(id): Path<String>,
93) -> AppResult<StatusCode> {
94 principal.require(principal.can_admin(), "unregister a module")?;
95 services::modules::unregister(&st.pool, &id).await?;
96 auth::audit(
97 &st.pool,
98 &principal,
99 "unregister_module",
100 "module",
101 &id,
102 json!({}),
103 )
104 .await;
105 Ok(StatusCode::NO_CONTENT)
106}
107
108const HOP_BY_HOP: &[&str] = &[
114 "connection",
115 "keep-alive",
116 "proxy-authenticate",
117 "proxy-authorization",
118 "te",
119 "trailer",
120 "transfer-encoding",
121 "upgrade",
122 "content-length",
123 "host",
124];
125
126async fn proxy_root(
127 State(st): State<AppState>,
128 principal: Principal,
129 method: Method,
130 Path(id): Path<String>,
131 uri: OriginalUri,
132 headers: HeaderMap,
133 body: Bytes,
134) -> AppResult<Response> {
135 forward(&st, &principal, &id, "", uri, method, headers, body).await
136}
137
138async fn proxy_sub(
139 State(st): State<AppState>,
140 principal: Principal,
141 method: Method,
142 Path((id, rest)): Path<(String, String)>,
143 uri: OriginalUri,
144 headers: HeaderMap,
145 body: Bytes,
146) -> AppResult<Response> {
147 forward(&st, &principal, &id, &rest, uri, method, headers, body).await
148}
149
150#[allow(clippy::too_many_arguments)]
151async fn forward(
152 st: &AppState,
153 principal: &Principal,
154 id: &str,
155 rest: &str,
156 uri: OriginalUri,
157 method: Method,
158 headers: HeaderMap,
159 body: Bytes,
160) -> AppResult<Response> {
161 principal.require(principal.can_view(), "access a module")?;
162 let row = services::modules::get_registered(&st.pool, id)
163 .await?
164 .ok_or_else(|| AppError::NotFound(format!("module `{id}` not found")))?;
165
166 let query = uri.0.query().map(|q| format!("?{q}")).unwrap_or_default();
167 let target = format!("{}/{}{}", row.base_url, rest, query);
168
169 let mut rb = st.http.request(method, &target);
170 for (k, v) in headers.iter() {
171 let name = k.as_str().to_ascii_lowercase();
172 if HOP_BY_HOP.contains(&name.as_str()) || name == "cookie" || name == "authorization" {
175 continue;
176 }
177 rb = rb.header(k, v);
178 }
179 if !body.is_empty() {
180 rb = rb.body(body.to_vec());
181 }
182 let resp = rb.send().await.map_err(|e| {
183 AppError::Other(anyhow::anyhow!(
184 "module `{id}` proxy to {target} failed: {e}"
185 ))
186 })?;
187
188 let status = resp.status();
189 let mut out = Response::builder().status(status);
190 for (k, v) in resp.headers().iter() {
191 if HOP_BY_HOP.contains(&k.as_str().to_ascii_lowercase().as_str()) {
192 continue;
193 }
194 out = out.header(k, v);
195 }
196 let bytes = resp
197 .bytes()
198 .await
199 .map_err(|e| AppError::Other(anyhow::anyhow!("module `{id}` proxy read failed: {e}")))?;
200 out.body(Body::from(bytes))
201 .map_err(|e| AppError::Other(anyhow::anyhow!("module `{id}` proxy response build: {e}")))
202}
203
204#[cfg(test)]
205mod tests {
206 use crate::config::Config;
207 use crate::modules::{ModuleKind, ModuleManifest, NavEntry};
208 use crate::services::recorder::RecorderManager;
209 use crate::services::sampler::SamplerManager;
210 use crate::state::AppState;
211 use axum::body::Body;
212 use axum::http::Request;
213 use serde_json::json;
214 use std::sync::Arc;
215 use tower::Service;
216
217 async fn state_with(modules: Vec<ModuleManifest>) -> AppState {
218 let pool = sqlx::sqlite::SqlitePoolOptions::new()
219 .max_connections(1)
220 .connect("sqlite::memory:")
221 .await
222 .unwrap();
223 crate::db::run_migrations(&pool).await.unwrap();
224 let mut cfg = Config::from_env();
225 cfg.auth_enabled = false; let cfg = Arc::new(cfg);
227 AppState {
228 recorder: RecorderManager::new(pool.clone(), cfg.clone()),
229 sampler: SamplerManager::new(pool.clone(), cfg.clone()),
230 mirror: None,
231 consumers: Arc::new(Vec::new()),
232 modules: Arc::new(modules),
233 http: reqwest::Client::new(),
234 started_at: chrono::Utc::now(),
235 pool,
236 cfg,
237 }
238 }
239
240 #[tokio::test]
242 async fn lists_loaded_modules() {
243 let m = ModuleManifest::new(
244 "entry",
245 "Access Control",
246 "9.9.9",
247 "Heldar",
248 ModuleKind::Core,
249 "desc",
250 vec![NavEntry::new("/entry", "Entry", "entry")],
251 );
252 let st = state_with(vec![m]).await;
253 let mut app = super::router().with_state(st);
254 let res = app
255 .call(
256 Request::builder()
257 .uri("/api/v1/modules")
258 .body(Body::empty())
259 .unwrap(),
260 )
261 .await
262 .unwrap();
263 assert_eq!(res.status(), 200);
264 let bytes = axum::body::to_bytes(res.into_body(), usize::MAX)
265 .await
266 .unwrap();
267 let json: serde_json::Value = serde_json::from_slice(&bytes).unwrap();
268 let arr = json.as_array().expect("array");
269 assert_eq!(arr.len(), 1);
270 assert_eq!(json[0]["id"], "entry");
271 assert_eq!(json[0]["kind"], "core"); assert_eq!(json[0]["nav"][0]["path"], "/entry");
273 }
274
275 #[tokio::test]
277 async fn empty_when_no_modules() {
278 let st = state_with(vec![]).await;
279 let mut app = super::router().with_state(st);
280 let res = app
281 .call(
282 Request::builder()
283 .uri("/api/v1/modules")
284 .body(Body::empty())
285 .unwrap(),
286 )
287 .await
288 .unwrap();
289 assert_eq!(res.status(), 200);
290 let bytes = axum::body::to_bytes(res.into_body(), usize::MAX)
291 .await
292 .unwrap();
293 assert_eq!(&bytes[..], b"[]");
294 }
295
296 async fn send(st: AppState, req: Request<Body>) -> (axum::http::StatusCode, serde_json::Value) {
298 let mut app = super::router().with_state(st);
299 let res = app.call(req).await.unwrap();
300 let status = res.status();
301 let bytes = axum::body::to_bytes(res.into_body(), usize::MAX)
302 .await
303 .unwrap();
304 let json = if bytes.is_empty() {
305 serde_json::Value::Null
306 } else {
307 serde_json::from_slice(&bytes).unwrap_or(serde_json::Value::Null)
308 };
309 (status, json)
310 }
311
312 fn post_json(uri: &str, body: serde_json::Value) -> Request<Body> {
313 Request::builder()
314 .method("POST")
315 .uri(uri)
316 .header("content-type", "application/json")
317 .body(Body::from(body.to_string()))
318 .unwrap()
319 }
320
321 fn compiled_entry() -> ModuleManifest {
322 ModuleManifest::new(
323 "entry",
324 "Access Control",
325 "9.9.9",
326 "Heldar",
327 ModuleKind::Core,
328 "d",
329 vec![NavEntry::new("/entry", "Entry", "entry")],
330 )
331 }
332
333 #[tokio::test]
336 async fn register_list_unregister_lifecycle() {
337 let st = state_with(vec![compiled_entry()]).await;
338
339 let (status, json) = send(
340 st.clone(),
341 post_json(
342 "/api/v1/modules",
343 json!({
344 "id": "hello",
345 "name": "Hello Plugin",
346 "version": "1.0.0",
347 "publisher": "ACME",
348 "base_url": "http://127.0.0.1:9123",
349 "subscribes": ["zone_enter"],
350 "role": "integration"
351 }),
352 ),
353 )
354 .await;
355 assert_eq!(status, 201);
356 assert!(json["api_key"].as_str().unwrap().starts_with("vok_"));
357 assert!(json["webhook_secret"]
358 .as_str()
359 .unwrap()
360 .starts_with("whsec_"));
361 assert_eq!(json["module"]["base_url"], "http://127.0.0.1:9123");
362
363 let (_, list) = send(
364 st.clone(),
365 Request::builder()
366 .uri("/api/v1/modules")
367 .body(Body::empty())
368 .unwrap(),
369 )
370 .await;
371 let arr = list.as_array().unwrap();
372 assert_eq!(arr.len(), 2);
373 let hello = arr.iter().find(|m| m["id"] == "hello").unwrap();
374 assert_eq!(hello["kind"], "imported");
375 assert_eq!(hello["mount"], "iframe");
376 assert_eq!(hello["nav"][0]["path"], "/hello"); let role: Option<String> =
380 sqlx::query_scalar("SELECT role FROM api_keys WHERE name = 'module:hello'")
381 .fetch_optional(&st.pool)
382 .await
383 .unwrap();
384 assert_eq!(role.as_deref(), Some("integration"));
385 let url: Option<String> =
386 sqlx::query_scalar("SELECT url FROM webhook_subscriptions WHERE name = 'module:hello'")
387 .fetch_optional(&st.pool)
388 .await
389 .unwrap();
390 assert_eq!(url.as_deref(), Some("http://127.0.0.1:9123/heldar/events"));
391
392 let (status, _) = send(
393 st.clone(),
394 Request::builder()
395 .method("DELETE")
396 .uri("/api/v1/modules/hello")
397 .body(Body::empty())
398 .unwrap(),
399 )
400 .await;
401 assert_eq!(status, 204);
402
403 let (_, list) = send(
404 st.clone(),
405 Request::builder()
406 .uri("/api/v1/modules")
407 .body(Body::empty())
408 .unwrap(),
409 )
410 .await;
411 assert_eq!(list.as_array().unwrap().len(), 1);
412 let key_gone: Option<String> =
413 sqlx::query_scalar("SELECT id FROM api_keys WHERE name = 'module:hello'")
414 .fetch_optional(&st.pool)
415 .await
416 .unwrap();
417 assert!(key_gone.is_none());
418 let wh_gone: Option<String> =
419 sqlx::query_scalar("SELECT id FROM webhook_subscriptions WHERE name = 'module:hello'")
420 .fetch_optional(&st.pool)
421 .await
422 .unwrap();
423 assert!(wh_gone.is_none());
424 }
425
426 #[tokio::test]
428 async fn rejects_reserved_id() {
429 let st = state_with(vec![compiled_entry()]).await;
430 let (status, _) = send(
431 st,
432 post_json(
433 "/api/v1/modules",
434 json!({ "id": "entry", "name": "x", "base_url": "http://127.0.0.1:1" }),
435 ),
436 )
437 .await;
438 assert_eq!(status, 409);
439 }
440
441 #[tokio::test]
443 async fn rejects_privileged_role() {
444 let st = state_with(vec![]).await;
445 let (status, _) = send(
446 st,
447 post_json(
448 "/api/v1/modules",
449 json!({ "id": "x", "name": "x", "base_url": "http://127.0.0.1:1", "role": "admin" }),
450 ),
451 )
452 .await;
453 assert_eq!(status, 400);
454 }
455}