use actix_web::HttpRequest;
use athena_actix::headers::request_context::athena_request_id;
use athena_auth_core::{SessionTokenSource, ValidatedSessionContext};
use serde_json::{Value, json};
use sqlx::postgres::PgPool;
use tracing::error;
use uuid::Uuid;
use crate::AppState;
use crate::api::gateway::auth::extract_client_ip;
use crate::utils::best_effort_pg_backoff::{
best_effort_pg_write_backoff_active, maybe_activate_best_effort_pg_write_backoff,
};
use crate::utils::logging_task_limiter::spawn_best_effort_logging_task;
/// Domain key handed to the best-effort Postgres write backoff helpers
/// (`best_effort_pg_write_backoff_active` / `maybe_activate_best_effort_pg_write_backoff`)
/// for chat session auth log writes.
const CHAT_SESSION_AUTH_PG_LOGGING_BACKOFF_DOMAIN: &str = "chat_session_auth_pg_logging";
/// Outcome of a chat session authentication attempt, as recorded in the auth log.
///
/// Each variant is persisted through its snake_case label (see `as_str`).
#[derive(Clone, Copy, Debug)]
pub enum ChatSessionAuthOutcome {
    /// Logged as `"authenticated"`.
    Authenticated,
    /// Logged as `"missing_session_token"`.
    MissingSessionToken,
    /// Logged as `"invalid_session"`.
    InvalidSession,
    /// Logged as `"missing_organization"`.
    MissingOrganization,
    /// Logged as `"auth_store_unavailable"`.
    AuthStoreUnavailable,
}

