1use axum::extract::{Path, State};
8use axum::http::{header, HeaderMap, StatusCode};
9use axum::response::{AppendHeaders, IntoResponse};
10use axum::routing::{get, post};
11use axum::{Json, Router};
12use chrono::Utc;
13use serde_json::{json, Value};
14use uuid::Uuid;
15
16use crate::auth::{self, Principal, Role};
17use crate::error::{AppError, AppResult};
18use crate::models::{
19 ApiKey, ApiKeyCreate, ApiKeyView, LoginRequest, User, UserCreate, UserUpdate, UserView,
20};
21use crate::state::AppState;
22
23pub fn router() -> Router<AppState> {
24 Router::new()
25 .route("/api/v1/auth/login", post(login))
26 .route("/api/v1/auth/logout", post(logout))
27 .route("/api/v1/auth/me", get(me))
28 .route("/api/v1/users", get(list_users).post(create_user))
29 .route(
30 "/api/v1/users/{id}",
31 axum::routing::patch(update_user).delete(delete_user),
32 )
33 .route("/api/v1/users/{id}/unlock", post(unlock_user))
34 .route("/api/v1/api-keys", get(list_api_keys).post(create_api_key))
35 .route(
36 "/api/v1/api-keys/{id}",
37 axum::routing::delete(delete_api_key),
38 )
39}
40
41const MIN_PASSWORD_LEN: usize = 8;
42
43async fn login(
44 State(st): State<AppState>,
45 Json(body): Json<LoginRequest>,
46) -> AppResult<impl IntoResponse> {
47 let candidate = sqlx::query_as::<_, User>("SELECT * FROM users WHERE username = ?")
48 .bind(body.username.trim())
49 .fetch_optional(&st.pool)
50 .await?;
51 let phc = candidate
54 .as_ref()
55 .map(|u| u.password_hash.as_str())
56 .unwrap_or_else(|| auth::dummy_password_hash());
57 let password_ok = auth::verify_password(&body.password, phc);
58
59 let now = Utc::now();
60 let lockout = st.cfg.login_lockout_enabled();
61 let locked = lockout
62 && candidate
63 .as_ref()
64 .and_then(|u| u.locked_until)
65 .is_some_and(|until| until > now);
66 let user = match candidate {
67 Some(_) if locked => return Err(AppError::Unauthorized("invalid credentials".into())),
71 Some(u) if u.active && password_ok => u,
72 Some(u) if lockout && u.active => {
74 register_login_failure(&st.pool, &st.cfg, &u, now).await;
75 return Err(AppError::Unauthorized("invalid credentials".into()));
76 }
77 _ => return Err(AppError::Unauthorized("invalid credentials".into())),
78 };
79
80 if lockout && (user.failed_login_count != 0 || user.locked_until.is_some()) {
82 let _ = sqlx::query(
83 "UPDATE users SET failed_login_count = 0, locked_until = NULL WHERE id = ?",
84 )
85 .bind(&user.id)
86 .execute(&st.pool)
87 .await;
88 }
89
90 let (token, expires_at) = auth::issue_session(&st.pool, &st.cfg, &user.id).await?;
91 let principal = Principal {
92 id: user.id.clone(),
93 name: user
94 .display_name
95 .clone()
96 .unwrap_or_else(|| user.username.clone()),
97 role: Role::parse(&user.role).unwrap_or(Role::Viewer),
98 kind: crate::auth::PrincipalKind::User,
99 };
100 auth::audit(&st.pool, &principal, "login", "user", &user.id, json!({})).await;
101 let cookie = auth::session_cookie(&token, &st.cfg);
105 let body = Json(json!({
106 "token": token,
107 "expires_at": expires_at,
108 "user": UserView::from(user),
109 }));
110 Ok((AppendHeaders([(header::SET_COOKIE, cookie)]), body))
111}
112
113async fn register_login_failure(
116 pool: &sqlx::SqlitePool,
117 cfg: &crate::config::Config,
118 u: &User,
119 now: chrono::DateTime<Utc>,
120) {
121 let new_count = u.failed_login_count + 1;
122 if new_count >= cfg.login_max_failures {
123 let until = now + chrono::Duration::minutes(cfg.login_lockout_min);
124 let _ =
125 sqlx::query("UPDATE users SET failed_login_count = ?, locked_until = ? WHERE id = ?")
126 .bind(new_count)
127 .bind(until)
128 .bind(&u.id)
129 .execute(pool)
130 .await;
131 let principal = Principal {
132 id: u.id.clone(),
133 name: u.display_name.clone().unwrap_or_else(|| u.username.clone()),
134 role: Role::parse(&u.role).unwrap_or(Role::Viewer),
135 kind: auth::PrincipalKind::User,
136 };
137 auth::audit(
138 pool,
139 &principal,
140 "login_locked",
141 "user",
142 &u.id,
143 json!({ "locked_until": until }),
144 )
145 .await;
146 } else {
147 let _ = sqlx::query("UPDATE users SET failed_login_count = ? WHERE id = ?")
148 .bind(new_count)
149 .bind(&u.id)
150 .execute(pool)
151 .await;
152 }
153}
154
155async fn logout(State(st): State<AppState>, headers: HeaderMap) -> AppResult<impl IntoResponse> {
156 if let Some(tok) = auth::token_from_headers(&headers) {
157 auth::revoke_session(&st.pool, &tok).await?;
158 }
159 let cookie = auth::clear_session_cookie(&st.cfg);
161 Ok((
162 StatusCode::NO_CONTENT,
163 AppendHeaders([(header::SET_COOKIE, cookie)]),
164 ))
165}
166
167async fn me(principal: Principal) -> AppResult<Json<Value>> {
168 Ok(Json(json!({
169 "id": principal.id,
170 "name": principal.name,
171 "role": principal.role.as_str(),
172 "kind": match principal.kind {
173 crate::auth::PrincipalKind::User => "user",
174 crate::auth::PrincipalKind::ApiKey => "api_key",
175 crate::auth::PrincipalKind::System => "system",
176 },
177 })))
178}
179
180async fn list_users(
181 State(st): State<AppState>,
182 principal: Principal,
183) -> AppResult<Json<Vec<UserView>>> {
184 principal.require(principal.can_admin(), "manage users")?;
185 let users = sqlx::query_as::<_, User>("SELECT * FROM users ORDER BY username ASC")
186 .fetch_all(&st.pool)
187 .await?;
188 Ok(Json(users.into_iter().map(UserView::from).collect()))
189}
190
191async fn create_user(
192 State(st): State<AppState>,
193 principal: Principal,
194 Json(body): Json<UserCreate>,
195) -> AppResult<(StatusCode, Json<UserView>)> {
196 principal.require(principal.can_admin(), "create users")?;
197 let username = body.username.trim();
198 if username.is_empty() {
199 return Err(AppError::BadRequest("`username` is required".into()));
200 }
201 if body.password.len() < MIN_PASSWORD_LEN {
202 return Err(AppError::BadRequest(format!(
203 "`password` must be at least {MIN_PASSWORD_LEN} characters"
204 )));
205 }
206 let role = body.role.as_deref().unwrap_or("viewer");
207 if !Role::is_valid(role) {
208 return Err(AppError::BadRequest(
209 "`role` must be admin|manager|guard|viewer|integration".into(),
210 ));
211 }
212 let hash = auth::hash_password(&body.password)?;
213 let id = format!("usr_{}", Uuid::new_v4().simple());
214 let now = Utc::now();
215 sqlx::query(
216 "INSERT INTO users (id, username, password_hash, role, display_name, active, created_at, updated_at)
217 VALUES (?,?,?,?,?,?,?,?)",
218 )
219 .bind(&id)
220 .bind(username)
221 .bind(hash)
222 .bind(role)
223 .bind(&body.display_name)
224 .bind(body.active.unwrap_or(true))
225 .bind(now)
226 .bind(now)
227 .execute(&st.pool)
228 .await?;
229 auth::audit(
230 &st.pool,
231 &principal,
232 "create_user",
233 "user",
234 &id,
235 json!({ "role": role }),
236 )
237 .await;
238 let user = sqlx::query_as::<_, User>("SELECT * FROM users WHERE id = ?")
239 .bind(&id)
240 .fetch_one(&st.pool)
241 .await?;
242 Ok((StatusCode::CREATED, Json(UserView::from(user))))
243}
244
245async fn update_user(
246 State(st): State<AppState>,
247 principal: Principal,
248 Path(id): Path<String>,
249 Json(body): Json<UserUpdate>,
250) -> AppResult<Json<UserView>> {
251 principal.require(principal.can_admin(), "modify users")?;
252 let cur = sqlx::query_as::<_, User>("SELECT * FROM users WHERE id = ?")
253 .bind(&id)
254 .fetch_optional(&st.pool)
255 .await?
256 .ok_or_else(|| AppError::NotFound(format!("user {id} not found")))?;
257
258 let role = body.role.unwrap_or_else(|| cur.role.clone());
259 if !Role::is_valid(&role) {
260 return Err(AppError::BadRequest(
261 "`role` must be admin|manager|guard|viewer|integration".into(),
262 ));
263 }
264 let active = body.active.unwrap_or(cur.active);
265 let display_name = body.display_name.or(cur.display_name);
266 let password_hash = match body.password {
267 Some(p) if p.len() >= MIN_PASSWORD_LEN => auth::hash_password(&p)?,
268 Some(_) => {
269 return Err(AppError::BadRequest(format!(
270 "`password` must be at least {MIN_PASSWORD_LEN} characters"
271 )))
272 }
273 None => cur.password_hash,
274 };
275 let demoting_admin = cur.role == "admin" && (role != "admin" || !active);
280 let affected = if demoting_admin {
281 sqlx::query(
282 "UPDATE users SET password_hash=?, role=?, display_name=?, active=?, updated_at=?, \
283 failed_login_count=0, locked_until=NULL \
284 WHERE id=? AND EXISTS (SELECT 1 FROM users WHERE role='admin' AND active=1 AND id != ?)",
285 )
286 .bind(&password_hash)
287 .bind(&role)
288 .bind(&display_name)
289 .bind(active)
290 .bind(Utc::now())
291 .bind(&id)
292 .bind(&id)
293 .execute(&st.pool)
294 .await?
295 .rows_affected()
296 } else {
297 sqlx::query(
298 "UPDATE users SET password_hash=?, role=?, display_name=?, active=?, updated_at=?, \
299 failed_login_count=0, locked_until=NULL WHERE id=?",
300 )
301 .bind(&password_hash)
302 .bind(&role)
303 .bind(&display_name)
304 .bind(active)
305 .bind(Utc::now())
306 .bind(&id)
307 .execute(&st.pool)
308 .await?
309 .rows_affected()
310 };
311 if demoting_admin && affected == 0 {
312 return Err(AppError::BadRequest(
313 "cannot demote or disable the last active admin".into(),
314 ));
315 }
316 if !active {
318 let _ = sqlx::query("DELETE FROM sessions WHERE user_id = ?")
319 .bind(&id)
320 .execute(&st.pool)
321 .await;
322 }
323 auth::audit(
324 &st.pool,
325 &principal,
326 "update_user",
327 "user",
328 &id,
329 json!({ "role": role, "active": active }),
330 )
331 .await;
332 let user = sqlx::query_as::<_, User>("SELECT * FROM users WHERE id = ?")
333 .bind(&id)
334 .fetch_one(&st.pool)
335 .await?;
336 Ok(Json(UserView::from(user)))
337}
338
339async fn unlock_user(
342 State(st): State<AppState>,
343 principal: Principal,
344 Path(id): Path<String>,
345) -> AppResult<Json<UserView>> {
346 principal.require(principal.can_admin(), "unlock users")?;
347 let res =
348 sqlx::query("UPDATE users SET failed_login_count = 0, locked_until = NULL WHERE id = ?")
349 .bind(&id)
350 .execute(&st.pool)
351 .await?;
352 if res.rows_affected() == 0 {
353 return Err(AppError::NotFound(format!("user {id} not found")));
354 }
355 auth::audit(&st.pool, &principal, "unlock_user", "user", &id, json!({})).await;
356 let user = sqlx::query_as::<_, User>("SELECT * FROM users WHERE id = ?")
357 .bind(&id)
358 .fetch_one(&st.pool)
359 .await?;
360 Ok(Json(UserView::from(user)))
361}
362
363async fn delete_user(
364 State(st): State<AppState>,
365 principal: Principal,
366 Path(id): Path<String>,
367) -> AppResult<StatusCode> {
368 principal.require(principal.can_admin(), "delete users")?;
369 if principal.id == id {
370 return Err(AppError::BadRequest(
371 "cannot delete your own account".into(),
372 ));
373 }
374 let cur = sqlx::query_as::<_, User>("SELECT * FROM users WHERE id = ?")
375 .bind(&id)
376 .fetch_optional(&st.pool)
377 .await?
378 .ok_or_else(|| AppError::NotFound(format!("user {id} not found")))?;
379 let affected = if cur.role == "admin" {
382 sqlx::query(
383 "DELETE FROM users WHERE id = ? AND EXISTS (SELECT 1 FROM users WHERE role='admin' AND active=1 AND id != ?)",
384 )
385 .bind(&id)
386 .bind(&id)
387 .execute(&st.pool)
388 .await?
389 .rows_affected()
390 } else {
391 sqlx::query("DELETE FROM users WHERE id = ?")
392 .bind(&id)
393 .execute(&st.pool)
394 .await?
395 .rows_affected()
396 };
397 if cur.role == "admin" && affected == 0 {
398 return Err(AppError::BadRequest(
399 "cannot delete the last active admin".into(),
400 ));
401 }
402 auth::audit(&st.pool, &principal, "delete_user", "user", &id, json!({})).await;
403 Ok(StatusCode::NO_CONTENT)
404}
405
406async fn list_api_keys(
407 State(st): State<AppState>,
408 principal: Principal,
409) -> AppResult<Json<Vec<ApiKeyView>>> {
410 principal.require(principal.can_admin(), "manage API keys")?;
411 let keys = sqlx::query_as::<_, ApiKey>("SELECT * FROM api_keys ORDER BY created_at DESC")
412 .fetch_all(&st.pool)
413 .await?;
414 Ok(Json(keys.into_iter().map(ApiKeyView::from).collect()))
415}
416
417async fn create_api_key(
418 State(st): State<AppState>,
419 principal: Principal,
420 Json(body): Json<ApiKeyCreate>,
421) -> AppResult<(StatusCode, Json<Value>)> {
422 principal.require(principal.can_admin(), "create API keys")?;
423 if body.name.trim().is_empty() {
424 return Err(AppError::BadRequest("`name` is required".into()));
425 }
426 let role = body.role.as_deref().unwrap_or("integration");
427 if !Role::is_valid(role) {
428 return Err(AppError::BadRequest(
429 "`role` must be admin|manager|guard|viewer|integration".into(),
430 ));
431 }
432 let key = auth::random_token(auth::APIKEY_PREFIX);
433 let prefix: String = key.chars().take(12).collect();
434 let id = format!("key_{}", Uuid::new_v4().simple());
435 sqlx::query(
436 "INSERT INTO api_keys (id, name, key_hash, key_prefix, role, active, created_at)
437 VALUES (?,?,?,?,?,1,?)",
438 )
439 .bind(&id)
440 .bind(body.name.trim())
441 .bind(auth::token_hash(&key))
442 .bind(&prefix)
443 .bind(role)
444 .bind(Utc::now())
445 .execute(&st.pool)
446 .await?;
447 auth::audit(
448 &st.pool,
449 &principal,
450 "create_api_key",
451 "api_key",
452 &id,
453 json!({ "role": role }),
454 )
455 .await;
456 Ok((
458 StatusCode::CREATED,
459 Json(json!({ "id": id, "name": body.name.trim(), "role": role, "key": key })),
460 ))
461}
462
463async fn delete_api_key(
464 State(st): State<AppState>,
465 principal: Principal,
466 Path(id): Path<String>,
467) -> AppResult<StatusCode> {
468 principal.require(principal.can_admin(), "delete API keys")?;
469 let res = sqlx::query("DELETE FROM api_keys WHERE id = ?")
470 .bind(&id)
471 .execute(&st.pool)
472 .await?;
473 if res.rows_affected() == 0 {
474 return Err(AppError::NotFound(format!("api key {id} not found")));
475 }
476 auth::audit(
477 &st.pool,
478 &principal,
479 "delete_api_key",
480 "api_key",
481 &id,
482 json!({}),
483 )
484 .await;
485 Ok(StatusCode::NO_CONTENT)
486}
487
488#[cfg(test)]
489mod tests {
490 use super::*;
491 use crate::config::Config;
492 use crate::services::recorder::RecorderManager;
493 use crate::services::sampler::SamplerManager;
494 use std::sync::Arc;
495
496 async fn test_state(auth_enabled: bool) -> AppState {
499 let pool = sqlx::sqlite::SqlitePoolOptions::new()
500 .max_connections(1)
501 .connect("sqlite::memory:")
502 .await
503 .unwrap();
504 crate::db::run_migrations(&pool).await.unwrap();
505 let mut cfg = Config::from_env();
506 cfg.auth_enabled = auth_enabled;
507 let cfg = Arc::new(cfg);
508 AppState {
509 recorder: RecorderManager::new(pool.clone(), cfg.clone()),
510 sampler: SamplerManager::new(pool.clone(), cfg.clone()),
511 mirror: None,
512 consumers: Arc::new(Vec::new()),
513 modules: Arc::new(Vec::new()),
514 catalog: Arc::new(crate::services::registry::CatalogService::new(&cfg)),
515 http: reqwest::Client::new(),
516 started_at: chrono::Utc::now(),
517 pool,
518 cfg,
519 }
520 }
521
522 fn viewer() -> Principal {
523 Principal {
524 id: "usr_viewer".into(),
525 name: "vee".into(),
526 role: Role::Viewer,
527 kind: auth::PrincipalKind::User,
528 }
529 }
530
531 #[tokio::test]
532 async fn me_reports_principal_role_and_kind() {
533 let Json(v) = me(Principal::system_admin()).await.unwrap();
535 assert_eq!(v["id"], "system");
536 assert_eq!(v["name"], "system");
537 assert_eq!(v["role"], "admin");
538 assert_eq!(v["kind"], "system");
539
540 let Json(v) = me(viewer()).await.unwrap();
542 assert_eq!(v["role"], "viewer");
543 assert_eq!(v["kind"], "user");
544 }
545
546 #[tokio::test]
547 async fn create_user_validation_rejects_bad_input() {
548 let st = test_state(false).await;
549
550 let err = create_user(
552 State(st.clone()),
553 Principal::system_admin(),
554 Json(UserCreate {
555 username: " ".into(),
556 password: "x".repeat(MIN_PASSWORD_LEN),
557 role: None,
558 display_name: None,
559 active: None,
560 }),
561 )
562 .await
563 .err()
564 .unwrap();
565 match err {
566 AppError::BadRequest(m) => assert!(m.contains("username")),
567 other => panic!("expected BadRequest, got {other:?}"),
568 }
569
570 let err = create_user(
572 State(st.clone()),
573 Principal::system_admin(),
574 Json(UserCreate {
575 username: "joe".into(),
576 password: "x".repeat(MIN_PASSWORD_LEN - 1),
577 role: None,
578 display_name: None,
579 active: None,
580 }),
581 )
582 .await
583 .err()
584 .unwrap();
585 match err {
586 AppError::BadRequest(m) => assert!(m.contains("password")),
587 other => panic!("expected BadRequest, got {other:?}"),
588 }
589
590 let err = create_user(
592 State(st.clone()),
593 Principal::system_admin(),
594 Json(UserCreate {
595 username: "joe".into(),
596 password: "x".repeat(MIN_PASSWORD_LEN),
597 role: Some("superuser".into()),
598 display_name: None,
599 active: None,
600 }),
601 )
602 .await
603 .err()
604 .unwrap();
605 match err {
606 AppError::BadRequest(m) => assert!(m.contains("role")),
607 other => panic!("expected BadRequest, got {other:?}"),
608 }
609 }
610
611 #[tokio::test]
612 async fn create_user_defaults_and_list_orders() {
613 let st = test_state(false).await;
614
615 let (status, Json(uv)) = create_user(
617 State(st.clone()),
618 Principal::system_admin(),
619 Json(UserCreate {
620 username: " bravo ".into(),
621 password: "x".repeat(MIN_PASSWORD_LEN),
622 role: None,
623 display_name: None,
624 active: None,
625 }),
626 )
627 .await
628 .unwrap();
629 assert_eq!(status, StatusCode::CREATED);
630 assert_eq!(uv.username, "bravo");
631 assert_eq!(uv.role, "viewer");
632 assert!(uv.active);
633
634 let _ = create_user(
635 State(st.clone()),
636 Principal::system_admin(),
637 Json(UserCreate {
638 username: "alpha".into(),
639 password: "x".repeat(MIN_PASSWORD_LEN),
640 role: Some("manager".into()),
641 display_name: Some("Al".into()),
642 active: None,
643 }),
644 )
645 .await
646 .unwrap();
647
648 let Json(users) = list_users(State(st.clone()), Principal::system_admin())
650 .await
651 .unwrap();
652 assert_eq!(users.len(), 2);
653 assert_eq!(users[0].username, "alpha");
654 assert_eq!(users[1].username, "bravo");
655 assert_eq!(users[0].role, "manager");
656 }
657
658 #[tokio::test]
659 async fn non_admin_is_forbidden() {
660 let st = test_state(false).await;
661
662 let err = list_users(State(st.clone()), viewer()).await.err().unwrap();
663 assert!(matches!(err, AppError::Forbidden(_)));
664
665 let err = create_api_key(
666 State(st.clone()),
667 viewer(),
668 Json(ApiKeyCreate {
669 name: "k".into(),
670 role: None,
671 }),
672 )
673 .await
674 .err()
675 .unwrap();
676 assert!(matches!(err, AppError::Forbidden(_)));
677 }
678
679 #[tokio::test]
680 async fn delete_user_rejects_self() {
681 let st = test_state(false).await;
682 let err = delete_user(
685 State(st.clone()),
686 Principal::system_admin(),
687 Path("system".to_string()),
688 )
689 .await
690 .err()
691 .unwrap();
692 assert!(matches!(err, AppError::BadRequest(_)));
693 }
694
695 #[tokio::test]
696 async fn update_user_protects_last_admin() {
697 let st = test_state(false).await;
698
699 let (_, Json(admin)) = create_user(
701 State(st.clone()),
702 Principal::system_admin(),
703 Json(UserCreate {
704 username: "rootadmin".into(),
705 password: "x".repeat(MIN_PASSWORD_LEN),
706 role: Some("admin".into()),
707 display_name: None,
708 active: None,
709 }),
710 )
711 .await
712 .unwrap();
713
714 let err = update_user(
716 State(st.clone()),
717 Principal::system_admin(),
718 Path(admin.id.clone()),
719 Json(UserUpdate {
720 role: Some("viewer".into()),
721 ..Default::default()
722 }),
723 )
724 .await
725 .err()
726 .unwrap();
727 assert!(matches!(err, AppError::BadRequest(_)));
728 }
729
730 async fn state_with_pool(pool: sqlx::SqlitePool) -> AppState {
733 let mut cfg = Config::from_env();
734 cfg.auth_enabled = false;
735 let cfg = std::sync::Arc::new(cfg);
736 AppState {
737 recorder: RecorderManager::new(pool.clone(), cfg.clone()),
738 sampler: SamplerManager::new(pool.clone(), cfg.clone()),
739 mirror: None,
740 consumers: std::sync::Arc::new(Vec::new()),
741 modules: std::sync::Arc::new(Vec::new()),
742 catalog: std::sync::Arc::new(crate::services::registry::CatalogService::new(&cfg)),
743 http: reqwest::Client::new(),
744 started_at: chrono::Utc::now(),
745 pool,
746 cfg,
747 }
748 }
749
750 #[tokio::test(flavor = "multi_thread", worker_threads = 4)]
751 async fn concurrent_demotion_cannot_drain_the_last_admin() {
752 let dbpath =
755 std::env::temp_dir().join(format!("heldar-authrace-{}.db", std::process::id()));
756 let _ = std::fs::remove_file(&dbpath);
757 let url = format!("sqlite://{}?mode=rwc", dbpath.display());
758 let pool = sqlx::sqlite::SqlitePoolOptions::new()
759 .max_connections(4)
760 .connect(&url)
761 .await
762 .unwrap();
763 crate::db::run_migrations(&pool).await.unwrap();
764 let st = state_with_pool(pool.clone()).await;
765
766 let mut ids = Vec::new();
768 for u in ["admin_a", "admin_b"] {
769 let (_, Json(v)) = create_user(
770 State(st.clone()),
771 Principal::system_admin(),
772 Json(UserCreate {
773 username: u.into(),
774 password: "x".repeat(MIN_PASSWORD_LEN),
775 role: Some("admin".into()),
776 display_name: None,
777 active: None,
778 }),
779 )
780 .await
781 .unwrap();
782 ids.push(v.id);
783 }
784
785 let demote = || {
786 Json(UserUpdate {
787 role: Some("viewer".into()),
788 ..Default::default()
789 })
790 };
791 let (r1, r2) = tokio::join!(
794 update_user(
795 State(st.clone()),
796 Principal::system_admin(),
797 Path(ids[0].clone()),
798 demote(),
799 ),
800 update_user(
801 State(st.clone()),
802 Principal::system_admin(),
803 Path(ids[1].clone()),
804 demote(),
805 ),
806 );
807
808 let rejected = [r1.is_err(), r2.is_err()]
809 .into_iter()
810 .filter(|e| *e)
811 .count();
812 let remaining: i64 =
813 sqlx::query_scalar("SELECT COUNT(*) FROM users WHERE role='admin' AND active=1")
814 .fetch_one(&pool)
815 .await
816 .unwrap();
817 let _ = std::fs::remove_file(&dbpath);
818 assert!(
819 remaining >= 1,
820 "LOCKOUT: concurrent demotions drained all active admins (remaining={remaining})"
821 );
822 assert!(
823 rejected >= 1,
824 "at least one of two concurrent last-admin demotions must be rejected"
825 );
826 }
827
828 #[tokio::test]
829 async fn create_api_key_shape_and_validation() {
830 let st = test_state(false).await;
831
832 let err = create_api_key(
834 State(st.clone()),
835 Principal::system_admin(),
836 Json(ApiKeyCreate {
837 name: " ".into(),
838 role: None,
839 }),
840 )
841 .await
842 .err()
843 .unwrap();
844 assert!(matches!(err, AppError::BadRequest(_)));
845
846 let (status, Json(v)) = create_api_key(
848 State(st.clone()),
849 Principal::system_admin(),
850 Json(ApiKeyCreate {
851 name: " cam-bridge ".into(),
852 role: None,
853 }),
854 )
855 .await
856 .unwrap();
857 assert_eq!(status, StatusCode::CREATED);
858 assert_eq!(v["name"], "cam-bridge");
859 assert_eq!(v["role"], "integration");
860 let key = v["key"].as_str().unwrap();
861 assert!(key.starts_with(auth::APIKEY_PREFIX));
862 }
863
864 #[tokio::test]
865 async fn login_unknown_wrong_then_success() {
866 let st = test_state(false).await;
867
868 let err = login(
870 State(st.clone()),
871 Json(LoginRequest {
872 username: "ghost".into(),
873 password: "whatever1".into(),
874 }),
875 )
876 .await
877 .err()
878 .unwrap();
879 assert!(matches!(err, AppError::Unauthorized(_)));
880
881 let _ = create_user(
883 State(st.clone()),
884 Principal::system_admin(),
885 Json(UserCreate {
886 username: "operator".into(),
887 password: "operator-pass".into(),
888 role: Some("manager".into()),
889 display_name: None,
890 active: None,
891 }),
892 )
893 .await
894 .unwrap();
895
896 let err = login(
898 State(st.clone()),
899 Json(LoginRequest {
900 username: "operator".into(),
901 password: "not-the-pass".into(),
902 }),
903 )
904 .await
905 .err()
906 .unwrap();
907 assert!(matches!(err, AppError::Unauthorized(_)));
908
909 let resp = login(
911 State(st.clone()),
912 Json(LoginRequest {
913 username: "operator".into(),
914 password: "operator-pass".into(),
915 }),
916 )
917 .await
918 .unwrap()
919 .into_response();
920 assert_eq!(resp.status(), StatusCode::OK);
921 let set_cookie = resp
922 .headers()
923 .get(header::SET_COOKIE)
924 .unwrap()
925 .to_str()
926 .unwrap();
927 assert!(set_cookie.contains(auth::SESSION_COOKIE));
928 assert!(set_cookie.contains("HttpOnly"));
929
930 let sessions: i64 = sqlx::query_scalar("SELECT COUNT(*) FROM sessions")
931 .fetch_one(&st.pool)
932 .await
933 .unwrap();
934 assert_eq!(sessions, 1);
935 }
936
937 async fn test_state_lockout(max_failures: i64, lockout_min: i64) -> AppState {
940 let pool = sqlx::sqlite::SqlitePoolOptions::new()
941 .max_connections(1)
942 .connect("sqlite::memory:")
943 .await
944 .unwrap();
945 crate::db::run_migrations(&pool).await.unwrap();
946 let mut cfg = Config::from_env();
947 cfg.auth_enabled = true;
948 cfg.login_max_failures = max_failures;
949 cfg.login_lockout_min = lockout_min;
950 let cfg = Arc::new(cfg);
951 AppState {
952 recorder: RecorderManager::new(pool.clone(), cfg.clone()),
953 sampler: SamplerManager::new(pool.clone(), cfg.clone()),
954 mirror: None,
955 consumers: Arc::new(Vec::new()),
956 modules: Arc::new(Vec::new()),
957 catalog: Arc::new(crate::services::registry::CatalogService::new(&cfg)),
958 http: reqwest::Client::new(),
959 started_at: chrono::Utc::now(),
960 pool,
961 cfg,
962 }
963 }
964
965 async fn seed_user(st: &AppState, username: &str, password: &str) {
966 let _ = create_user(
967 State(st.clone()),
968 Principal::system_admin(),
969 Json(UserCreate {
970 username: username.into(),
971 password: password.into(),
972 role: Some("manager".into()),
973 display_name: None,
974 active: None,
975 }),
976 )
977 .await
978 .unwrap();
979 }
980
981 async fn try_login(
982 st: &AppState,
983 username: &str,
984 password: &str,
985 ) -> AppResult<impl IntoResponse> {
986 login(
987 State(st.clone()),
988 Json(LoginRequest {
989 username: username.into(),
990 password: password.into(),
991 }),
992 )
993 .await
994 }
995
996 #[tokio::test]
997 async fn login_locks_after_max_failures_and_rejects_correct_password() {
998 let st = test_state_lockout(3, 15).await;
999 seed_user(&st, "op", "correct-pass").await;
1000 for _ in 0..3 {
1001 assert!(matches!(
1002 try_login(&st, "op", "wrong").await.err().unwrap(),
1003 AppError::Unauthorized(_)
1004 ));
1005 }
1006 let (count, locked): (i64, Option<String>) = sqlx::query_as(
1007 "SELECT failed_login_count, locked_until FROM users WHERE username='op'",
1008 )
1009 .fetch_one(&st.pool)
1010 .await
1011 .unwrap();
1012 assert_eq!(count, 3);
1013 assert!(
1014 locked.is_some(),
1015 "account should be locked after 3 failures"
1016 );
1017
1018 assert!(matches!(
1020 try_login(&st, "op", "correct-pass").await.err().unwrap(),
1021 AppError::Unauthorized(_)
1022 ));
1023 let sessions: i64 = sqlx::query_scalar("SELECT COUNT(*) FROM sessions")
1024 .fetch_one(&st.pool)
1025 .await
1026 .unwrap();
1027 assert_eq!(sessions, 0, "a locked account must not get a session");
1028 }
1029
1030 #[tokio::test]
1031 async fn successful_login_resets_failure_count() {
1032 let st = test_state_lockout(3, 15).await;
1033 seed_user(&st, "op", "correct-pass").await;
1034 let _ = try_login(&st, "op", "wrong").await;
1035 let _ = try_login(&st, "op", "wrong").await; let resp = try_login(&st, "op", "correct-pass")
1037 .await
1038 .unwrap()
1039 .into_response();
1040 assert_eq!(resp.status(), StatusCode::OK);
1041 let (count, locked): (i64, Option<String>) = sqlx::query_as(
1042 "SELECT failed_login_count, locked_until FROM users WHERE username='op'",
1043 )
1044 .fetch_one(&st.pool)
1045 .await
1046 .unwrap();
1047 assert_eq!(count, 0);
1048 assert!(locked.is_none());
1049 }
1050
1051 #[tokio::test]
1052 async fn auto_unlock_after_window() {
1053 let st = test_state_lockout(2, 15).await;
1054 seed_user(&st, "op", "correct-pass").await;
1055 let _ = try_login(&st, "op", "wrong").await;
1056 let _ = try_login(&st, "op", "wrong").await;
1057 sqlx::query("UPDATE users SET locked_until = ? WHERE username='op'")
1059 .bind(Utc::now() - chrono::Duration::minutes(1))
1060 .execute(&st.pool)
1061 .await
1062 .unwrap();
1063 let resp = try_login(&st, "op", "correct-pass")
1064 .await
1065 .unwrap()
1066 .into_response();
1067 assert_eq!(resp.status(), StatusCode::OK);
1068 }
1069
1070 #[tokio::test]
1071 async fn manual_unlock_clears_lock() {
1072 let st = test_state_lockout(2, 15).await;
1073 seed_user(&st, "op", "correct-pass").await;
1074 let _ = try_login(&st, "op", "wrong").await;
1075 let _ = try_login(&st, "op", "wrong").await;
1076 let uid: String = sqlx::query_scalar("SELECT id FROM users WHERE username='op'")
1077 .fetch_one(&st.pool)
1078 .await
1079 .unwrap();
1080 let _ = unlock_user(State(st.clone()), Principal::system_admin(), Path(uid))
1081 .await
1082 .unwrap();
1083 let resp = try_login(&st, "op", "correct-pass")
1084 .await
1085 .unwrap()
1086 .into_response();
1087 assert_eq!(resp.status(), StatusCode::OK);
1088 }
1089
1090 #[tokio::test]
1091 async fn lockout_disabled_when_zero_never_locks() {
1092 let st = test_state_lockout(0, 15).await; seed_user(&st, "op", "correct-pass").await;
1094 for _ in 0..10 {
1095 let _ = try_login(&st, "op", "wrong").await;
1096 }
1097 let locked: Option<String> =
1098 sqlx::query_scalar("SELECT locked_until FROM users WHERE username='op'")
1099 .fetch_one(&st.pool)
1100 .await
1101 .unwrap();
1102 assert!(locked.is_none(), "lockout disabled must never lock");
1103 let resp = try_login(&st, "op", "correct-pass")
1104 .await
1105 .unwrap()
1106 .into_response();
1107 assert_eq!(resp.status(), StatusCode::OK);
1108 }
1109
1110 #[tokio::test]
1111 async fn logout_is_no_content_and_clears_cookie() {
1112 let st = test_state(false).await;
1113 let resp = logout(State(st.clone()), HeaderMap::new())
1115 .await
1116 .unwrap()
1117 .into_response();
1118 assert_eq!(resp.status(), StatusCode::NO_CONTENT);
1119 let set_cookie = resp
1120 .headers()
1121 .get(header::SET_COOKIE)
1122 .unwrap()
1123 .to_str()
1124 .unwrap();
1125 assert!(set_cookie.contains(auth::SESSION_COOKIE));
1126 assert!(set_cookie.contains("Max-Age=0"));
1127 }
1128}