athena_rs 3.18.0

Hyper-performant polyglot database driver
Documentation
use actix_web::{HttpRequest, HttpResponse};
use once_cell::sync::Lazy;
use serde::Serialize;
use sqlx::FromRow;

use crate::AppState;
use crate::api::auth::authorize_static_admin_key;
use crate::api::client_context::auth_pool;
use crate::api::headers::x_organization_id::get_x_organization_id;
use crate::api::headers::x_user_id::get_x_user_id;
use crate::api::response::service_unavailable;
use crate::config::Config;

/// Request header checked first when looking for an Athena Auth session token.
const SESSION_TOKEN_HEADER: &str = "X-Athena-Auth-Session-Token";
/// Cookie name checked for the session token when the header is absent or blank.
const SESSION_COOKIE_NAME: &str = "athena-auth.session-token";
/// Environment variable consulted for the legacy-identity-header switch when
/// the application config cannot be loaded.
const LEGACY_IDENTITY_ENV: &str = "ATHENA_STORAGE_ALLOW_LEGACY_IDENTITY_HEADERS";

/// Whether a storage actor may be built from the plain identity headers alone.
///
/// Computed once, on first access. When `Config::load()` succeeds the value
/// comes from the loaded config. When it fails, the `LEGACY_IDENTITY_ENV`
/// environment variable is read instead: after trimming and ASCII-lowercasing,
/// `1`, `true`, `yes` and `on` enable the flag; anything else, or an unreadable
/// variable, leaves it disabled.
static STORAGE_ALLOW_LEGACY_IDENTITY_HEADERS: Lazy<bool> = Lazy::new(|| {
    match Config::load() {
        Ok(config) => config.get_storage_allow_legacy_identity_headers(),
        // Config could not be loaded: fall back to the raw environment variable.
        Err(_) => match std::env::var(LEGACY_IDENTITY_ENV) {
            Ok(raw) => {
                let flag = raw.trim().to_ascii_lowercase();
                matches!(flag.as_str(), "1" | "true" | "yes" | "on")
            }
            Err(_) => false,
        },
    }
});

/// Identity on whose behalf a storage request is performed.
///
/// Built in this module from a static admin key, an Athena Auth session, an
/// API key (bearer token), or the legacy identity headers.
#[derive(Debug, Clone, Serialize)]
pub struct StorageActor {
    /// Identifier of the acting user.
    pub user_id: String,
    /// Organization the actor is currently operating in, if any.
    pub active_organization_id: Option<String>,
    /// Organizations associated with the actor.
    pub organization_ids: Vec<String>,
    /// Role names attached to the actor.
    pub role_ids: Vec<String>,
    /// Whether the actor is treated as an administrator.
    pub is_admin: bool,
    /// Session identifier; only populated when the actor was resolved from a
    /// session row.
    pub session_id: Option<String>,
}

/// JSON body serialized into the 401 response built by
/// `storage_unauthorized_response`.
#[derive(Serialize)]
struct StorageUnauthorizedBody {
    /// Outcome marker for the response.
    status: &'static str,
    /// Short human-readable summary.
    message: &'static str,
    /// More specific description of what went wrong.
    error: &'static str,
    /// Machine-readable error code.
    code: &'static str,
}

/// Row shape decoded from the session lookup query in
/// `resolve_optional_storage_actor`; field names match the selected columns.
#[derive(Debug, FromRow)]
struct SessionActorRow {
    /// `session.id` of the matched session.
    session_id: String,
    /// `session.user_id` of the matched session.
    user_id: String,
    /// `session.active_organization_id`, which may be NULL.
    active_organization_id: Option<String>,
    /// `user.role` of the session's user, which may be NULL.
    user_role: Option<String>,
    /// Distinct non-NULL `member.organization_id` values for the user.
    organization_ids: Vec<String>,
    /// Distinct non-NULL `member.role` values for the user.
    member_roles: Vec<String>,
    /// True when the user has an enabled permission scope that is global or
    /// is one of the admin scope names listed in the query.
    has_admin_scope: bool,
}