impl ChatSessionAuthOutcome {
    /// Returns the stable snake_case label written to the log row's `outcome` column.
    fn as_str(self) -> &'static str {
        // Exhaustive on purpose: adding a variant must force a label decision here.
        match self {
            ChatSessionAuthOutcome::Authenticated => "authenticated",
            ChatSessionAuthOutcome::MissingSessionToken => "missing_session_token",
            ChatSessionAuthOutcome::InvalidSession => "invalid_session",
            ChatSessionAuthOutcome::MissingOrganization => "missing_organization",
            ChatSessionAuthOutcome::AuthStoreUnavailable => "auth_store_unavailable",
        }
    }
}
/// One row destined for `athena.chat_session_auth_log`.
///
/// Field names mirror the column names used by `insert_chat_session_auth_log`.
#[derive(Clone, Debug)]
struct ChatSessionAuthLogEntry {
    /// Request id taken from the request context, or a freshly generated UUID v4 when absent.
    request_id: String,
    /// Client name supplied by the caller, if any.
    client_name: Option<String>,
    /// `"wss"` or `"http"`, derived from the request path by `transport_for_path`.
    transport: &'static str,
    /// HTTP method of the request.
    method: String,
    /// Request path.
    path: String,
    /// Label of the session token source, when one was identified.
    auth_source: Option<String>,
    /// Label produced by `ChatSessionAuthOutcome::as_str`.
    outcome: &'static str,
    /// Caller-supplied explanation for a non-successful outcome.
    failure_reason: Option<String>,
    /// User id from the validated session, when available.
    user_id: Option<String>,
    /// Session id from the validated session, when available.
    session_id: Option<String>,
    /// Active organization of the validated session, falling back to its first organization id.
    organization_id: Option<String>,
    /// Client IP as resolved by `extract_client_ip`, rendered as text.
    remote_addr: Option<String>,
    /// `user-agent` header value, when present and representable as a string.
    user_agent: Option<String>,
    /// Caller-supplied JSON payload with additional context.
    details: Value,
}
/// Queues a best-effort write of one chat session auth log row.
///
/// Nothing is queued when no logging client is configured, when no pool is registered for
/// that client, or while the write backoff for this logging domain is active. Otherwise the
/// log entry is built from the request and the insert is handed to
/// `spawn_best_effort_logging_task`; the caller never observes the insert result.
///
/// * `state` - application state providing the logging pool, task limiter and IP trust flag.
/// * `req` - request being authenticated; supplies method, path, headers and client address.
/// * `client_name` - optional client name to record.
/// * `auth_source` - where the session token was found, if one was found.
/// * `validated_session` - validated session context, when validation succeeded.
/// * `outcome` - authentication outcome to record.
/// * `failure_reason` - optional free-form reason for a failure.
/// * `details` - additional JSON context stored alongside the row.
pub fn spawn_chat_session_auth_log(
    state: &AppState,
    req: &HttpRequest,
    client_name: Option<&str>,
    auth_source: Option<SessionTokenSource>,
    validated_session: Option<&ValidatedSessionContext>,
    outcome: ChatSessionAuthOutcome,
    failure_reason: Option<String>,
    details: Value,
) {
    // Resolve the logging pool first: without one there is nowhere to write.
    let Some(pool) = state
        .logging_client_name
        .as_ref()
        .and_then(|logging_client| state.pg_registry.get_pool(logging_client))
    else {
        return;
    };

    // Skip the work of building an entry while writes are being backed off.
    if best_effort_pg_write_backoff_active(CHAT_SESSION_AUTH_PG_LOGGING_BACKOFF_DOMAIN) {
        return;
    }

    let entry = build_log_entry(
        state,
        req,
        client_name,
        auth_source,
        validated_session,
        outcome,
        failure_reason,
        details,
    );
    let limiter = state.logging_task_limiter.clone();

    spawn_best_effort_logging_task(limiter, "chat_session_auth_log", async move {
        let Err(err) = insert_chat_session_auth_log(&pool, entry).await else {
            return;
        };

        let err_message = err.to_string();
        let backoff = maybe_activate_best_effort_pg_write_backoff(
            CHAT_SESSION_AUTH_PG_LOGGING_BACKOFF_DOMAIN,
            "chat_session_auth_log",
            &err_message,
        );
        // Only emit the error here when the backoff helper did not take over.
        if backoff.is_none() {
            error!(error = %err, "failed to write chat session auth log");
        }
    });
}
/// Assembles the log row for a single authentication attempt.
///
/// Request-derived fields (request id, transport, method, path, remote address, user agent)
/// come from `req`; session-derived fields are `None` unless `validated_session` is provided.
/// A random UUID v4 stands in for the request id when the request carries none.
fn build_log_entry(
    state: &AppState,
    req: &HttpRequest,
    client_name: Option<&str>,
    auth_source: Option<SessionTokenSource>,
    validated_session: Option<&ValidatedSessionContext>,
    outcome: ChatSessionAuthOutcome,
    failure_reason: Option<String>,
    details: Value,
) -> ChatSessionAuthLogEntry {
    let request_id = match athena_request_id(req) {
        Some(id) => id,
        None => Uuid::new_v4().to_string(),
    };
    let request_path = req.path();
    let remote_addr =
        extract_client_ip(req, state.logging_trust_x_forwarded_for).map(|ip| ip.to_string());
    // Header values that are not valid visible ASCII are dropped rather than logged lossily.
    let user_agent = req
        .headers()
        .get("user-agent")
        .and_then(|value| value.to_str().ok())
        .map(String::from);

    ChatSessionAuthLogEntry {
        request_id,
        client_name: client_name.map(String::from),
        transport: transport_for_path(request_path),
        method: req.method().as_str().to_string(),
        path: request_path.to_string(),
        auth_source: auth_source.map(|source| source.as_str().to_string()),
        outcome: outcome.as_str(),
        failure_reason,
        user_id: validated_session.map(|session| session.user_id.clone()),
        session_id: validated_session.map(|session| session.session_id.clone()),
        organization_id: validated_session.and_then(resolved_organization_id),
        remote_addr,
        user_agent,
        details,
    }
}
/// Picks the organization id to record for a session.
///
/// Prefers the session's active organization; otherwise falls back to the first entry of
/// `organization_ids`. Returns `None` when neither is available.
fn resolved_organization_id(session: &ValidatedSessionContext) -> Option<String> {
    if let Some(active) = session.active_organization_id.as_ref() {
        return Some(active.clone());
    }
    session.organization_ids.first().cloned()
}
/// Classifies a request path as `"wss"` or `"http"`.
///
/// Only paths beginning with the exact prefix `/wss/` (trailing slash included) count as
/// `"wss"`; everything else, including a bare `/wss`, is reported as `"http"`.
fn transport_for_path(path: &str) -> &'static str {
    match path.strip_prefix("/wss/") {
        Some(_) => "wss",
        None => "http",
    }
}
async fn insert_chat_session_auth_log(
pool: &PgPool,
entry: ChatSessionAuthLogEntry,
) -> Result<(), sqlx::Error> {
sqlx::query(
r#"
INSERT INTO athena.chat_session_auth_log (
request_id,
client_name,
transport,
method,
path,
auth_source,
outcome,
failure_reason,
user_id,
session_id,
organization_id,
remote_addr,
user_agent,
details
)
VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14)
"#,
)
.bind(entry.request_id)
.bind(entry.client_name)
.bind(entry.transport)
.bind(entry.method)
.bind(entry.path)
.bind(entry.auth_source)
.bind(entry.outcome)
.bind(entry.failure_reason)
.bind(entry.user_id)
.bind(entry.session_id)
.bind(entry.organization_id)
.bind(entry.remote_addr)
.bind(entry.user_agent)
.bind(entry.details)
.execute(pool)
.await?;
Ok(())
}
/// Builds the JSON `details` payload describing a request that carried no session token.
///
/// The resulting object has three keys: `authorization_header_present`,
/// `cookie_header_present` and `cookie_name`, each holding the corresponding argument.
pub fn build_missing_token_details(
    authorization_header_present: bool,
    cookie_header_present: bool,
    cookie_name: &str,
) -> Value {
    json!({
        "authorization_header_present": authorization_header_present,
        "cookie_header_present": cookie_header_present,
        "cookie_name": cookie_name,
    })
}
/// Builds the JSON `details` payload recording where the session token came from.
///
/// The resulting object has a single `auth_source` key holding the source's string label.
pub fn build_session_source_details(auth_source: SessionTokenSource) -> Value {
    json!({ "auth_source": auth_source.as_str() })
}
#[cfg(test)]
mod tests {
    use super::{build_missing_token_details, transport_for_path};

    #[test]
    fn transport_for_wss_path_is_wss() {
        let transport = transport_for_path("/wss/gateway");
        assert_eq!(transport, "wss");
    }

    #[test]
    fn transport_for_chat_path_is_http() {
        let transport = transport_for_path("/chat/rooms");
        assert_eq!(transport, "http");
    }

    #[test]
    fn missing_token_details_capture_header_presence() {
        let details = build_missing_token_details(true, false, "athena-auth.session-token");

        // Indexing a `Value` yields `Null` for absent keys, so a missing key still fails here.
        assert_eq!(details["authorization_header_present"].as_bool(), Some(true));
        assert_eq!(details["cookie_header_present"].as_bool(), Some(false));
        assert_eq!(
            details["cookie_name"].as_str(),
            Some("athena-auth.session-token")
        );
    }
}