athena_rs 3.22.1

Hyper performant polyglot Database driver
Documentation
use actix_web::{HttpRequest, HttpResponse};
use athena_auth_core::{
    ValidateSessionInput, ValidatedSessionContext, extract_session_token_from_headers,
    validate_session,
};
use chrono::Utc;
use once_cell::sync::Lazy;
use serde::Serialize;
use sqlx::FromRow;

use crate::AppState;
use crate::api::auth::authorize_static_admin_key;
use crate::api::client_context::auth_pool;
use crate::api::headers::x_organization_id::get_x_organization_id;
use crate::api::headers::x_user_id::get_x_user_id;
use crate::api::response::service_unavailable;
use crate::config::Config;

const SESSION_TOKEN_HEADER: &str = "X-Athena-Auth-Session-Token";
const SESSION_COOKIE_NAME: &str = "athena-auth.session-token";
const LEGACY_IDENTITY_ENV: &str = "ATHENA_STORAGE_ALLOW_LEGACY_IDENTITY_HEADERS";

static STORAGE_ALLOW_LEGACY_IDENTITY_HEADERS: Lazy<bool> = Lazy::new(|| {
    Config::load()
        .map(|config| config.get_storage_allow_legacy_identity_headers())
        .unwrap_or_else(|_| {
            std::env::var(LEGACY_IDENTITY_ENV)
                .ok()
                .map(|value| {
                    matches!(
                        value.trim().to_ascii_lowercase().as_str(),
                        "1" | "true" | "yes" | "on"
                    )
                })
                .unwrap_or(false)
        })
});

#[derive(Debug, Clone, Serialize)]
pub struct StorageActor {
    pub user_id: String,
    pub active_organization_id: Option<String>,
    pub organization_ids: Vec<String>,
    pub role_ids: Vec<String>,
    pub is_admin: bool,
    pub session_id: Option<String>,
}

#[derive(Serialize)]
struct StorageUnauthorizedBody {
    status: &'static str,
    message: &'static str,
    error: &'static str,
    code: &'static str,
}

#[derive(Debug, FromRow)]
struct BearerActorRow {
    user_id: String,
    user_role: Option<String>,
    organization_ids: Vec<String>,
    member_roles: Vec<String>,
    has_admin_scope: bool,
}

pub async fn require_storage_actor(
    req: &HttpRequest,
    app_state: &AppState,
) -> Result<StorageActor, HttpResponse> {
    resolve_optional_storage_actor(req, app_state)
        .await?
        .ok_or_else(storage_unauthorized_response)
}

pub async fn resolve_optional_storage_actor(
    req: &HttpRequest,
    app_state: &AppState,
) -> Result<Option<StorageActor>, HttpResponse> {
    if authorize_static_admin_key(req).is_ok() {
        let user_id = non_empty(get_x_user_id(req)).unwrap_or_else(|| "admin".to_string());
        let active_organization_id = non_empty(get_x_organization_id(req));
        let organization_ids = active_organization_id.clone().into_iter().collect();
        return Ok(Some(StorageActor {
            user_id,
            active_organization_id,
            organization_ids,
            role_ids: vec!["admin".to_string()],
            is_admin: true,
            session_id: None,
        }));
    }

    if let Some(session_token) = session_token_from_request(req) {
        let pool = auth_pool(app_state)?;
        let context = validate_session(
            &pool,
            ValidateSessionInput {
                token: &session_token,
                now: Utc::now(),
                include_user: true,
                include_organization: true,
            },
        )
        .await
        .map_err(|err| {
            service_unavailable(
                "Storage auth store unavailable",
                format!("failed to verify Athena Auth session: {err}"),
            )
        })?;

        if let Some(context) = context {
            return Ok(Some(actor_from_validated_session(context)));
        }

        return Ok(None);
    }

    if let Some(bearer_token) = bearer_token_from_request(req) {
        let pool = auth_pool(app_state)?;
        let row = sqlx::query_as::<_, BearerActorRow>(
            r#"
            SELECT
                a.user_id,
                u.role AS user_role,
                COALESCE(
                    array_agg(DISTINCT m.organization_id) FILTER (WHERE m.organization_id IS NOT NULL),
                    ARRAY[]::text[]
                ) AS organization_ids,
                COALESCE(
                    array_agg(DISTINCT m.role) FILTER (WHERE m.role IS NOT NULL),
                    ARRAY[]::text[]
                ) AS member_roles,
                EXISTS (
                    SELECT 1
                    FROM public.user_permission_scopes AS ups
                    WHERE ups.user_id = a.user_id
                      AND ups.enabled = true
                      AND (
                        ups.global = true
                        OR ups.scope IN ('admin', 'storage.admin', 'storage:admin', 'storage:*')
                      )
                ) AS has_admin_scope
            FROM public.apikey AS a
            INNER JOIN public.user AS u
                ON u.id = a.user_id
            LEFT JOIN public.member AS m
                ON m.user_id = a.user_id
            WHERE a.key = $1
              AND COALESCE(a.enabled, true) = true
              AND COALESCE(u.banned, false) = false
            GROUP BY a.user_id, u.role
            LIMIT 1
            "#,
        )
        .bind(bearer_token)
        .fetch_optional(&pool)
        .await
        .map_err(|err| {
            service_unavailable(
                "Storage auth store unavailable",
                format!("failed to verify Athena Auth bearer token: {err}"),
            )
        })?;

        if let Some(row) = row {
            return Ok(Some(actor_from_bearer_row(row)));
        }

        return Ok(None);
    }

    if *STORAGE_ALLOW_LEGACY_IDENTITY_HEADERS {
        return Ok(legacy_actor_from_headers(req));
    }

    Ok(None)
}

