use actix_web::{HttpRequest, HttpResponse};
use athena_auth_core::{
ValidateSessionInput, ValidatedSessionContext, extract_session_token_from_headers,
validate_session,
};
use chrono::Utc;
use once_cell::sync::Lazy;
use serde::Serialize;
use sqlx::FromRow;
use crate::AppState;
use crate::api::auth::authorize_static_admin_key;
use crate::api::client_context::request_auth_pool;
use crate::api::headers::x_organization_id::get_x_organization_id;
use crate::api::headers::x_user_id::get_x_user_id;
use crate::api::response::service_unavailable;
use crate::config::Config;
const SESSION_TOKEN_HEADER: &str = "X-Athena-Auth-Session-Token";
const SESSION_COOKIE_NAME: &str = "athena-auth.session-token";
const LEGACY_IDENTITY_ENV: &str = "ATHENA_STORAGE_ALLOW_LEGACY_IDENTITY_HEADERS";
const ATHENA_BEARER_ACTOR_SQL: &str = r#"
SELECT
a.user_id,
u.role AS user_role,
COALESCE(
array_agg(DISTINCT m.organization_id) FILTER (WHERE m.organization_id IS NOT NULL),
ARRAY[]::text[]
) AS organization_ids,
COALESCE(
array_agg(DISTINCT m.role) FILTER (WHERE m.role IS NOT NULL),
ARRAY[]::text[]
) AS member_roles,
EXISTS (
SELECT 1
FROM athena.user_permission_scopes AS ups
WHERE ups.user_id = a.user_id
AND ups.enabled = true
AND (
ups.global = true
OR ups.scope IN ('admin', 'storage.admin', 'storage:admin', 'storage:*')
)
) AS has_admin_scope
FROM athena.api_keys AS a
INNER JOIN athena.users AS u
ON u.id = a.user_id
LEFT JOIN athena.member AS m
ON m.user_id = a.user_id
WHERE a.key = $1
AND COALESCE(a.enabled, true) = true
AND COALESCE(u.banned, false) = false
GROUP BY a.user_id, u.role
LIMIT 1
"#;
const ATHENA_COMPAT_BEARER_ACTOR_SQL: &str = r#"
SELECT
a.user_id,
u.role AS user_role,
COALESCE(
array_agg(DISTINCT m.organization_id) FILTER (WHERE m.organization_id IS NOT NULL),
ARRAY[]::text[]
) AS organization_ids,
COALESCE(
array_agg(DISTINCT m.role) FILTER (WHERE m.role IS NOT NULL),
ARRAY[]::text[]
) AS member_roles,
EXISTS (
SELECT 1
FROM athena.user_permission_scopes AS ups
WHERE ups.user_id = a.user_id
AND ups.enabled = true
AND (
ups.global = true
OR ups.scope IN ('admin', 'storage.admin', 'storage:admin', 'storage:*')
)
) AS has_admin_scope
FROM athena.apikey AS a
INNER JOIN athena.users AS u
ON u.id = a.user_id
LEFT JOIN athena.member AS m
ON m.user_id = a.user_id
WHERE a.key = $1
AND COALESCE(a.enabled, true) = true
AND COALESCE(u.banned, false) = false
GROUP BY a.user_id, u.role
LIMIT 1
"#;
static STORAGE_ALLOW_LEGACY_IDENTITY_HEADERS: Lazy<bool> = Lazy::new(|| {
Config::load()
.map(|config| config.get_storage_allow_legacy_identity_headers())
.unwrap_or_else(|_| {
std::env::var(LEGACY_IDENTITY_ENV)
.ok()
.map(|value| {
matches!(
value.trim().to_ascii_lowercase().as_str(),
"1" | "true" | "yes" | "on"
)
})
.unwrap_or(false)
})
});
#[derive(Debug, Clone, Serialize)]
pub struct StorageActor {
pub user_id: String,
pub active_organization_id: Option<String>,
pub organization_ids: Vec<String>,
pub role_ids: Vec<String>,
pub is_admin: bool,
pub session_id: Option<String>,
}
#[derive(Serialize)]
struct StorageUnauthorizedBody {
status: &'static str,
message: &'static str,
error: &'static str,
code: &'static str,
}
#[derive(Debug, FromRow)]
struct BearerActorRow {
user_id: String,
user_role: Option<String>,
organization_ids: Vec<String>,
member_roles: Vec<String>,
has_admin_scope: bool,
}
pub async fn require_storage_actor(
req: &HttpRequest,
app_state: &AppState,
) -> Result<StorageActor, HttpResponse> {
resolve_optional_storage_actor(req, app_state)
.await?
.ok_or_else(storage_unauthorized_response)
}
pub async fn resolve_optional_storage_actor(
req: &HttpRequest,
app_state: &AppState,
) -> Result<Option<StorageActor>, HttpResponse> {
if authorize_static_admin_key(req).is_ok() {
let user_id = non_empty(get_x_user_id(req)).unwrap_or_else(|| "admin".to_string());
let active_organization_id = non_empty(get_x_organization_id(req));
let organization_ids = active_organization_id.clone().into_iter().collect();
return Ok(Some(StorageActor {
user_id,
active_organization_id,
organization_ids,
role_ids: vec!["admin".to_string()],
is_admin: true,
session_id: None,
}));
}
if let Some(session_token) = session_token_from_request(req) {
let pool = request_auth_pool(req, app_state).await?;
let context = validate_session(
&pool,
ValidateSessionInput {
token: &session_token,
now: Utc::now(),
include_user: true,
include_organization: true,
},
)
.await
.map_err(|err| {
service_unavailable(
"Storage auth store unavailable",
format!("failed to verify Athena Auth session: {err}"),
)
})?;
if let Some(context) = context {
return Ok(Some(actor_from_validated_session(context)));
}
return Ok(None);
}
if let Some(bearer_token) = bearer_token_from_request(req) {
let pool = request_auth_pool(req, app_state).await?;
let row = fetch_bearer_actor_row(&pool, &bearer_token)
.await
.map_err(|err| {
service_unavailable(
"Storage auth store unavailable",
format!("failed to verify Athena Auth bearer token: {err}"),
)
})?;
if let Some(row) = row {
return Ok(Some(actor_from_bearer_row(row)));
}
return Ok(None);
}
if *STORAGE_ALLOW_LEGACY_IDENTITY_HEADERS {
return Ok(legacy_actor_from_headers(req));
}
Ok(None)
}
pub fn storage_unauthorized_response() -> HttpResponse {
HttpResponse::Unauthorized().json(StorageUnauthorizedBody {
status: "error",
message: "Storage authentication required",
error: "Missing or invalid Athena Auth session",
code: "STORAGE_UNAUTHORIZED",
})
}
fn actor_from_validated_session(context: ValidatedSessionContext) -> StorageActor {
let role_ids = merged_roles(context.role, context.member_roles);
let is_admin = actor_is_admin(&role_ids, context.has_admin_scope);
StorageActor {
user_id: context.user_id,
active_organization_id: context.active_organization_id,
organization_ids: context.organization_ids,
role_ids,
is_admin,
session_id: Some(context.session_id),
}
}
fn actor_from_bearer_row(row: BearerActorRow) -> StorageActor {
let role_ids = merged_roles(row.user_role, row.member_roles);
let is_admin = actor_is_admin(&role_ids, row.has_admin_scope);
StorageActor {
user_id: row.user_id,
active_organization_id: row.organization_ids.first().cloned(),
organization_ids: row.organization_ids,
role_ids,
is_admin,
session_id: None,
}
}
fn legacy_actor_from_headers(req: &HttpRequest) -> Option<StorageActor> {
let user_id = non_empty(get_x_user_id(req))?;
let active_organization_id = non_empty(get_x_organization_id(req));
let organization_ids = active_organization_id.clone().into_iter().collect();
Some(StorageActor {
user_id,
active_organization_id,
organization_ids,
role_ids: Vec::new(),
is_admin: false,
session_id: None,
})
}
fn actor_is_admin(role_ids: &[String], has_admin_scope: bool) -> bool {
has_admin_scope
|| role_ids.iter().any(|role| {
matches!(
role.as_str(),
"admin" | "super_admin" | "owner" | "storage_admin" | "storage-admin"
)
})
}
fn merged_roles(user_role: Option<String>, member_roles: Vec<String>) -> Vec<String> {
let mut roles = Vec::new();
if let Some(user_role) = normalize_value(user_role.as_deref()) {
roles.push(user_role);
}
for role in member_roles {
if let Some(role) = normalize_value(Some(role.as_str())) {
if !roles.iter().any(|existing| existing == &role) {
roles.push(role);
}
}
}
roles
}
fn session_token_from_request(req: &HttpRequest) -> Option<String> {
req.headers()
.get(SESSION_TOKEN_HEADER)
.and_then(|value| value.to_str().ok())
.and_then(|value| normalize_value(Some(value)))
.or_else(|| {
extract_session_token_from_headers(
None,
req.headers()
.get("cookie")
.and_then(|value| value.to_str().ok()),
Some(SESSION_COOKIE_NAME),
)
.map(|token| token.token)
})
}
fn bearer_token_from_request(req: &HttpRequest) -> Option<String> {
req.headers()
.get("X-Athena-Auth-Bearer-Token")
.and_then(|value| value.to_str().ok())
.and_then(|value| normalize_value(Some(value)))
.or_else(|| {
req.headers()
.get("authorization")
.and_then(|value| value.to_str().ok())
.and_then(|value| {
value
.strip_prefix("Bearer ")
.or_else(|| value.strip_prefix("bearer "))
.and_then(|token| normalize_value(Some(token)))
})
})
}
fn non_empty(value: Option<String>) -> Option<String> {
value.and_then(|value| normalize_value(Some(value.as_str())))
}
fn normalize_value(value: Option<&str>) -> Option<String> {
value
.map(str::trim)
.filter(|value| !value.is_empty())
.map(str::to_string)
}
async fn fetch_bearer_actor_row(
pool: &sqlx::PgPool,
bearer_token: &str,
) -> Result<Option<BearerActorRow>, sqlx::Error> {
match query_bearer_actor_row(pool, ATHENA_BEARER_ACTOR_SQL, bearer_token).await {
Ok(row) => Ok(row),
Err(error) if should_retry_auth_schema(&error) => {
query_bearer_actor_row(pool, ATHENA_COMPAT_BEARER_ACTOR_SQL, bearer_token).await
}
Err(error) => Err(error),
}
}
async fn query_bearer_actor_row(
pool: &sqlx::PgPool,
sql: &str,
bearer_token: &str,
) -> Result<Option<BearerActorRow>, sqlx::Error> {
sqlx::query_as::<_, BearerActorRow>(sql)
.bind(bearer_token)
.fetch_optional(pool)
.await
}
fn should_retry_auth_schema(error: &sqlx::Error) -> bool {
let Some(database_error) = error.as_database_error() else {
return false;
};
matches!(database_error.code().as_deref(), Some("42P01" | "42703"))
}
#[cfg(test)]
mod tests {
use super::*;
use actix_web::test::TestRequest;
#[test]
fn session_token_prefers_explicit_storage_header() {
let req = TestRequest::default()
.insert_header((SESSION_TOKEN_HEADER, "session_header"))
.insert_header(("cookie", "athena-auth.session-token=session_cookie"))
.to_http_request();
assert_eq!(
session_token_from_request(&req).as_deref(),
Some("session_header")
);
}
#[test]
fn session_token_falls_back_to_cookie_parsing_from_auth_core() {
let req = TestRequest::default()
.insert_header((
"cookie",
"foo=bar; athena-auth.session-token=session_cookie",
))
.to_http_request();
assert_eq!(
session_token_from_request(&req).as_deref(),
Some("session_cookie")
);
}
#[test]
fn validated_session_maps_into_storage_actor() {
let actor = actor_from_validated_session(ValidatedSessionContext {
session_id: "session-1".to_string(),
user_id: "user-1".to_string(),
email: Some("user@example.com".to_string()),
name: Some("User".to_string()),
role: Some("admin".to_string()),
active_organization_id: Some("org-2".to_string()),
organization_ids: vec!["org-1".to_string()],
member_roles: vec!["owner".to_string()],
permissions: vec!["storage:*".to_string()],
has_admin_scope: true,
expires_at: Utc::now(),
});
assert_eq!(actor.user_id, "user-1");
assert_eq!(actor.session_id.as_deref(), Some("session-1"));
assert_eq!(actor.organization_ids, vec!["org-1".to_string()]);
assert_eq!(
actor.role_ids,
vec!["admin".to_string(), "owner".to_string()]
);
assert!(actor.is_admin);
}
}