athena_rs 0.83.0

Database gateway API
Documentation
use actix_web::{HttpRequest, HttpResponse, http::StatusCode};
use serde_json::{Value, json};
use sqlx::postgres::PgPool;
use sqlx::{Pool, Postgres};
use std::collections::HashMap;
use std::time::{SystemTime, UNIX_EPOCH};
use tracing::error;
use uuid::Uuid;

use crate::AppState;
use crate::api::headers::x_athena_client::x_athena_client;
use crate::api::headers::{x_company_id, x_organization_id, x_user_id};
use crate::drivers::supabase::supabase;

/// Structured metadata returned to callers so downstream handlers can correlate their work.
pub struct LoggedRequest {
    pub request_id: String,
    pub client_name: String,
    pub method: String,
    pub path: String,
    pub status_code: u16,
    pub time: i64,
}

#[derive(Clone, Debug, Default)]
pub struct RequestAuthLogContext {
    pub api_key_id: Option<String>,
    pub presented_api_key_public_id: Option<String>,
    pub presented_api_key_hash: Option<String>,
    pub presented_api_key_salt: Option<String>,
    pub api_key_authenticated: bool,
    pub api_key_authorized: bool,
    pub api_key_enforced: bool,
    pub api_key_auth_reason: Option<String>,
}

/// Log the incoming HTTP request to Supabase (legacy behavior) and Postgres when enabled.
pub fn log_request(
    req: HttpRequest,
    state: Option<&AppState>,
    request_id: Option<String>,
    auth_context: Option<&RequestAuthLogContext>,
) -> LoggedRequest {
    let client_name: String = x_athena_client(&req);
    let user_id: String = x_user_id::get_x_user_id(&req).unwrap_or_else(|| "NOT_FOUND".to_string());
    let company_id: String =
        x_company_id::get_x_company_id(&req).unwrap_or_else(|| "NOT_FOUND".to_string());
    let organization_id: String =
        x_organization_id::get_x_organization_id(&req).unwrap_or_else(|| "NOT_FOUND".to_string());

    let request_id: String = request_id.unwrap_or_else(|| Uuid::new_v4().to_string());
    let auth_context = auth_context.cloned().unwrap_or_default();
    let body_json: Value = req
        .match_info()
        .get("body")
        .and_then(|body| serde_json::from_str(body).ok())
        .unwrap_or_else(|| json!({}));

    let query_string: &str = req.query_string();
    let query_string: String = if query_string.is_empty() {
        "NOT_FOUND".to_string()
    } else {
        query_string.to_string()
    };

    let headers: HashMap<String, String> = req
        .headers()
        .iter()
        .map(|(k, v)| (k.to_string(), v.to_str().unwrap_or("").to_string()))
        .collect();

    let mut headers_map: serde_json::Map<String, Value> = serde_json::Map::new();
    for (key, value) in headers.iter() {
        headers_map.insert(key.clone(), Value::String(value.clone()));
    }
    let headers_json: Value = Value::Object(headers_map);

    let method: String = req.method().as_str().to_string();
    let path: String = req.path().to_string();
    let status_http_code: u16 = req
        .app_data::<HttpResponse>()
        .map(|resp| resp.status().as_u16())
        .unwrap_or(0);

    let x_real_ip: String = headers
        .get("X-Real-IP")
        .unwrap_or(&"NOT_FOUND".to_string())
        .to_string();

    let user_agent: String = req
        .headers()
        .get("User-Agent")
        .and_then(|v| v.to_str().ok())
        .unwrap_or_else(|| "NOT_FOUND")
        .to_string();

    if user_agent
        == "Better Stack Better Uptime Bot Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/130.0.0.0 Safari/537.36"
    {
        return LoggedRequest {
            request_id,
            client_name,
            method,
            path,
            status_code: status_http_code,
            time: 0,
        };
    }

    let time: i64 = SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .unwrap_or_else(|_| std::time::Duration::new(0, 0))
        .as_secs() as i64;

    let host: String = req
        .headers()
        .get("Host")
        .and_then(|v| v.to_str().ok())
        .unwrap_or_else(|| "NOT_FOUND")
        .to_string();

    let cache_control: String = req
        .headers()
        .get("Cache-Control")
        .and_then(|v| v.to_str().ok())
        .unwrap_or_else(|| "NOT_FOUND")
        .to_string();

    let cached: bool = cache_control != "no-cache";

    let request_data: Value = json!({
        "object": "request",
        "request_id": request_id,
        "user_id": user_id,
        "company_id": company_id,
        "api_key_id": auth_context.api_key_id.clone(),
        "presented_api_key_public_id": auth_context.presented_api_key_public_id.clone(),
        "presented_api_key_hash": auth_context.presented_api_key_hash.clone(),
        "presented_api_key_salt": auth_context.presented_api_key_salt.clone(),
        "api_key_authenticated": auth_context.api_key_authenticated,
        "api_key_authorized": auth_context.api_key_authorized,
        "api_key_enforced": auth_context.api_key_enforced,
        "api_key_auth_reason": auth_context.api_key_auth_reason.clone(),
        "body": body_json.clone(),
        "query_string": query_string,
        "headers": headers_json.clone(),
        "method": method,
        "path": path,
        "http_code": status_http_code,
        "ipv4": x_real_ip,
        "user_agent": user_agent,
        "time": time,
        "host": host,
        "cached": cached
    });

    if let Some(state) = state {
        if let Some(logging_client) = state.logging_client_name.as_ref() {
            if let Some(pool) = state.pg_registry.get_pool(logging_client) {
                let entry: RequestLogEntry = RequestLogEntry {
                    request_id: request_id.clone(),
                    client: client_name.clone(),
                    method: method.clone(),
                    path: path.clone(),
                    query_string: query_string.clone(),
                    status_code: status_http_code as i32,
                    ipv4: x_real_ip.clone(),
                    user_agent: user_agent.clone(),
                    headers: headers_json.clone(),
                    body: body_json.clone(),
                    user_id: user_id.clone(),
                    company_id: company_id.clone(),
                    organization_id: organization_id.clone(),
                    api_key_id: auth_context.api_key_id.clone(),
                    presented_api_key_public_id: auth_context.presented_api_key_public_id.clone(),
                    presented_api_key_hash: auth_context.presented_api_key_hash.clone(),
                    presented_api_key_salt: auth_context.presented_api_key_salt.clone(),
                    api_key_authenticated: auth_context.api_key_authenticated,
                    api_key_authorized: auth_context.api_key_authorized,
                    api_key_enforced: auth_context.api_key_enforced,
                    api_key_auth_reason: auth_context.api_key_auth_reason.clone(),
                    host: host.clone(),
                    cached,
                    time,
                };
                let pool_clone: Pool<Postgres> = pool.clone();
                actix_web::rt::spawn(async move {
                    if let Err(err) = insert_request_log(pool_clone, entry).await {
                        error!(error = %err, "failed to write gateway request log");
                    }
                });
            }
        }
    }

    let request_data_clone: Value = request_data.clone();
    actix_web::rt::spawn(async move {
        if let Ok(client) = supabase().await {
            let _ = client
                .insert("event_log_api", request_data_clone.clone())
                .await;
        }
    });

    LoggedRequest {
        request_id,
        client_name,
        method,
        path,
        status_code: status_http_code,
        time,
    }
}