/// Row shape decoded from the API-key lookup query in
/// `resolve_optional_storage_actor`; field names match the selected columns.
#[derive(Debug, FromRow)]
struct BearerActorRow {
    /// `apikey.user_id` of the matched key.
    user_id: String,
    /// `user.role` of the key's user, which may be NULL.
    user_role: Option<String>,
    /// Distinct non-NULL `member.organization_id` values for the user.
    organization_ids: Vec<String>,
    /// Distinct non-NULL `member.role` values for the user.
    member_roles: Vec<String>,
    /// True when the user has an enabled permission scope that is global or
    /// is one of the admin scope names listed in the query.
    has_admin_scope: bool,
}

pub async fn require_storage_actor(
    req: &HttpRequest,
    app_state: &AppState,
) -> Result<StorageActor, HttpResponse> {
    resolve_optional_storage_actor(req, app_state)
        .await?
        .ok_or_else(storage_unauthorized_response)
}

/// Resolves the storage actor for `req`, if the request carries usable
/// credentials.
///
/// Credentials are tried in a fixed order and the first kind that is
/// *present* decides the outcome:
///
/// 1. Static admin key: yields an admin actor.
/// 2. Session token (header or cookie): looked up in the auth database.
/// 3. Bearer token (dedicated header or `Authorization`): looked up in the
///    auth database as an API key.
/// 4. Legacy identity headers: only when
///    `STORAGE_ALLOW_LEGACY_IDENTITY_HEADERS` is true.
///
/// Returns `Ok(None)` when no credential is present, or when a presented
/// session/bearer token matches no row. In the latter case later credential
/// kinds are NOT tried.
///
/// # Errors
///
/// Propagates the error from `auth_pool` when the auth pool cannot be
/// obtained, and returns a `service_unavailable` response when a lookup query
/// fails.
pub async fn resolve_optional_storage_actor(
    req: &HttpRequest,
    app_state: &AppState,
) -> Result<Option<StorageActor>, HttpResponse> {
    // 1. Static admin key: no database access; identity comes from the
    //    user/organization headers, with "admin" as the fallback user id.
    if authorize_static_admin_key(req).is_ok() {
        let user_id = non_empty(get_x_user_id(req)).unwrap_or_else(|| "admin".to_string());
        let active_organization_id = non_empty(get_x_organization_id(req));
        // Zero or one organization: exactly the active one, if supplied.
        let organization_ids = active_organization_id.clone().into_iter().collect();
        return Ok(Some(StorageActor {
            user_id,
            active_organization_id,
            organization_ids,
            role_ids: vec!["admin".to_string()],
            is_admin: true,
            session_id: None,
        }));
    }

    // 2. Session token: match an unexpired session whose user is not banned,
    //    aggregating the user's memberships and admin-scope flag in one row.
    if let Some(session_token) = session_token_from_request(req) {
        let pool = auth_pool(app_state)?;
        let row = sqlx::query_as::<_, SessionActorRow>(
            r#"
            SELECT
                s.id AS session_id,
                s.user_id,
                s.active_organization_id,
                u.role AS user_role,
                COALESCE(
                    array_agg(DISTINCT m.organization_id) FILTER (WHERE m.organization_id IS NOT NULL),
                    ARRAY[]::text[]
                ) AS organization_ids,
                COALESCE(
                    array_agg(DISTINCT m.role) FILTER (WHERE m.role IS NOT NULL),
                    ARRAY[]::text[]
                ) AS member_roles,
                EXISTS (
                    SELECT 1
                    FROM public.user_permission_scopes AS ups
                    WHERE ups.user_id = s.user_id
                      AND ups.enabled = true
                      AND (
                        ups.global = true
                        OR ups.scope IN ('admin', 'storage.admin', 'storage:admin', 'storage:*')
                      )
                ) AS has_admin_scope
            FROM public.session AS s
            INNER JOIN public.user AS u
                ON u.id = s.user_id
            LEFT JOIN public.member AS m
                ON m.user_id = s.user_id
            WHERE s.token = $1
              AND s.expires_at > now()
              AND COALESCE(u.banned, false) = false
            GROUP BY s.id, s.user_id, s.active_organization_id, u.role
            LIMIT 1
            "#,
        )
        .bind(session_token)
        .fetch_optional(&pool)
        .await
        .map_err(|err| {
            service_unavailable(
                "Storage auth store unavailable",
                format!("failed to verify Athena Auth session: {err}"),
            )
        })?;

        if let Some(row) = row {
            return Ok(Some(actor_from_session_row(row)));
        }

        // A session token was presented but matched nothing: stop here rather
        // than falling through to bearer or legacy credentials.
        return Ok(None);
    }

    // 3. Bearer token: match an API key that is not disabled (a NULL
    //    `enabled` counts as enabled) and whose user is not banned.
    if let Some(bearer_token) = bearer_token_from_request(req) {
        let pool = auth_pool(app_state)?;
        let row = sqlx::query_as::<_, BearerActorRow>(
            r#"
            SELECT
                a.user_id,
                u.role AS user_role,
                COALESCE(
                    array_agg(DISTINCT m.organization_id) FILTER (WHERE m.organization_id IS NOT NULL),
                    ARRAY[]::text[]
                ) AS organization_ids,
                COALESCE(
                    array_agg(DISTINCT m.role) FILTER (WHERE m.role IS NOT NULL),
                    ARRAY[]::text[]
                ) AS member_roles,
                EXISTS (
                    SELECT 1
                    FROM public.user_permission_scopes AS ups
                    WHERE ups.user_id = a.user_id
                      AND ups.enabled = true
                      AND (
                        ups.global = true
                        OR ups.scope IN ('admin', 'storage.admin', 'storage:admin', 'storage:*')
                      )
                ) AS has_admin_scope
            FROM public.apikey AS a
            INNER JOIN public.user AS u
                ON u.id = a.user_id
            LEFT JOIN public.member AS m
                ON m.user_id = a.user_id
            WHERE a.key = $1
              AND COALESCE(a.enabled, true) = true
              AND COALESCE(u.banned, false) = false
            GROUP BY a.user_id, u.role
            LIMIT 1
            "#,
        )
        .bind(bearer_token)
        .fetch_optional(&pool)
        .await
        .map_err(|err| {
            service_unavailable(
                "Storage auth store unavailable",
                format!("failed to verify Athena Auth bearer token: {err}"),
            )
        })?;

        if let Some(row) = row {
            return Ok(Some(actor_from_bearer_row(row)));
        }

        // A bearer token was presented but matched nothing: do not fall
        // through to the legacy headers.
        return Ok(None);
    }

    // 4. Legacy identity headers, only when explicitly enabled.
    if *STORAGE_ALLOW_LEGACY_IDENTITY_HEADERS {
        return Ok(legacy_actor_from_headers(req));
    }

    Ok(None)
}

