Skip to main content

heldar_kernel/routes/
auth.rs

1//! Stage 4 auth + user/API-key administration.
2//!
3//! `/auth/login` exchanges username+password for a bearer session token; `/auth/logout` revokes it;
4//! `/auth/me` reports the caller. `/users` and `/api-keys` are admin-only management surfaces. All
5//! mutations are written to the immutable audit log.
6
7use axum::extract::{Path, State};
8use axum::http::{header, HeaderMap, StatusCode};
9use axum::response::{AppendHeaders, IntoResponse};
10use axum::routing::{get, post};
11use axum::{Json, Router};
12use chrono::Utc;
13use serde_json::{json, Value};
14use uuid::Uuid;
15
16use crate::auth::{self, Principal, Role};
17use crate::error::{AppError, AppResult};
18use crate::models::{
19    ApiKey, ApiKeyCreate, ApiKeyView, LoginRequest, User, UserCreate, UserUpdate, UserView,
20};
21use crate::state::AppState;
22
23pub fn router() -> Router<AppState> {
24    Router::new()
25        .route("/api/v1/auth/login", post(login))
26        .route("/api/v1/auth/logout", post(logout))
27        .route("/api/v1/auth/me", get(me))
28        .route("/api/v1/users", get(list_users).post(create_user))
29        .route(
30            "/api/v1/users/{id}",
31            axum::routing::patch(update_user).delete(delete_user),
32        )
33        .route("/api/v1/users/{id}/unlock", post(unlock_user))
34        .route("/api/v1/api-keys", get(list_api_keys).post(create_api_key))
35        .route(
36            "/api/v1/api-keys/{id}",
37            axum::routing::delete(delete_api_key),
38        )
39}
40
41const MIN_PASSWORD_LEN: usize = 8;
42
43async fn login(
44    State(st): State<AppState>,
45    Json(body): Json<LoginRequest>,
46) -> AppResult<impl IntoResponse> {
47    let candidate = sqlx::query_as::<_, User>("SELECT * FROM users WHERE username = ?")
48        .bind(body.username.trim())
49        .fetch_optional(&st.pool)
50        .await?;
51    // Always run argon2 verification (against a dummy hash when the user is missing/disabled) so
52    // login latency does not reveal whether an account exists. The error is uniform too.
53    let phc = candidate
54        .as_ref()
55        .map(|u| u.password_hash.as_str())
56        .unwrap_or_else(|| auth::dummy_password_hash());
57    let password_ok = auth::verify_password(&body.password, phc);
58
59    let now = Utc::now();
60    let lockout = st.cfg.login_lockout_enabled();
61    let locked = lockout
62        && candidate
63            .as_ref()
64            .and_then(|u| u.locked_until)
65            .is_some_and(|until| until > now);
66    let user = match candidate {
67        // A locked account is refused EVEN with the correct password, and its counter is NOT advanced
68        // (a locked-out attacker can't extend the lock). The response stays the uniform 401 — a distinct
69        // "account locked" message would be a lock/enumeration oracle.
70        Some(_) if locked => return Err(AppError::Unauthorized("invalid credentials".into())),
71        Some(u) if u.active && password_ok => u,
72        // Failed login for a real, active account: bump the counter and lock at the threshold.
73        Some(u) if lockout && u.active => {
74            register_login_failure(&st.pool, &st.cfg, &u, now).await;
75            return Err(AppError::Unauthorized("invalid credentials".into()));
76        }
77        _ => return Err(AppError::Unauthorized("invalid credentials".into())),
78    };
79
80    // Success: clear any prior failure/lock state before issuing the session.
81    if lockout && (user.failed_login_count != 0 || user.locked_until.is_some()) {
82        let _ = sqlx::query(
83            "UPDATE users SET failed_login_count = 0, locked_until = NULL WHERE id = ?",
84        )
85        .bind(&user.id)
86        .execute(&st.pool)
87        .await;
88    }
89
90    let (token, expires_at) = auth::issue_session(&st.pool, &st.cfg, &user.id).await?;
91    let principal = Principal {
92        id: user.id.clone(),
93        name: user
94            .display_name
95            .clone()
96            .unwrap_or_else(|| user.username.clone()),
97        role: Role::parse(&user.role).unwrap_or(Role::Viewer),
98        kind: crate::auth::PrincipalKind::User,
99    };
100    auth::audit(&st.pool, &principal, "login", "user", &user.id, json!({})).await;
101    // Set the session as an HttpOnly cookie (browser auth: not JS-readable, so XSS can't exfiltrate
102    // it; the media plane gets it automatically since the SPA is same-origin). The token is still in
103    // the body for non-browser clients; browsers should ignore it and rely on the cookie.
104    let cookie = auth::session_cookie(&token, &st.cfg);
105    let body = Json(json!({
106        "token": token,
107        "expires_at": expires_at,
108        "user": UserView::from(user),
109    }));
110    Ok((AppendHeaders([(header::SET_COOKIE, cookie)]), body))
111}
112
113/// Record a failed login for a real, active account; lock it once `login_max_failures` consecutive
114/// failures are reached. Audits the lock transition (once) so a brute-force attempt is visible.
115async fn register_login_failure(
116    pool: &sqlx::SqlitePool,
117    cfg: &crate::config::Config,
118    u: &User,
119    now: chrono::DateTime<Utc>,
120) {
121    let new_count = u.failed_login_count + 1;
122    if new_count >= cfg.login_max_failures {
123        let until = now + chrono::Duration::minutes(cfg.login_lockout_min);
124        let _ =
125            sqlx::query("UPDATE users SET failed_login_count = ?, locked_until = ? WHERE id = ?")
126                .bind(new_count)
127                .bind(until)
128                .bind(&u.id)
129                .execute(pool)
130                .await;
131        let principal = Principal {
132            id: u.id.clone(),
133            name: u.display_name.clone().unwrap_or_else(|| u.username.clone()),
134            role: Role::parse(&u.role).unwrap_or(Role::Viewer),
135            kind: auth::PrincipalKind::User,
136        };
137        auth::audit(
138            pool,
139            &principal,
140            "login_locked",
141            "user",
142            &u.id,
143            json!({ "locked_until": until }),
144        )
145        .await;
146    } else {
147        let _ = sqlx::query("UPDATE users SET failed_login_count = ? WHERE id = ?")
148            .bind(new_count)
149            .bind(&u.id)
150            .execute(pool)
151            .await;
152    }
153}
154
155async fn logout(State(st): State<AppState>, headers: HeaderMap) -> AppResult<impl IntoResponse> {
156    if let Some(tok) = auth::token_from_headers(&headers) {
157        auth::revoke_session(&st.pool, &tok).await?;
158    }
159    // Clear the session cookie regardless (idempotent logout).
160    let cookie = auth::clear_session_cookie(&st.cfg);
161    Ok((
162        StatusCode::NO_CONTENT,
163        AppendHeaders([(header::SET_COOKIE, cookie)]),
164    ))
165}
166
167async fn me(principal: Principal) -> AppResult<Json<Value>> {
168    Ok(Json(json!({
169        "id": principal.id,
170        "name": principal.name,
171        "role": principal.role.as_str(),
172        "kind": match principal.kind {
173            crate::auth::PrincipalKind::User => "user",
174            crate::auth::PrincipalKind::ApiKey => "api_key",
175            crate::auth::PrincipalKind::System => "system",
176        },
177    })))
178}
179
180async fn list_users(
181    State(st): State<AppState>,
182    principal: Principal,
183) -> AppResult<Json<Vec<UserView>>> {
184    principal.require(principal.can_admin(), "manage users")?;
185    let users = sqlx::query_as::<_, User>("SELECT * FROM users ORDER BY username ASC")
186        .fetch_all(&st.pool)
187        .await?;
188    Ok(Json(users.into_iter().map(UserView::from).collect()))
189}
190
191async fn create_user(
192    State(st): State<AppState>,
193    principal: Principal,
194    Json(body): Json<UserCreate>,
195) -> AppResult<(StatusCode, Json<UserView>)> {
196    principal.require(principal.can_admin(), "create users")?;
197    let username = body.username.trim();
198    if username.is_empty() {
199        return Err(AppError::BadRequest("`username` is required".into()));
200    }
201    if body.password.len() < MIN_PASSWORD_LEN {
202        return Err(AppError::BadRequest(format!(
203            "`password` must be at least {MIN_PASSWORD_LEN} characters"
204        )));
205    }
206    let role = body.role.as_deref().unwrap_or("viewer");
207    if !Role::is_valid(role) {
208        return Err(AppError::BadRequest(
209            "`role` must be admin|manager|guard|viewer|integration".into(),
210        ));
211    }
212    let hash = auth::hash_password(&body.password)?;
213    let id = format!("usr_{}", Uuid::new_v4().simple());
214    let now = Utc::now();
215    sqlx::query(
216        "INSERT INTO users (id, username, password_hash, role, display_name, active, created_at, updated_at)
217         VALUES (?,?,?,?,?,?,?,?)",
218    )
219    .bind(&id)
220    .bind(username)
221    .bind(hash)
222    .bind(role)
223    .bind(&body.display_name)
224    .bind(body.active.unwrap_or(true))
225    .bind(now)
226    .bind(now)
227    .execute(&st.pool)
228    .await?;
229    auth::audit(
230        &st.pool,
231        &principal,
232        "create_user",
233        "user",
234        &id,
235        json!({ "role": role }),
236    )
237    .await;
238    let user = sqlx::query_as::<_, User>("SELECT * FROM users WHERE id = ?")
239        .bind(&id)
240        .fetch_one(&st.pool)
241        .await?;
242    Ok((StatusCode::CREATED, Json(UserView::from(user))))
243}
244
245async fn update_user(
246    State(st): State<AppState>,
247    principal: Principal,
248    Path(id): Path<String>,
249    Json(body): Json<UserUpdate>,
250) -> AppResult<Json<UserView>> {
251    principal.require(principal.can_admin(), "modify users")?;
252    let cur = sqlx::query_as::<_, User>("SELECT * FROM users WHERE id = ?")
253        .bind(&id)
254        .fetch_optional(&st.pool)
255        .await?
256        .ok_or_else(|| AppError::NotFound(format!("user {id} not found")))?;
257
258    let role = body.role.unwrap_or_else(|| cur.role.clone());
259    if !Role::is_valid(&role) {
260        return Err(AppError::BadRequest(
261            "`role` must be admin|manager|guard|viewer|integration".into(),
262        ));
263    }
264    let active = body.active.unwrap_or(cur.active);
265    let display_name = body.display_name.or(cur.display_name);
266    let password_hash = match body.password {
267        Some(p) if p.len() >= MIN_PASSWORD_LEN => auth::hash_password(&p)?,
268        Some(_) => {
269            return Err(AppError::BadRequest(format!(
270                "`password` must be at least {MIN_PASSWORD_LEN} characters"
271            )))
272        }
273        None => cur.password_hash,
274    };
275    // Lockout guard, ATOMIC: when this change demotes/disables an admin, the UPDATE only applies if
276    // ANOTHER active admin still exists at write time. SQLite serializes writers, so two concurrent
277    // demotions of different admins cannot both succeed — the second finds the EXISTS false and is
278    // rejected, always leaving an admin standing. (A separate COUNT-then-UPDATE would race.)
279    let demoting_admin = cur.role == "admin" && (role != "admin" || !active);
280    let affected = if demoting_admin {
281        sqlx::query(
282            "UPDATE users SET password_hash=?, role=?, display_name=?, active=?, updated_at=?, \
283             failed_login_count=0, locked_until=NULL \
284             WHERE id=? AND EXISTS (SELECT 1 FROM users WHERE role='admin' AND active=1 AND id != ?)",
285        )
286        .bind(&password_hash)
287        .bind(&role)
288        .bind(&display_name)
289        .bind(active)
290        .bind(Utc::now())
291        .bind(&id)
292        .bind(&id)
293        .execute(&st.pool)
294        .await?
295        .rows_affected()
296    } else {
297        sqlx::query(
298            "UPDATE users SET password_hash=?, role=?, display_name=?, active=?, updated_at=?, \
299             failed_login_count=0, locked_until=NULL WHERE id=?",
300        )
301        .bind(&password_hash)
302        .bind(&role)
303        .bind(&display_name)
304        .bind(active)
305        .bind(Utc::now())
306        .bind(&id)
307        .execute(&st.pool)
308        .await?
309        .rows_affected()
310    };
311    if demoting_admin && affected == 0 {
312        return Err(AppError::BadRequest(
313            "cannot demote or disable the last active admin".into(),
314        ));
315    }
316    // Revoke sessions if the account was disabled.
317    if !active {
318        let _ = sqlx::query("DELETE FROM sessions WHERE user_id = ?")
319            .bind(&id)
320            .execute(&st.pool)
321            .await;
322    }
323    auth::audit(
324        &st.pool,
325        &principal,
326        "update_user",
327        "user",
328        &id,
329        json!({ "role": role, "active": active }),
330    )
331    .await;
332    let user = sqlx::query_as::<_, User>("SELECT * FROM users WHERE id = ?")
333        .bind(&id)
334        .fetch_one(&st.pool)
335        .await?;
336    Ok(Json(UserView::from(user)))
337}
338
339/// Admin-only: clear a user's brute-force lockout (reset the failure counter + unlock immediately),
340/// without otherwise editing the account. Auto-unlock still happens on its own once the window passes.
341async fn unlock_user(
342    State(st): State<AppState>,
343    principal: Principal,
344    Path(id): Path<String>,
345) -> AppResult<Json<UserView>> {
346    principal.require(principal.can_admin(), "unlock users")?;
347    let res =
348        sqlx::query("UPDATE users SET failed_login_count = 0, locked_until = NULL WHERE id = ?")
349            .bind(&id)
350            .execute(&st.pool)
351            .await?;
352    if res.rows_affected() == 0 {
353        return Err(AppError::NotFound(format!("user {id} not found")));
354    }
355    auth::audit(&st.pool, &principal, "unlock_user", "user", &id, json!({})).await;
356    let user = sqlx::query_as::<_, User>("SELECT * FROM users WHERE id = ?")
357        .bind(&id)
358        .fetch_one(&st.pool)
359        .await?;
360    Ok(Json(UserView::from(user)))
361}
362
363async fn delete_user(
364    State(st): State<AppState>,
365    principal: Principal,
366    Path(id): Path<String>,
367) -> AppResult<StatusCode> {
368    principal.require(principal.can_admin(), "delete users")?;
369    if principal.id == id {
370        return Err(AppError::BadRequest(
371            "cannot delete your own account".into(),
372        ));
373    }
374    let cur = sqlx::query_as::<_, User>("SELECT * FROM users WHERE id = ?")
375        .bind(&id)
376        .fetch_optional(&st.pool)
377        .await?
378        .ok_or_else(|| AppError::NotFound(format!("user {id} not found")))?;
379    // Atomic last-admin guard (see update_user): the conditional DELETE removes an admin only if
380    // another active admin still exists, so concurrent deletes cannot drain the admins to zero.
381    let affected = if cur.role == "admin" {
382        sqlx::query(
383            "DELETE FROM users WHERE id = ? AND EXISTS (SELECT 1 FROM users WHERE role='admin' AND active=1 AND id != ?)",
384        )
385        .bind(&id)
386        .bind(&id)
387        .execute(&st.pool)
388        .await?
389        .rows_affected()
390    } else {
391        sqlx::query("DELETE FROM users WHERE id = ?")
392            .bind(&id)
393            .execute(&st.pool)
394            .await?
395            .rows_affected()
396    };
397    if cur.role == "admin" && affected == 0 {
398        return Err(AppError::BadRequest(
399            "cannot delete the last active admin".into(),
400        ));
401    }
402    auth::audit(&st.pool, &principal, "delete_user", "user", &id, json!({})).await;
403    Ok(StatusCode::NO_CONTENT)
404}
405
406async fn list_api_keys(
407    State(st): State<AppState>,
408    principal: Principal,
409) -> AppResult<Json<Vec<ApiKeyView>>> {
410    principal.require(principal.can_admin(), "manage API keys")?;
411    let keys = sqlx::query_as::<_, ApiKey>("SELECT * FROM api_keys ORDER BY created_at DESC")
412        .fetch_all(&st.pool)
413        .await?;
414    Ok(Json(keys.into_iter().map(ApiKeyView::from).collect()))
415}
416
417async fn create_api_key(
418    State(st): State<AppState>,
419    principal: Principal,
420    Json(body): Json<ApiKeyCreate>,
421) -> AppResult<(StatusCode, Json<Value>)> {
422    principal.require(principal.can_admin(), "create API keys")?;
423    if body.name.trim().is_empty() {
424        return Err(AppError::BadRequest("`name` is required".into()));
425    }
426    let role = body.role.as_deref().unwrap_or("integration");
427    if !Role::is_valid(role) {
428        return Err(AppError::BadRequest(
429            "`role` must be admin|manager|guard|viewer|integration".into(),
430        ));
431    }
432    let key = auth::random_token(auth::APIKEY_PREFIX);
433    let prefix: String = key.chars().take(12).collect();
434    let id = format!("key_{}", Uuid::new_v4().simple());
435    sqlx::query(
436        "INSERT INTO api_keys (id, name, key_hash, key_prefix, role, active, created_at)
437         VALUES (?,?,?,?,?,1,?)",
438    )
439    .bind(&id)
440    .bind(body.name.trim())
441    .bind(auth::token_hash(&key))
442    .bind(&prefix)
443    .bind(role)
444    .bind(Utc::now())
445    .execute(&st.pool)
446    .await?;
447    auth::audit(
448        &st.pool,
449        &principal,
450        "create_api_key",
451        "api_key",
452        &id,
453        json!({ "role": role }),
454    )
455    .await;
456    // The full key is returned exactly once; only its hash is stored.
457    Ok((
458        StatusCode::CREATED,
459        Json(json!({ "id": id, "name": body.name.trim(), "role": role, "key": key })),
460    ))
461}
462
463async fn delete_api_key(
464    State(st): State<AppState>,
465    principal: Principal,
466    Path(id): Path<String>,
467) -> AppResult<StatusCode> {
468    principal.require(principal.can_admin(), "delete API keys")?;
469    let res = sqlx::query("DELETE FROM api_keys WHERE id = ?")
470        .bind(&id)
471        .execute(&st.pool)
472        .await?;
473    if res.rows_affected() == 0 {
474        return Err(AppError::NotFound(format!("api key {id} not found")));
475    }
476    auth::audit(
477        &st.pool,
478        &principal,
479        "delete_api_key",
480        "api_key",
481        &id,
482        json!({}),
483    )
484    .await;
485    Ok(StatusCode::NO_CONTENT)
486}
487
488#[cfg(test)]
489mod tests {
490    use super::*;
491    use crate::config::Config;
492    use crate::services::recorder::RecorderManager;
493    use crate::services::sampler::SamplerManager;
494    use std::sync::Arc;
495
496    /// Build a minimal in-memory AppState (single connection so the :memory: DB persists across
497    /// queries) with real migrations applied, mirroring the helper used by the other route tests.
498    async fn test_state(auth_enabled: bool) -> AppState {
499        let pool = sqlx::sqlite::SqlitePoolOptions::new()
500            .max_connections(1)
501            .connect("sqlite::memory:")
502            .await
503            .unwrap();
504        crate::db::run_migrations(&pool).await.unwrap();
505        let mut cfg = Config::from_env();
506        cfg.auth_enabled = auth_enabled;
507        let cfg = Arc::new(cfg);
508        AppState {
509            recorder: RecorderManager::new(pool.clone(), cfg.clone()),
510            sampler: SamplerManager::new(pool.clone(), cfg.clone()),
511            mirror: None,
512            consumers: Arc::new(Vec::new()),
513            modules: Arc::new(Vec::new()),
514            catalog: Arc::new(crate::services::registry::CatalogService::new(&cfg)),
515            http: reqwest::Client::new(),
516            started_at: chrono::Utc::now(),
517            pool,
518            cfg,
519        }
520    }
521
522    fn viewer() -> Principal {
523        Principal {
524            id: "usr_viewer".into(),
525            name: "vee".into(),
526            role: Role::Viewer,
527            kind: auth::PrincipalKind::User,
528        }
529    }
530
531    #[tokio::test]
532    async fn me_reports_principal_role_and_kind() {
533        // System admin (auth-disabled implicit principal) reports role=admin, kind=system.
534        let Json(v) = me(Principal::system_admin()).await.unwrap();
535        assert_eq!(v["id"], "system");
536        assert_eq!(v["name"], "system");
537        assert_eq!(v["role"], "admin");
538        assert_eq!(v["kind"], "system");
539
540        // A user-kind principal maps to kind=user and echoes its role.
541        let Json(v) = me(viewer()).await.unwrap();
542        assert_eq!(v["role"], "viewer");
543        assert_eq!(v["kind"], "user");
544    }
545
546    #[tokio::test]
547    async fn create_user_validation_rejects_bad_input() {
548        let st = test_state(false).await;
549
550        // Empty (whitespace-only) username.
551        let err = create_user(
552            State(st.clone()),
553            Principal::system_admin(),
554            Json(UserCreate {
555                username: "   ".into(),
556                password: "x".repeat(MIN_PASSWORD_LEN),
557                role: None,
558                display_name: None,
559                active: None,
560            }),
561        )
562        .await
563        .err()
564        .unwrap();
565        match err {
566            AppError::BadRequest(m) => assert!(m.contains("username")),
567            other => panic!("expected BadRequest, got {other:?}"),
568        }
569
570        // Password shorter than MIN_PASSWORD_LEN.
571        let err = create_user(
572            State(st.clone()),
573            Principal::system_admin(),
574            Json(UserCreate {
575                username: "joe".into(),
576                password: "x".repeat(MIN_PASSWORD_LEN - 1),
577                role: None,
578                display_name: None,
579                active: None,
580            }),
581        )
582        .await
583        .err()
584        .unwrap();
585        match err {
586            AppError::BadRequest(m) => assert!(m.contains("password")),
587            other => panic!("expected BadRequest, got {other:?}"),
588        }
589
590        // Unrecognized role.
591        let err = create_user(
592            State(st.clone()),
593            Principal::system_admin(),
594            Json(UserCreate {
595                username: "joe".into(),
596                password: "x".repeat(MIN_PASSWORD_LEN),
597                role: Some("superuser".into()),
598                display_name: None,
599                active: None,
600            }),
601        )
602        .await
603        .err()
604        .unwrap();
605        match err {
606            AppError::BadRequest(m) => assert!(m.contains("role")),
607            other => panic!("expected BadRequest, got {other:?}"),
608        }
609    }
610
611    #[tokio::test]
612    async fn create_user_defaults_and_list_orders() {
613        let st = test_state(false).await;
614
615        // Surrounding whitespace is trimmed; role defaults to viewer; active defaults to true.
616        let (status, Json(uv)) = create_user(
617            State(st.clone()),
618            Principal::system_admin(),
619            Json(UserCreate {
620                username: "  bravo  ".into(),
621                password: "x".repeat(MIN_PASSWORD_LEN),
622                role: None,
623                display_name: None,
624                active: None,
625            }),
626        )
627        .await
628        .unwrap();
629        assert_eq!(status, StatusCode::CREATED);
630        assert_eq!(uv.username, "bravo");
631        assert_eq!(uv.role, "viewer");
632        assert!(uv.active);
633
634        let _ = create_user(
635            State(st.clone()),
636            Principal::system_admin(),
637            Json(UserCreate {
638                username: "alpha".into(),
639                password: "x".repeat(MIN_PASSWORD_LEN),
640                role: Some("manager".into()),
641                display_name: Some("Al".into()),
642                active: None,
643            }),
644        )
645        .await
646        .unwrap();
647
648        // list_users is ordered by username ASC.
649        let Json(users) = list_users(State(st.clone()), Principal::system_admin())
650            .await
651            .unwrap();
652        assert_eq!(users.len(), 2);
653        assert_eq!(users[0].username, "alpha");
654        assert_eq!(users[1].username, "bravo");
655        assert_eq!(users[0].role, "manager");
656    }
657
658    #[tokio::test]
659    async fn non_admin_is_forbidden() {
660        let st = test_state(false).await;
661
662        let err = list_users(State(st.clone()), viewer()).await.err().unwrap();
663        assert!(matches!(err, AppError::Forbidden(_)));
664
665        let err = create_api_key(
666            State(st.clone()),
667            viewer(),
668            Json(ApiKeyCreate {
669                name: "k".into(),
670                role: None,
671            }),
672        )
673        .await
674        .err()
675        .unwrap();
676        assert!(matches!(err, AppError::Forbidden(_)));
677    }
678
679    #[tokio::test]
680    async fn delete_user_rejects_self() {
681        let st = test_state(false).await;
682        // system_admin has id "system"; deleting that same id hits the self-deletion guard before
683        // any existence check.
684        let err = delete_user(
685            State(st.clone()),
686            Principal::system_admin(),
687            Path("system".to_string()),
688        )
689        .await
690        .err()
691        .unwrap();
692        assert!(matches!(err, AppError::BadRequest(_)));
693    }
694
695    #[tokio::test]
696    async fn update_user_protects_last_admin() {
697        let st = test_state(false).await;
698
699        // The only admin in the table.
700        let (_, Json(admin)) = create_user(
701            State(st.clone()),
702            Principal::system_admin(),
703            Json(UserCreate {
704                username: "rootadmin".into(),
705                password: "x".repeat(MIN_PASSWORD_LEN),
706                role: Some("admin".into()),
707                display_name: None,
708                active: None,
709            }),
710        )
711        .await
712        .unwrap();
713
714        // Demoting the last active admin is refused.
715        let err = update_user(
716            State(st.clone()),
717            Principal::system_admin(),
718            Path(admin.id.clone()),
719            Json(UserUpdate {
720                role: Some("viewer".into()),
721                ..Default::default()
722            }),
723        )
724        .await
725        .err()
726        .unwrap();
727        assert!(matches!(err, AppError::BadRequest(_)));
728    }
729
730    /// AppState around a caller-provided pool — for concurrency tests needing a shared,
731    /// multi-connection DB (the single-connection in-memory `test_state` would serialize the race).
732    async fn state_with_pool(pool: sqlx::SqlitePool) -> AppState {
733        let mut cfg = Config::from_env();
734        cfg.auth_enabled = false;
735        let cfg = std::sync::Arc::new(cfg);
736        AppState {
737            recorder: RecorderManager::new(pool.clone(), cfg.clone()),
738            sampler: SamplerManager::new(pool.clone(), cfg.clone()),
739            mirror: None,
740            consumers: std::sync::Arc::new(Vec::new()),
741            modules: std::sync::Arc::new(Vec::new()),
742            catalog: std::sync::Arc::new(crate::services::registry::CatalogService::new(&cfg)),
743            http: reqwest::Client::new(),
744            started_at: chrono::Utc::now(),
745            pool,
746            cfg,
747        }
748    }
749
750    #[tokio::test(flavor = "multi_thread", worker_threads = 4)]
751    async fn concurrent_demotion_cannot_drain_the_last_admin() {
752        // Temp-FILE DB so the pool's connections see each other's committed writes — the
753        // single-connection in-memory pool used elsewhere would serialize and hide the race.
754        let dbpath =
755            std::env::temp_dir().join(format!("heldar-authrace-{}.db", std::process::id()));
756        let _ = std::fs::remove_file(&dbpath);
757        let url = format!("sqlite://{}?mode=rwc", dbpath.display());
758        let pool = sqlx::sqlite::SqlitePoolOptions::new()
759            .max_connections(4)
760            .connect(&url)
761            .await
762            .unwrap();
763        crate::db::run_migrations(&pool).await.unwrap();
764        let st = state_with_pool(pool.clone()).await;
765
766        // Exactly two active admins.
767        let mut ids = Vec::new();
768        for u in ["admin_a", "admin_b"] {
769            let (_, Json(v)) = create_user(
770                State(st.clone()),
771                Principal::system_admin(),
772                Json(UserCreate {
773                    username: u.into(),
774                    password: "x".repeat(MIN_PASSWORD_LEN),
775                    role: Some("admin".into()),
776                    display_name: None,
777                    active: None,
778                }),
779            )
780            .await
781            .unwrap();
782            ids.push(v.id);
783        }
784
785        let demote = || {
786            Json(UserUpdate {
787                role: Some("viewer".into()),
788                ..Default::default()
789            })
790        };
791        // Demote BOTH admins at once. Old check-then-act: both pass -> zero admins. Atomic guard:
792        // at least one is rejected, an admin always remains.
793        let (r1, r2) = tokio::join!(
794            update_user(
795                State(st.clone()),
796                Principal::system_admin(),
797                Path(ids[0].clone()),
798                demote(),
799            ),
800            update_user(
801                State(st.clone()),
802                Principal::system_admin(),
803                Path(ids[1].clone()),
804                demote(),
805            ),
806        );
807
808        let rejected = [r1.is_err(), r2.is_err()]
809            .into_iter()
810            .filter(|e| *e)
811            .count();
812        let remaining: i64 =
813            sqlx::query_scalar("SELECT COUNT(*) FROM users WHERE role='admin' AND active=1")
814                .fetch_one(&pool)
815                .await
816                .unwrap();
817        let _ = std::fs::remove_file(&dbpath);
818        assert!(
819            remaining >= 1,
820            "LOCKOUT: concurrent demotions drained all active admins (remaining={remaining})"
821        );
822        assert!(
823            rejected >= 1,
824            "at least one of two concurrent last-admin demotions must be rejected"
825        );
826    }
827
828    #[tokio::test]
829    async fn create_api_key_shape_and_validation() {
830        let st = test_state(false).await;
831
832        // Empty name is rejected.
833        let err = create_api_key(
834            State(st.clone()),
835            Principal::system_admin(),
836            Json(ApiKeyCreate {
837                name: "  ".into(),
838                role: None,
839            }),
840        )
841        .await
842        .err()
843        .unwrap();
844        assert!(matches!(err, AppError::BadRequest(_)));
845
846        // Valid creation: role defaults to integration, the secret is prefixed and returned once.
847        let (status, Json(v)) = create_api_key(
848            State(st.clone()),
849            Principal::system_admin(),
850            Json(ApiKeyCreate {
851                name: "  cam-bridge  ".into(),
852                role: None,
853            }),
854        )
855        .await
856        .unwrap();
857        assert_eq!(status, StatusCode::CREATED);
858        assert_eq!(v["name"], "cam-bridge");
859        assert_eq!(v["role"], "integration");
860        let key = v["key"].as_str().unwrap();
861        assert!(key.starts_with(auth::APIKEY_PREFIX));
862    }
863
864    #[tokio::test]
865    async fn login_unknown_wrong_then_success() {
866        let st = test_state(false).await;
867
868        // No users yet -> unknown user is uniformly Unauthorized.
869        let err = login(
870            State(st.clone()),
871            Json(LoginRequest {
872                username: "ghost".into(),
873                password: "whatever1".into(),
874            }),
875        )
876        .await
877        .err()
878        .unwrap();
879        assert!(matches!(err, AppError::Unauthorized(_)));
880
881        // Seed an operator.
882        let _ = create_user(
883            State(st.clone()),
884            Principal::system_admin(),
885            Json(UserCreate {
886                username: "operator".into(),
887                password: "operator-pass".into(),
888                role: Some("manager".into()),
889                display_name: None,
890                active: None,
891            }),
892        )
893        .await
894        .unwrap();
895
896        // Wrong password for an existing user is also Unauthorized.
897        let err = login(
898            State(st.clone()),
899            Json(LoginRequest {
900                username: "operator".into(),
901                password: "not-the-pass".into(),
902            }),
903        )
904        .await
905        .err()
906        .unwrap();
907        assert!(matches!(err, AppError::Unauthorized(_)));
908
909        // Correct credentials succeed: 200, an HttpOnly session cookie, and one persisted session.
910        let resp = login(
911            State(st.clone()),
912            Json(LoginRequest {
913                username: "operator".into(),
914                password: "operator-pass".into(),
915            }),
916        )
917        .await
918        .unwrap()
919        .into_response();
920        assert_eq!(resp.status(), StatusCode::OK);
921        let set_cookie = resp
922            .headers()
923            .get(header::SET_COOKIE)
924            .unwrap()
925            .to_str()
926            .unwrap();
927        assert!(set_cookie.contains(auth::SESSION_COOKIE));
928        assert!(set_cookie.contains("HttpOnly"));
929
930        let sessions: i64 = sqlx::query_scalar("SELECT COUNT(*) FROM sessions")
931            .fetch_one(&st.pool)
932            .await
933            .unwrap();
934        assert_eq!(sessions, 1);
935    }
936
937    // ---- Per-account login lockout ----
938
939    async fn test_state_lockout(max_failures: i64, lockout_min: i64) -> AppState {
940        let pool = sqlx::sqlite::SqlitePoolOptions::new()
941            .max_connections(1)
942            .connect("sqlite::memory:")
943            .await
944            .unwrap();
945        crate::db::run_migrations(&pool).await.unwrap();
946        let mut cfg = Config::from_env();
947        cfg.auth_enabled = true;
948        cfg.login_max_failures = max_failures;
949        cfg.login_lockout_min = lockout_min;
950        let cfg = Arc::new(cfg);
951        AppState {
952            recorder: RecorderManager::new(pool.clone(), cfg.clone()),
953            sampler: SamplerManager::new(pool.clone(), cfg.clone()),
954            mirror: None,
955            consumers: Arc::new(Vec::new()),
956            modules: Arc::new(Vec::new()),
957            catalog: Arc::new(crate::services::registry::CatalogService::new(&cfg)),
958            http: reqwest::Client::new(),
959            started_at: chrono::Utc::now(),
960            pool,
961            cfg,
962        }
963    }
964
965    async fn seed_user(st: &AppState, username: &str, password: &str) {
966        let _ = create_user(
967            State(st.clone()),
968            Principal::system_admin(),
969            Json(UserCreate {
970                username: username.into(),
971                password: password.into(),
972                role: Some("manager".into()),
973                display_name: None,
974                active: None,
975            }),
976        )
977        .await
978        .unwrap();
979    }
980
981    async fn try_login(
982        st: &AppState,
983        username: &str,
984        password: &str,
985    ) -> AppResult<impl IntoResponse> {
986        login(
987            State(st.clone()),
988            Json(LoginRequest {
989                username: username.into(),
990                password: password.into(),
991            }),
992        )
993        .await
994    }
995
996    #[tokio::test]
997    async fn login_locks_after_max_failures_and_rejects_correct_password() {
998        let st = test_state_lockout(3, 15).await;
999        seed_user(&st, "op", "correct-pass").await;
1000        for _ in 0..3 {
1001            assert!(matches!(
1002                try_login(&st, "op", "wrong").await.err().unwrap(),
1003                AppError::Unauthorized(_)
1004            ));
1005        }
1006        let (count, locked): (i64, Option<String>) = sqlx::query_as(
1007            "SELECT failed_login_count, locked_until FROM users WHERE username='op'",
1008        )
1009        .fetch_one(&st.pool)
1010        .await
1011        .unwrap();
1012        assert_eq!(count, 3);
1013        assert!(
1014            locked.is_some(),
1015            "account should be locked after 3 failures"
1016        );
1017
1018        // The CORRECT password is now refused, and NO session is created.
1019        assert!(matches!(
1020            try_login(&st, "op", "correct-pass").await.err().unwrap(),
1021            AppError::Unauthorized(_)
1022        ));
1023        let sessions: i64 = sqlx::query_scalar("SELECT COUNT(*) FROM sessions")
1024            .fetch_one(&st.pool)
1025            .await
1026            .unwrap();
1027        assert_eq!(sessions, 0, "a locked account must not get a session");
1028    }
1029
1030    #[tokio::test]
1031    async fn successful_login_resets_failure_count() {
1032        let st = test_state_lockout(3, 15).await;
1033        seed_user(&st, "op", "correct-pass").await;
1034        let _ = try_login(&st, "op", "wrong").await;
1035        let _ = try_login(&st, "op", "wrong").await; // 2 < threshold 3
1036        let resp = try_login(&st, "op", "correct-pass")
1037            .await
1038            .unwrap()
1039            .into_response();
1040        assert_eq!(resp.status(), StatusCode::OK);
1041        let (count, locked): (i64, Option<String>) = sqlx::query_as(
1042            "SELECT failed_login_count, locked_until FROM users WHERE username='op'",
1043        )
1044        .fetch_one(&st.pool)
1045        .await
1046        .unwrap();
1047        assert_eq!(count, 0);
1048        assert!(locked.is_none());
1049    }
1050
1051    #[tokio::test]
1052    async fn auto_unlock_after_window() {
1053        let st = test_state_lockout(2, 15).await;
1054        seed_user(&st, "op", "correct-pass").await;
1055        let _ = try_login(&st, "op", "wrong").await;
1056        let _ = try_login(&st, "op", "wrong").await;
1057        // Simulate the lock window elapsing by backdating locked_until.
1058        sqlx::query("UPDATE users SET locked_until = ? WHERE username='op'")
1059            .bind(Utc::now() - chrono::Duration::minutes(1))
1060            .execute(&st.pool)
1061            .await
1062            .unwrap();
1063        let resp = try_login(&st, "op", "correct-pass")
1064            .await
1065            .unwrap()
1066            .into_response();
1067        assert_eq!(resp.status(), StatusCode::OK);
1068    }
1069
1070    #[tokio::test]
1071    async fn manual_unlock_clears_lock() {
1072        let st = test_state_lockout(2, 15).await;
1073        seed_user(&st, "op", "correct-pass").await;
1074        let _ = try_login(&st, "op", "wrong").await;
1075        let _ = try_login(&st, "op", "wrong").await;
1076        let uid: String = sqlx::query_scalar("SELECT id FROM users WHERE username='op'")
1077            .fetch_one(&st.pool)
1078            .await
1079            .unwrap();
1080        let _ = unlock_user(State(st.clone()), Principal::system_admin(), Path(uid))
1081            .await
1082            .unwrap();
1083        let resp = try_login(&st, "op", "correct-pass")
1084            .await
1085            .unwrap()
1086            .into_response();
1087        assert_eq!(resp.status(), StatusCode::OK);
1088    }
1089
1090    #[tokio::test]
1091    async fn lockout_disabled_when_zero_never_locks() {
1092        let st = test_state_lockout(0, 15).await; // disabled
1093        seed_user(&st, "op", "correct-pass").await;
1094        for _ in 0..10 {
1095            let _ = try_login(&st, "op", "wrong").await;
1096        }
1097        let locked: Option<String> =
1098            sqlx::query_scalar("SELECT locked_until FROM users WHERE username='op'")
1099                .fetch_one(&st.pool)
1100                .await
1101                .unwrap();
1102        assert!(locked.is_none(), "lockout disabled must never lock");
1103        let resp = try_login(&st, "op", "correct-pass")
1104            .await
1105            .unwrap()
1106            .into_response();
1107        assert_eq!(resp.status(), StatusCode::OK);
1108    }
1109
1110    #[tokio::test]
1111    async fn logout_is_no_content_and_clears_cookie() {
1112        let st = test_state(false).await;
1113        // No credentials present -> still a clean, idempotent logout.
1114        let resp = logout(State(st.clone()), HeaderMap::new())
1115            .await
1116            .unwrap()
1117            .into_response();
1118        assert_eq!(resp.status(), StatusCode::NO_CONTENT);
1119        let set_cookie = resp
1120            .headers()
1121            .get(header::SET_COOKIE)
1122            .unwrap()
1123            .to_str()
1124            .unwrap();
1125        assert!(set_cookie.contains(auth::SESSION_COOKIE));
1126        assert!(set_cookie.contains("Max-Age=0"));
1127    }
1128}