/// Logs gateway operations (insert/fetch/update/delete/query/ping) to Postgres when configured.
pub fn log_operation_event(
    state: Option<&AppState>,
    request: &LoggedRequest,
    operation: &str,
    table_name: Option<&str>,
    duration_ms: u128,
    status: StatusCode,
    details: Option<Value>,
) {
    if let Some(state) = state {
        if let Some(logging_client) = state.logging_client_name.as_ref() {
            if let Some(pool) = state.pg_registry.get_pool(logging_client) {
                let details_value: Value = details.unwrap_or_else(|| json!({}));

                // Determine if this is an error based on status code
                let is_error: bool = status.is_client_error() || status.is_server_error();

                // Extract message from details JSON if present
                let message: Option<String> = details_value
                    .get("message")
                    .and_then(|v| v.as_str())
                    .or_else(|| details_value.get("error").and_then(|v| v.as_str()))
                    .or_else(|| details_value.get("error_code").and_then(|v| v.as_str()))
                    .map(|s| s.to_string());

                // Extract cache_key from details JSON if present
                let cache_key: Option<String> = details_value
                    .get("cache_key")
                    .and_then(|v| v.as_str())
                    .map(|s| s.to_string());

                let entry: OperationLogEntry = OperationLogEntry {
                    request_id: request.request_id.clone(),
                    operation: operation.to_string(),
                    table_name: table_name.map(|value| value.to_string()),
                    client: request.client_name.clone(),
                    method: request.method.clone(),
                    path: request.path.clone(),
                    status_code: status.as_u16() as i32,
                    duration_ms: duration_ms as i64,
                    details: details_value,
                    time: SystemTime::now()
                        .duration_since(UNIX_EPOCH)
                        .unwrap_or_else(|_| std::time::Duration::new(0, 0))
                        .as_secs() as i64,
                    error: is_error,
                    message,
                    cache_key,
                };
                let pool_clone: sqlx::Pool<sqlx::Postgres> = pool.clone();
                let operation_name: String = operation.to_string();
                actix_web::rt::spawn(async move {
                    if let Err(err) = insert_operation_log(pool_clone, entry).await {
                        error!(error = %err, operation = operation_name, "failed to write gateway operation log");
                    }
                });
            }
        }
    }
}

