1use axum::body::{Body, Bytes};
10use axum::extract::{OriginalUri, Path, State};
11use axum::http::{HeaderMap, Method, StatusCode};
12use axum::response::Response;
13use axum::routing::{any, get};
14use axum::{Json, Router};
15use serde_json::json;
16
17use crate::auth::{self, Principal};
18use crate::error::{AppError, AppResult};
19use crate::modules::{ModuleDetail, ModuleManifest, ModuleRegisterRequest, ModuleRegistered};
20use crate::services;
21use crate::state::AppState;
22
23pub fn router() -> Router<AppState> {
24 Router::new()
25 .route("/api/v1/modules", get(list).post(register))
26 .route("/api/v1/modules/{id}", get(detail).delete(unregister))
27 .route("/m/{id}", any(proxy_root))
29 .route("/m/{id}/", any(proxy_root))
30 .route("/m/{id}/{*rest}", any(proxy_sub))
31}
32
33async fn list(
35 State(st): State<AppState>,
36 principal: Principal,
37) -> AppResult<Json<Vec<ModuleManifest>>> {
38 principal.require(principal.can_view(), "list modules")?;
39 let mut out: Vec<ModuleManifest> = st.modules.as_ref().clone();
40 for r in services::modules::list_registered(&st.pool).await? {
41 out.push(r.to_manifest());
42 }
43 Ok(Json(out))
44}
45
46async fn register(
48 State(st): State<AppState>,
49 principal: Principal,
50 Json(req): Json<ModuleRegisterRequest>,
51) -> AppResult<(StatusCode, Json<ModuleRegistered>)> {
52 principal.require(principal.can_admin(), "register a module")?;
53 let reserved: Vec<String> = st.modules.iter().map(|m| m.id.clone()).collect();
54 let (row, api_key, webhook_secret) =
55 services::modules::register(&st.pool, req, &reserved).await?;
56 auth::audit(
57 &st.pool,
58 &principal,
59 "register_module",
60 "module",
61 &row.id,
62 json!({ "name": row.name, "base_url": row.base_url, "role": row.role }),
63 )
64 .await;
65 Ok((
66 StatusCode::CREATED,
67 Json(ModuleRegistered {
68 module: ModuleDetail::from(&row),
69 api_key,
70 webhook_secret,
71 }),
72 ))
73}
74
75async fn detail(
77 State(st): State<AppState>,
78 principal: Principal,
79 Path(id): Path<String>,
80) -> AppResult<Json<ModuleDetail>> {
81 principal.require(principal.can_admin(), "view module detail")?;
82 let row = services::modules::get_registered(&st.pool, &id)
83 .await?
84 .ok_or_else(|| AppError::NotFound(format!("module `{id}` not found")))?;
85 Ok(Json(ModuleDetail::from(&row)))
86}
87
88async fn unregister(
90 State(st): State<AppState>,
91 principal: Principal,
92 Path(id): Path<String>,
93) -> AppResult<StatusCode> {
94 principal.require(principal.can_admin(), "unregister a module")?;
95 services::modules::unregister(&st.pool, &id).await?;
96 auth::audit(
97 &st.pool,
98 &principal,
99 "unregister_module",
100 "module",
101 &id,
102 json!({}),
103 )
104 .await;
105 Ok(StatusCode::NO_CONTENT)
106}
107
108const HOP_BY_HOP: &[&str] = &[
114 "connection",
115 "keep-alive",
116 "proxy-authenticate",
117 "proxy-authorization",
118 "te",
119 "trailer",
120 "transfer-encoding",
121 "upgrade",
122 "content-length",
123 "host",
124];
125
126async fn proxy_root(
127 State(st): State<AppState>,
128 principal: Principal,
129 method: Method,
130 Path(id): Path<String>,
131 uri: OriginalUri,
132 headers: HeaderMap,
133 body: Bytes,
134) -> AppResult<Response> {
135 forward(&st, &principal, &id, "", uri, method, headers, body).await
136}
137
138async fn proxy_sub(
139 State(st): State<AppState>,
140 principal: Principal,
141 method: Method,
142 Path((id, rest)): Path<(String, String)>,
143 uri: OriginalUri,
144 headers: HeaderMap,
145 body: Bytes,
146) -> AppResult<Response> {
147 forward(&st, &principal, &id, &rest, uri, method, headers, body).await
148}
149
150#[allow(clippy::too_many_arguments)]
151async fn forward(
152 st: &AppState,
153 principal: &Principal,
154 id: &str,
155 rest: &str,
156 uri: OriginalUri,
157 method: Method,
158 headers: HeaderMap,
159 body: Bytes,
160) -> AppResult<Response> {
161 principal.require(principal.can_view(), "access a module")?;
162 let row = services::modules::get_registered(&st.pool, id)
163 .await?
164 .ok_or_else(|| AppError::NotFound(format!("module `{id}` not found")))?;
165
166 let query = uri.0.query().map(|q| format!("?{q}")).unwrap_or_default();
167 let target = format!("{}/{}{}", row.base_url, rest, query);
168
169 let mut rb = st.http.request(method, &target);
170 for (k, v) in headers.iter() {
171 let name = k.as_str().to_ascii_lowercase();
172 if HOP_BY_HOP.contains(&name.as_str()) || name == "cookie" || name == "authorization" {
175 continue;
176 }
177 rb = rb.header(k, v);
178 }
179 if !body.is_empty() {
180 rb = rb.body(body.to_vec());
181 }
182 let resp = rb.send().await.map_err(|e| {
183 AppError::Other(anyhow::anyhow!(
184 "module `{id}` proxy to {target} failed: {e}"
185 ))
186 })?;
187
188 let status = resp.status();
189 let mut out = Response::builder().status(status);
190 for (k, v) in resp.headers().iter() {
191 if HOP_BY_HOP.contains(&k.as_str().to_ascii_lowercase().as_str()) {
192 continue;
193 }
194 out = out.header(k, v);
195 }
196 let bytes = resp
197 .bytes()
198 .await
199 .map_err(|e| AppError::Other(anyhow::anyhow!("module `{id}` proxy read failed: {e}")))?;
200 out.body(Body::from(bytes))
201 .map_err(|e| AppError::Other(anyhow::anyhow!("module `{id}` proxy response build: {e}")))
202}
203
204#[cfg(test)]
205mod tests {
206 use crate::config::Config;
207 use crate::modules::{ModuleKind, ModuleManifest, NavEntry};
208 use crate::services::recorder::RecorderManager;
209 use crate::services::sampler::SamplerManager;
210 use crate::state::AppState;
211 use axum::body::Body;
212 use axum::http::Request;
213 use serde_json::json;
214 use std::sync::Arc;
215 use tower::Service;
216
217 async fn state_with(modules: Vec<ModuleManifest>) -> AppState {
218 let pool = sqlx::sqlite::SqlitePoolOptions::new()
219 .max_connections(1)
220 .connect("sqlite::memory:")
221 .await
222 .unwrap();
223 crate::db::run_migrations(&pool).await.unwrap();
224 let mut cfg = Config::from_env();
225 cfg.auth_enabled = false; let cfg = Arc::new(cfg);
227 AppState {
228 recorder: RecorderManager::new(pool.clone(), cfg.clone()),
229 sampler: SamplerManager::new(pool.clone(), cfg.clone()),
230 mirror: None,
231 consumers: Arc::new(Vec::new()),
232 modules: Arc::new(modules),
233 catalog: Arc::new(crate::services::registry::CatalogService::new(&cfg)),
234 http: reqwest::Client::new(),
235 started_at: chrono::Utc::now(),
236 pool,
237 cfg,
238 }
239 }
240
241 #[tokio::test]
243 async fn lists_loaded_modules() {
244 let m = ModuleManifest::new(
245 "entry",
246 "Access Control",
247 "9.9.9",
248 "Heldar",
249 ModuleKind::Core,
250 "desc",
251 vec![NavEntry::new("/entry", "Entry", "entry")],
252 );
253 let st = state_with(vec![m]).await;
254 let mut app = super::router().with_state(st);
255 let res = app
256 .call(
257 Request::builder()
258 .uri("/api/v1/modules")
259 .body(Body::empty())
260 .unwrap(),
261 )
262 .await
263 .unwrap();
264 assert_eq!(res.status(), 200);
265 let bytes = axum::body::to_bytes(res.into_body(), usize::MAX)
266 .await
267 .unwrap();
268 let json: serde_json::Value = serde_json::from_slice(&bytes).unwrap();
269 let arr = json.as_array().expect("array");
270 assert_eq!(arr.len(), 1);
271 assert_eq!(json[0]["id"], "entry");
272 assert_eq!(json[0]["kind"], "core"); assert_eq!(json[0]["nav"][0]["path"], "/entry");
274 }
275
276 #[tokio::test]
278 async fn empty_when_no_modules() {
279 let st = state_with(vec![]).await;
280 let mut app = super::router().with_state(st);
281 let res = app
282 .call(
283 Request::builder()
284 .uri("/api/v1/modules")
285 .body(Body::empty())
286 .unwrap(),
287 )
288 .await
289 .unwrap();
290 assert_eq!(res.status(), 200);
291 let bytes = axum::body::to_bytes(res.into_body(), usize::MAX)
292 .await
293 .unwrap();
294 assert_eq!(&bytes[..], b"[]");
295 }
296
297 async fn send(st: AppState, req: Request<Body>) -> (axum::http::StatusCode, serde_json::Value) {
299 let mut app = super::router().with_state(st);
300 let res = app.call(req).await.unwrap();
301 let status = res.status();
302 let bytes = axum::body::to_bytes(res.into_body(), usize::MAX)
303 .await
304 .unwrap();
305 let json = if bytes.is_empty() {
306 serde_json::Value::Null
307 } else {
308 serde_json::from_slice(&bytes).unwrap_or(serde_json::Value::Null)
309 };
310 (status, json)
311 }
312
313 fn post_json(uri: &str, body: serde_json::Value) -> Request<Body> {
314 Request::builder()
315 .method("POST")
316 .uri(uri)
317 .header("content-type", "application/json")
318 .body(Body::from(body.to_string()))
319 .unwrap()
320 }
321
322 fn compiled_entry() -> ModuleManifest {
323 ModuleManifest::new(
324 "entry",
325 "Access Control",
326 "9.9.9",
327 "Heldar",
328 ModuleKind::Core,
329 "d",
330 vec![NavEntry::new("/entry", "Entry", "entry")],
331 )
332 }
333
334 #[tokio::test]
337 async fn register_list_unregister_lifecycle() {
338 let st = state_with(vec![compiled_entry()]).await;
339
340 let (status, json) = send(
341 st.clone(),
342 post_json(
343 "/api/v1/modules",
344 json!({
345 "id": "hello",
346 "name": "Hello Plugin",
347 "version": "1.0.0",
348 "publisher": "ACME",
349 "base_url": "http://127.0.0.1:9123",
350 "subscribes": ["zone_enter"],
351 "role": "integration"
352 }),
353 ),
354 )
355 .await;
356 assert_eq!(status, 201);
357 assert!(json["api_key"].as_str().unwrap().starts_with("vok_"));
358 assert!(json["webhook_secret"]
359 .as_str()
360 .unwrap()
361 .starts_with("whsec_"));
362 assert_eq!(json["module"]["base_url"], "http://127.0.0.1:9123");
363
364 let (_, list) = send(
365 st.clone(),
366 Request::builder()
367 .uri("/api/v1/modules")
368 .body(Body::empty())
369 .unwrap(),
370 )
371 .await;
372 let arr = list.as_array().unwrap();
373 assert_eq!(arr.len(), 2);
374 let hello = arr.iter().find(|m| m["id"] == "hello").unwrap();
375 assert_eq!(hello["kind"], "imported");
376 assert_eq!(hello["mount"], "iframe");
377 assert_eq!(hello["nav"][0]["path"], "/hello"); let role: Option<String> =
381 sqlx::query_scalar("SELECT role FROM api_keys WHERE name = 'module:hello'")
382 .fetch_optional(&st.pool)
383 .await
384 .unwrap();
385 assert_eq!(role.as_deref(), Some("integration"));
386 let url: Option<String> =
387 sqlx::query_scalar("SELECT url FROM webhook_subscriptions WHERE name = 'module:hello'")
388 .fetch_optional(&st.pool)
389 .await
390 .unwrap();
391 assert_eq!(url.as_deref(), Some("http://127.0.0.1:9123/heldar/events"));
392
393 let (status, _) = send(
394 st.clone(),
395 Request::builder()
396 .method("DELETE")
397 .uri("/api/v1/modules/hello")
398 .body(Body::empty())
399 .unwrap(),
400 )
401 .await;
402 assert_eq!(status, 204);
403
404 let (_, list) = send(
405 st.clone(),
406 Request::builder()
407 .uri("/api/v1/modules")
408 .body(Body::empty())
409 .unwrap(),
410 )
411 .await;
412 assert_eq!(list.as_array().unwrap().len(), 1);
413 let key_gone: Option<String> =
414 sqlx::query_scalar("SELECT id FROM api_keys WHERE name = 'module:hello'")
415 .fetch_optional(&st.pool)
416 .await
417 .unwrap();
418 assert!(key_gone.is_none());
419 let wh_gone: Option<String> =
420 sqlx::query_scalar("SELECT id FROM webhook_subscriptions WHERE name = 'module:hello'")
421 .fetch_optional(&st.pool)
422 .await
423 .unwrap();
424 assert!(wh_gone.is_none());
425 }
426
427 #[tokio::test]
429 async fn rejects_reserved_id() {
430 let st = state_with(vec![compiled_entry()]).await;
431 let (status, _) = send(
432 st,
433 post_json(
434 "/api/v1/modules",
435 json!({ "id": "entry", "name": "x", "base_url": "http://127.0.0.1:1" }),
436 ),
437 )
438 .await;
439 assert_eq!(status, 409);
440 }
441
442 #[tokio::test]
444 async fn rejects_privileged_role() {
445 let st = state_with(vec![]).await;
446 let (status, _) = send(
447 st,
448 post_json(
449 "/api/v1/modules",
450 json!({ "id": "x", "name": "x", "base_url": "http://127.0.0.1:1", "role": "admin" }),
451 ),
452 )
453 .await;
454 assert_eq!(status, 400);
455 }
456}