pub fn storage_unauthorized_response() -> HttpResponse {
    HttpResponse::Unauthorized().json(StorageUnauthorizedBody {
        status: "error",
        message: "Storage authentication required",
        error: "Missing or invalid Athena Auth session",
        code: "STORAGE_UNAUTHORIZED",
    })
}

fn actor_from_validated_session(context: ValidatedSessionContext) -> StorageActor {
    let role_ids = merged_roles(context.role, context.member_roles);
    let is_admin = actor_is_admin(&role_ids, context.has_admin_scope);
    StorageActor {
        user_id: context.user_id,
        active_organization_id: context.active_organization_id,
        organization_ids: context.organization_ids,
        role_ids,
        is_admin,
        session_id: Some(context.session_id),
    }
}

fn actor_from_bearer_row(row: BearerActorRow) -> StorageActor {
    let role_ids = merged_roles(row.user_role, row.member_roles);
    let is_admin = actor_is_admin(&role_ids, row.has_admin_scope);

    StorageActor {
        user_id: row.user_id,
        active_organization_id: row.organization_ids.first().cloned(),
        organization_ids: row.organization_ids,
        role_ids,
        is_admin,
        session_id: None,
    }
}

fn legacy_actor_from_headers(req: &HttpRequest) -> Option<StorageActor> {
    let user_id = non_empty(get_x_user_id(req))?;
    let active_organization_id = non_empty(get_x_organization_id(req));
    let organization_ids = active_organization_id.clone().into_iter().collect();
    Some(StorageActor {
        user_id,
        active_organization_id,
        organization_ids,
        role_ids: Vec::new(),
        is_admin: false,
        session_id: None,
    })
}

fn actor_is_admin(role_ids: &[String], has_admin_scope: bool) -> bool {
    has_admin_scope
        || role_ids.iter().any(|role| {
            matches!(
                role.as_str(),
                "admin" | "super_admin" | "owner" | "storage_admin" | "storage-admin"
            )
        })
}

fn merged_roles(user_role: Option<String>, member_roles: Vec<String>) -> Vec<String> {
    let mut roles = Vec::new();
    if let Some(user_role) = normalize_value(user_role.as_deref()) {
        roles.push(user_role);
    }
    for role in member_roles {
        if let Some(role) = normalize_value(Some(role.as_str())) {
            if !roles.iter().any(|existing| existing == &role) {
                roles.push(role);
            }
        }
    }
    roles
}

fn session_token_from_request(req: &HttpRequest) -> Option<String> {
    req.headers()
        .get(SESSION_TOKEN_HEADER)
        .and_then(|value| value.to_str().ok())
        .and_then(|value| normalize_value(Some(value)))
        .or_else(|| {
            extract_session_token_from_headers(
                None,
                req.headers()
                    .get("cookie")
                    .and_then(|value| value.to_str().ok()),
                Some(SESSION_COOKIE_NAME),
            )
            .map(|token| token.token)
        })
}

fn bearer_token_from_request(req: &HttpRequest) -> Option<String> {
    req.headers()
        .get("X-Athena-Auth-Bearer-Token")
        .and_then(|value| value.to_str().ok())
        .and_then(|value| normalize_value(Some(value)))
        .or_else(|| {
            req.headers()
                .get("authorization")
                .and_then(|value| value.to_str().ok())
                .and_then(|value| {
                    value
                        .strip_prefix("Bearer ")
                        .or_else(|| value.strip_prefix("bearer "))
                        .and_then(|token| normalize_value(Some(token)))
                })
        })
}

fn non_empty(value: Option<String>) -> Option<String> {
    value.and_then(|value| normalize_value(Some(value.as_str())))
}

fn normalize_value(value: Option<&str>) -> Option<String> {
    value
        .map(str::trim)
        .filter(|value| !value.is_empty())
        .map(str::to_string)
}

#[cfg(test)]
mod tests {
    use super::*;
    use actix_web::test::TestRequest;

    #[test]
    fn session_token_prefers_explicit_storage_header() {
        let req = TestRequest::default()
            .insert_header((SESSION_TOKEN_HEADER, "session_header"))
            .insert_header(("cookie", "athena-auth.session-token=session_cookie"))
            .to_http_request();

        assert_eq!(
            session_token_from_request(&req).as_deref(),
            Some("session_header")
        );
    }

    #[test]
    fn session_token_falls_back_to_cookie_parsing_from_auth_core() {
        let req = TestRequest::default()
            .insert_header((
                "cookie",
                "foo=bar; athena-auth.session-token=session_cookie",
            ))
            .to_http_request();

        assert_eq!(
            session_token_from_request(&req).as_deref(),
            Some("session_cookie")
        );
    }

    #[test]
    fn validated_session_maps_into_storage_actor() {
        let actor = actor_from_validated_session(ValidatedSessionContext {
            session_id: "session-1".to_string(),
            user_id: "user-1".to_string(),
            email: Some("user@example.com".to_string()),
            name: Some("User".to_string()),
            role: Some("admin".to_string()),
            active_organization_id: Some("org-2".to_string()),
            organization_ids: vec!["org-1".to_string()],
            member_roles: vec!["owner".to_string()],
            permissions: vec!["storage:*".to_string()],
            has_admin_scope: true,
            expires_at: Utc::now(),
        });

        assert_eq!(actor.user_id, "user-1");
        assert_eq!(actor.session_id.as_deref(), Some("session-1"));
        assert_eq!(actor.organization_ids, vec!["org-1".to_string()]);
        assert_eq!(
            actor.role_ids,
            vec!["admin".to_string(), "owner".to_string()]
        );
        assert!(actor.is_admin);
    }
}