struct RequestLogEntry {
    request_id: String,
    client: String,
    method: String,
    path: String,
    query_string: String,
    status_code: i32,
    ipv4: String,
    user_agent: String,
    headers: Value,
    body: Value,
    user_id: String,
    company_id: String,
    organization_id: String,
    api_key_id: Option<String>,
    presented_api_key_public_id: Option<String>,
    presented_api_key_hash: Option<String>,
    presented_api_key_salt: Option<String>,
    api_key_authenticated: bool,
    api_key_authorized: bool,
    api_key_enforced: bool,
    api_key_auth_reason: Option<String>,
    host: String,
    cached: bool,
    time: i64,
}

async fn insert_request_log(pool: PgPool, entry: RequestLogEntry) -> Result<(), sqlx::Error> {
    sqlx::query(
        r#"
        INSERT INTO gateway_request_log
            (request_id, client, method, path, query_string, status_code, ipv4,
             user_agent, headers, body, user_id, company_id, organization_id,
             api_key_id, presented_api_key_public_id, presented_api_key_hash,
             presented_api_key_salt, api_key_authenticated, api_key_authorized,
             api_key_enforced, api_key_auth_reason, host, cached, time)
        VALUES
            ($1, $2, $3, $4, $5, $6, $7,
             $8, $9, $10, $11, $12, $13, $14,
             $15, $16, $17, $18, $19, $20,
             $21, $22, $23, $24)
        "#,
    )
    .bind(entry.request_id)
    .bind(entry.client)
    .bind(entry.method)
    .bind(entry.path)
    .bind(entry.query_string)
    .bind(entry.status_code)
    .bind(entry.ipv4)
    .bind(entry.user_agent)
    .bind(entry.headers)
    .bind(entry.body)
    .bind(entry.user_id)
    .bind(entry.company_id)
    .bind(entry.organization_id)
    .bind(entry.api_key_id)
    .bind(entry.presented_api_key_public_id)
    .bind(entry.presented_api_key_hash)
    .bind(entry.presented_api_key_salt)
    .bind(entry.api_key_authenticated)
    .bind(entry.api_key_authorized)
    .bind(entry.api_key_enforced)
    .bind(entry.api_key_auth_reason)
    .bind(entry.host)
    .bind(entry.cached)
    .bind(entry.time)
    .execute(&pool)
    .await?;
    Ok(())
}

struct OperationLogEntry {
    request_id: String,
    operation: String,
    table_name: Option<String>,
    client: String,
    method: String,
    path: String,
    status_code: i32,
    duration_ms: i64,
    details: Value,
    time: i64,
    error: bool,
    message: Option<String>,
    cache_key: Option<String>,
}

async fn insert_operation_log(pool: PgPool, entry: OperationLogEntry) -> Result<(), sqlx::Error> {
    sqlx::query(
        r#"
        INSERT INTO gateway_operation_log
            (request_id, operation, table_name, client, method, path, status_code,
             duration_ms, details, time, error, message, cache_key)
        VALUES
            ($1, $2, $3, $4, $5, $6, $7,
             $8, $9, $10, $11, $12, $13)
        "#,
    )
    .bind(entry.request_id)
    .bind(entry.operation)
    .bind(entry.table_name)
    .bind(entry.client)
    .bind(entry.method)
    .bind(entry.path)
    .bind(entry.status_code)
    .bind(entry.duration_ms)
    .bind(entry.details)
    .bind(entry.time)
    .bind(entry.error)
    .bind(entry.message)
    .bind(entry.cache_key)
    .execute(&pool)
    .await?;
    Ok(())
}