athena_rs 3.26.1

Hyper performant polyglot Database driver
Documentation
//! Best-effort PostgreSQL audit logging for chat session authentication outcomes.

use actix_web::HttpRequest;
use athena_actix::headers::request_context::athena_request_id;
use athena_auth_core::{SessionTokenSource, ValidatedSessionContext};
use serde_json::{Value, json};
use sqlx::postgres::PgPool;
use tracing::error;
use uuid::Uuid;

use crate::AppState;
use crate::api::gateway::auth::extract_client_ip;
use crate::utils::best_effort_pg_backoff::{
    best_effort_pg_write_backoff_active, maybe_activate_best_effort_pg_write_backoff,
};
use crate::utils::logging_task_limiter::spawn_best_effort_logging_task;

const CHAT_SESSION_AUTH_PG_LOGGING_BACKOFF_DOMAIN: &str = "chat_session_auth_pg_logging";

/// Classification of a chat/WSS session auth attempt for structured logging.
#[derive(Debug, Clone, Copy)]
pub enum ChatSessionAuthOutcome {
    Authenticated,
    MissingSessionToken,
    InvalidSession,
    MissingOrganization,
    AuthStoreUnavailable,
}

impl ChatSessionAuthOutcome {
    fn as_str(self) -> &'static str {
        match self {
            Self::Authenticated => "authenticated",
            Self::MissingSessionToken => "missing_session_token",
            Self::InvalidSession => "invalid_session",
            Self::MissingOrganization => "missing_organization",
            Self::AuthStoreUnavailable => "auth_store_unavailable",
        }
    }
}

#[derive(Debug, Clone)]
struct ChatSessionAuthLogEntry {
    request_id: String,
    client_name: Option<String>,
    transport: &'static str,
    method: String,
    path: String,
    auth_source: Option<String>,
    outcome: &'static str,
    failure_reason: Option<String>,
    user_id: Option<String>,
    session_id: Option<String>,
    organization_id: Option<String>,
    remote_addr: Option<String>,
    user_agent: Option<String>,
    details: Value,
}

/// Enqueues a best-effort insert into `athena.chat_session_auth_log` when logging PG is configured.
///
/// No-op when the logging pool is missing or PG write backoff is active for this domain.
pub fn spawn_chat_session_auth_log(
    state: &AppState,
    req: &HttpRequest,
    client_name: Option<&str>,
    auth_source: Option<SessionTokenSource>,
    validated_session: Option<&ValidatedSessionContext>,
    outcome: ChatSessionAuthOutcome,
    failure_reason: Option<String>,
    details: Value,
) {
    let pool = state
        .logging_client_name
        .as_ref()
        .and_then(|logging_client| state.pg_registry.get_pool(logging_client));
    if pool.is_none() {
        return;
    }
    if best_effort_pg_write_backoff_active(CHAT_SESSION_AUTH_PG_LOGGING_BACKOFF_DOMAIN) {
        return;
    }

    let entry = build_log_entry(
        state,
        req,
        client_name,
        auth_source,
        validated_session,
        outcome,
        failure_reason,
        details,
    );
    let limiter = state.logging_task_limiter.clone();

    spawn_best_effort_logging_task(limiter, "chat_session_auth_log", async move {
        let Some(pool) = pool else {
            return;
        };
        if let Err(err) = insert_chat_session_auth_log(&pool, entry).await {
            let err_message = err.to_string();
            if maybe_activate_best_effort_pg_write_backoff(
                CHAT_SESSION_AUTH_PG_LOGGING_BACKOFF_DOMAIN,
                "chat_session_auth_log",
                &err_message,
            )
            .is_none()
            {
                error!(error = %err, "failed to write chat session auth log");
            }
        }
    });
}