/// Builds the `401 Unauthorized` JSON response used when a storage request
/// carries no usable credentials.
pub fn storage_unauthorized_response() -> HttpResponse {
    let body = StorageUnauthorizedBody {
        status: "error",
        message: "Storage authentication required",
        error: "Missing or invalid Athena Auth session",
        code: "STORAGE_UNAUTHORIZED",
    };
    HttpResponse::Unauthorized().json(body)
}

fn actor_from_session_row(row: SessionActorRow) -> StorageActor {
    let mut organization_ids = row.organization_ids;
    if let Some(active_organization_id) = row.active_organization_id.as_ref() {
        if !organization_ids
            .iter()
            .any(|value| value == active_organization_id)
        {
            organization_ids.push(active_organization_id.clone());
        }
    }

    let role_ids = merged_roles(row.user_role, row.member_roles);
    let is_admin = actor_is_admin(&role_ids, row.has_admin_scope);

    StorageActor {
        user_id: row.user_id,
        active_organization_id: row.active_organization_id,
        organization_ids,
        role_ids,
        is_admin,
        session_id: Some(row.session_id),
    }
}

fn actor_from_bearer_row(row: BearerActorRow) -> StorageActor {
    let role_ids = merged_roles(row.user_role, row.member_roles);
    let is_admin = actor_is_admin(&role_ids, row.has_admin_scope);

    StorageActor {
        user_id: row.user_id,
        active_organization_id: row.organization_ids.first().cloned(),
        organization_ids: row.organization_ids,
        role_ids,
        is_admin,
        session_id: None,
    }
}

/// Builds a non-admin actor purely from the user/organization identity
/// headers.
///
/// Returns `None` when the user id header is missing or blank. The
/// organization header is optional; when present it becomes both the active
/// organization and the only entry in the organization list.
fn legacy_actor_from_headers(req: &HttpRequest) -> Option<StorageActor> {
    non_empty(get_x_user_id(req)).map(|user_id| {
        let active_organization_id = non_empty(get_x_organization_id(req));
        let organization_ids = active_organization_id.iter().cloned().collect();
        StorageActor {
            user_id,
            active_organization_id,
            organization_ids,
            role_ids: Vec::new(),
            is_admin: false,
            session_id: None,
        }
    })
}