fn build_log_entry(
    state: &AppState,
    req: &HttpRequest,
    client_name: Option<&str>,
    auth_source: Option<SessionTokenSource>,
    validated_session: Option<&ValidatedSessionContext>,
    outcome: ChatSessionAuthOutcome,
    failure_reason: Option<String>,
    details: Value,
) -> ChatSessionAuthLogEntry {
    ChatSessionAuthLogEntry {
        request_id: athena_request_id(req).unwrap_or_else(|| Uuid::new_v4().to_string()),
        client_name: client_name.map(str::to_string),
        transport: transport_for_path(req.path()),
        method: req.method().as_str().to_string(),
        path: req.path().to_string(),
        auth_source: auth_source.map(|source| source.as_str().to_string()),
        outcome: outcome.as_str(),
        failure_reason,
        user_id: validated_session.map(|session| session.user_id.clone()),
        session_id: validated_session.map(|session| session.session_id.clone()),
        organization_id: validated_session.and_then(resolved_organization_id),
        remote_addr: extract_client_ip(req, state.logging_trust_x_forwarded_for)
            .map(|ip| ip.to_string()),
        user_agent: req
            .headers()
            .get("user-agent")
            .and_then(|value| value.to_str().ok())
            .map(str::to_string),
        details,
    }
}

fn resolved_organization_id(session: &ValidatedSessionContext) -> Option<String> {
    session
        .active_organization_id
        .clone()
        .or_else(|| session.organization_ids.first().cloned())
}

fn transport_for_path(path: &str) -> &'static str {
    if path.starts_with("/wss/") {
        "wss"
    } else {
        "http"
    }
}

async fn insert_chat_session_auth_log(
    pool: &PgPool,
    entry: ChatSessionAuthLogEntry,
) -> Result<(), sqlx::Error> {
    sqlx::query(
        r#"
        INSERT INTO athena.chat_session_auth_log (
            request_id,
            client_name,
            transport,
            method,
            path,
            auth_source,
            outcome,
            failure_reason,
            user_id,
            session_id,
            organization_id,
            remote_addr,
            user_agent,
            details
        )
        VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14)
        "#,
    )
    .bind(entry.request_id)
    .bind(entry.client_name)
    .bind(entry.transport)
    .bind(entry.method)
    .bind(entry.path)
    .bind(entry.auth_source)
    .bind(entry.outcome)
    .bind(entry.failure_reason)
    .bind(entry.user_id)
    .bind(entry.session_id)
    .bind(entry.organization_id)
    .bind(entry.remote_addr)
    .bind(entry.user_agent)
    .bind(entry.details)
    .execute(pool)
    .await?;

    Ok(())
}

/// JSON details for auth failures where no session token was extracted.
pub fn build_missing_token_details(
    authorization_header_present: bool,
    cookie_header_present: bool,
    cookie_name: &str,
) -> Value {
    json!({
        "authorization_header_present": authorization_header_present,
        "cookie_header_present": cookie_header_present,
        "cookie_name": cookie_name,
    })
}

/// JSON details recording whether the session token came from cookie or authorization header.
pub fn build_session_source_details(auth_source: SessionTokenSource) -> Value {
    json!({
        "auth_source": auth_source.as_str(),
    })
}

#[cfg(test)]
mod tests {
    use super::{build_missing_token_details, transport_for_path};
    use serde_json::Value;

    #[test]
    fn transport_for_wss_path_is_wss() {
        assert_eq!(transport_for_path("/wss/gateway"), "wss");
    }

    #[test]
    fn transport_for_chat_path_is_http() {
        assert_eq!(transport_for_path("/chat/rooms"), "http");
    }

    #[test]
    fn missing_token_details_capture_header_presence() {
        let details = build_missing_token_details(true, false, "athena-auth.session-token");

        assert_eq!(
            details
                .get("authorization_header_present")
                .and_then(Value::as_bool),
            Some(true)
        );
        assert_eq!(
            details
                .get("cookie_header_present")
                .and_then(Value::as_bool),
            Some(false)
        );
        assert_eq!(
            details.get("cookie_name").and_then(Value::as_str),
            Some("athena-auth.session-token")
        );
    }
}