/// Decides whether an actor counts as a storage administrator.
///
/// Returns `true` when `has_admin_scope` is set, or when any entry of
/// `role_ids` is exactly (case-sensitively) one of the recognised admin role
/// names.
fn actor_is_admin(role_ids: &[String], has_admin_scope: bool) -> bool {
    if has_admin_scope {
        return true;
    }

    for role in role_ids {
        match role.as_str() {
            "admin" | "super_admin" | "owner" | "storage_admin" | "storage-admin" => return true,
            _ => {}
        }
    }

    false
}

/// Combines the user-level role with the membership roles into one list.
///
/// Every role is trimmed and blank values are dropped. The user role (if
/// any) comes first, followed by member roles in their given order, skipping
/// any role already present in the list.
fn merged_roles(user_role: Option<String>, member_roles: Vec<String>) -> Vec<String> {
    let mut merged: Vec<String> = normalize_value(user_role.as_deref())
        .into_iter()
        .collect();

    let candidates = member_roles
        .iter()
        .filter_map(|raw| normalize_value(Some(raw.as_str())));
    for candidate in candidates {
        let is_new = merged.iter().all(|known| known != &candidate);
        if is_new {
            merged.push(candidate);
        }
    }

    merged
}

fn session_token_from_request(req: &HttpRequest) -> Option<String> {
    req.headers()
        .get(SESSION_TOKEN_HEADER)
        .and_then(|value| value.to_str().ok())
        .and_then(|value| normalize_value(Some(value)))
        .or_else(|| {
            req.headers()
                .get("cookie")
                .and_then(|value| value.to_str().ok())
                .and_then(session_token_from_cookie_header)
        })
}

/// Finds the session cookie inside a raw `Cookie` header value.
///
/// The header is split on `;` and each segment on its first `=`. Segments
/// without `=` are ignored. The first segment whose trimmed name equals the
/// session cookie name and whose trimmed value is non-empty supplies the
/// token.
fn session_token_from_cookie_header(header: &str) -> Option<String> {
    for segment in header.split(';') {
        if let Some((name, value)) = segment.split_once('=') {
            if name.trim() != SESSION_COOKIE_NAME {
                continue;
            }
            // A blank value does not end the search; a later duplicate may match.
            if let Some(token) = normalize_value(Some(value)) {
                return Some(token);
            }
        }
    }
    None
}

/// Extracts a bearer token (API key) from the request.
///
/// The dedicated `X-Athena-Auth-Bearer-Token` header wins when it holds a
/// non-blank, string-convertible value. Otherwise the `Authorization` header
/// is used, provided its scheme is `Bearer` and a non-blank credential
/// follows.
///
/// The scheme is compared ASCII-case-insensitively (`Bearer`, `bearer`,
/// `BEARER`, ...), because HTTP defines authentication scheme names as
/// case-insensitive. Returns `None` when neither header yields a token.
fn bearer_token_from_request(req: &HttpRequest) -> Option<String> {
    req.headers()
        .get("X-Athena-Auth-Bearer-Token")
        .and_then(|value| value.to_str().ok())
        .and_then(|value| normalize_value(Some(value)))
        .or_else(|| {
            req.headers()
                .get("authorization")
                .and_then(|value| value.to_str().ok())
                .and_then(|value| {
                    // `<scheme> SP <credentials>`: split on the first space so
                    // the scheme can be compared without regard to case.
                    let (scheme, token) = value.split_once(' ')?;
                    if scheme.eq_ignore_ascii_case("bearer") {
                        normalize_value(Some(token))
                    } else {
                        None
                    }
                })
        })
}

/// Trims an optional owned string, mapping missing or blank values to `None`.
fn non_empty(value: Option<String>) -> Option<String> {
    normalize_value(value.as_deref())
}

/// Trims surrounding whitespace from `value` and returns it as an owned
/// string, or `None` when the input is absent or blank after trimming.
fn normalize_value(value: Option<&str>) -> Option<String> {
    let trimmed = value?.trim();
    if trimmed.is_empty() {
        None
    } else {
        Some(trimmed.to_string())
    }
}