athena_rs 3.3.0

Database gateway API
Documentation
use actix_web::HttpRequest;
use serde_json::Value;
use sqlx::{Postgres, Row, Transaction};
use std::time::{SystemTime, UNIX_EPOCH};
use uuid::Uuid;

use crate::utils::request_logging::RequestAuthLogContext;

#[derive(Debug, Clone)]
pub struct DatabaseAuditLogEntry {
    pub request_id: Uuid,
    pub client_name: String,
    pub schema_name: String,
    pub table_name: Option<String>,
    pub object_type: String,
    pub operation: String,
    pub status: String,
    pub request_payload: Value,
    pub executed_sql: Value,
    pub result_payload: Value,
    pub error_message: Option<String>,
    pub duration_ms: i64,
    pub api_key_id: Option<Uuid>,
    pub api_key_public_id: Option<String>,
    pub user_id: Option<String>,
    pub company_id: Option<String>,
    pub organization_id: Option<String>,
    pub remote_addr: Option<String>,
    pub user_agent: Option<String>,
}

fn current_epoch_seconds() -> i64 {
    SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .unwrap_or_default()
        .as_secs() as i64
}

pub fn remote_addr(req: &HttpRequest) -> Option<String> {
    req.headers()
        .get("X-Real-IP")
        .and_then(|value| value.to_str().ok())
        .map(str::to_string)
        .or_else(|| req.peer_addr().map(|addr| addr.ip().to_string()))
}

pub fn user_agent(req: &HttpRequest) -> Option<String> {
    req.headers()
        .get("User-Agent")
        .and_then(|value| value.to_str().ok())
        .map(str::to_string)
}

pub async fn audit_table_exists(tx: &mut Transaction<'_, Postgres>) -> Result<bool, sqlx::Error> {
    let row = sqlx::query(
        r#"
        SELECT to_regclass('public.database_audit_log') IS NOT NULL AS exists
        "#,
    )
    .fetch_one(&mut **tx)
    .await?;

    row.try_get::<bool, _>("exists")
}

pub async fn function_ddl_audit_table_exists(
    tx: &mut Transaction<'_, Postgres>,
) -> Result<bool, sqlx::Error> {
    let row = sqlx::query(
        r#"
        SELECT to_regclass('public.function_ddl_audit_log') IS NOT NULL AS exists
        "#,
    )
    .fetch_one(&mut **tx)
    .await?;

    row.try_get::<bool, _>("exists")
}

#[derive(Debug, Clone)]
pub struct FunctionDdlAuditLogEntry {
    pub request_id: Uuid,
    pub client_name: String,
    pub schema_name: String,
    pub function_name: String,
    pub function_signature: Option<String>,
    pub action: String,
    pub status: String,
    pub ddl_sql: String,
    pub previous_definition: Value,
    pub next_definition: Value,
    pub error_message: Option<String>,
    pub duration_ms: i64,
    pub api_key_id: Option<Uuid>,
    pub api_key_public_id: Option<String>,
    pub user_id: Option<String>,
    pub company_id: Option<String>,
    pub organization_id: Option<String>,
    pub remote_addr: Option<String>,
    pub user_agent: Option<String>,
}

#[allow(clippy::too_many_arguments)]
pub fn build_function_ddl_audit_entry(
    req: &HttpRequest,
    auth: &RequestAuthLogContext,
    request_id: &str,
    client_name: &str,
    schema_name: &str,
    function_name: &str,
    function_signature: Option<String>,
    action: &str,
    status: &str,
    ddl_sql: String,
    previous_definition: Value,
    next_definition: Value,
    error_message: Option<String>,
    duration_ms: i64,
) -> FunctionDdlAuditLogEntry {
    FunctionDdlAuditLogEntry {
        request_id: Uuid::parse_str(request_id).unwrap_or_else(|_| Uuid::new_v4()),
        client_name: client_name.to_string(),
        schema_name: schema_name.to_string(),
        function_name: function_name.to_string(),
        function_signature,
        action: action.to_string(),
        status: status.to_string(),
        ddl_sql,
        previous_definition,
        next_definition,
        error_message,
        duration_ms,
        api_key_id: auth
            .api_key_id
            .as_deref()
            .and_then(|value| Uuid::parse_str(value).ok()),
        api_key_public_id: auth.presented_api_key_public_id.clone(),
        user_id: req
            .headers()
            .get("X-User-Id")
            .and_then(|value| value.to_str().ok())
            .map(str::to_string),
        company_id: req
            .headers()
            .get("X-Company-Id")
            .and_then(|value| value.to_str().ok())
            .map(str::to_string),
        organization_id: req
            .headers()
            .get("X-Organization-Id")
            .and_then(|value| value.to_str().ok())
            .map(str::to_string),
        remote_addr: remote_addr(req),
        user_agent: user_agent(req),
    }
}

#[allow(clippy::too_many_arguments)]
pub fn build_audit_entry(
    req: &HttpRequest,
    auth: &RequestAuthLogContext,
    client_name: &str,
    schema_name: &str,
    table_name: Option<&str>,
    object_type: &str,
    operation: &str,
    status: &str,
    request_payload: Value,
    executed_sql: Vec<String>,
    result_payload: Value,
    error_message: Option<String>,
    duration_ms: i64,
    request_id: &str,
) -> DatabaseAuditLogEntry {
    DatabaseAuditLogEntry {
        request_id: Uuid::parse_str(request_id).unwrap_or_else(|_| Uuid::new_v4()),
        client_name: client_name.to_string(),
        schema_name: schema_name.to_string(),
        table_name: table_name.map(str::to_string),
        object_type: object_type.to_string(),
        operation: operation.to_string(),
        status: status.to_string(),
        request_payload,
        executed_sql: Value::Array(executed_sql.into_iter().map(Value::String).collect()),
        result_payload,
        error_message,
        duration_ms,
        api_key_id: auth
            .api_key_id
            .as_deref()
            .and_then(|value| Uuid::parse_str(value).ok()),
        api_key_public_id: auth.presented_api_key_public_id.clone(),
        user_id: req
            .headers()
            .get("X-User-Id")
            .and_then(|value| value.to_str().ok())
            .map(str::to_string),
        company_id: req
            .headers()
            .get("X-Company-Id")
            .and_then(|value| value.to_str().ok())
            .map(str::to_string),
        organization_id: req
            .headers()
            .get("X-Organization-Id")
            .and_then(|value| value.to_str().ok())
            .map(str::to_string),
        remote_addr: remote_addr(req),
        user_agent: user_agent(req),
    }
}

pub async fn insert_database_audit_log(
    tx: &mut Transaction<'_, Postgres>,
    entry: &DatabaseAuditLogEntry,
) -> Result<(), sqlx::Error> {
    sqlx::query(
        r#"
        INSERT INTO database_audit_log
            (
                database_audit_log_id,
                request_id,
                client_name,
                schema_name,
                table_name,
                object_type,
                operation,
                status,
                api_key_id,
                api_key_public_id,
                user_id,
                company_id,
                organization_id,
                request_payload,
                executed_sql,
                result_payload,
                error_message,
                remote_addr,
                user_agent,
                duration_ms,
                time
            )
        VALUES
            (
                $1,
                $2,
                $3,
                $4,
                $5,
                $6,
                $7,
                $8,
                $9,
                $10,
                $11,
                $12,
                $13,
                $14,
                $15,
                $16,
                $17,
                $18,
                $19,
                $20,
                $21
            )
        "#,
    )
    .bind(Uuid::new_v4())
    .bind(entry.request_id)
    .bind(&entry.client_name)
    .bind(&entry.schema_name)
    .bind(&entry.table_name)
    .bind(&entry.object_type)
    .bind(&entry.operation)
    .bind(&entry.status)
    .bind(entry.api_key_id)
    .bind(&entry.api_key_public_id)
    .bind(&entry.user_id)
    .bind(&entry.company_id)
    .bind(&entry.organization_id)
    .bind(&entry.request_payload)
    .bind(&entry.executed_sql)
    .bind(&entry.result_payload)
    .bind(&entry.error_message)
    .bind(&entry.remote_addr)
    .bind(&entry.user_agent)
    .bind(entry.duration_ms)
    .bind(current_epoch_seconds())
    .execute(&mut **tx)
    .await?;

    Ok(())
}

pub async fn insert_function_ddl_audit_log(
    tx: &mut Transaction<'_, Postgres>,
    entry: &FunctionDdlAuditLogEntry,
) -> Result<(), sqlx::Error> {
    sqlx::query(
        r#"
        INSERT INTO function_ddl_audit_log
            (
                function_ddl_audit_log_id,
                request_id,
                client_name,
                schema_name,
                function_name,
                function_signature,
                action,
                status,
                ddl_sql,
                previous_definition,
                next_definition,
                error_message,
                api_key_id,
                api_key_public_id,
                user_id,
                company_id,
                organization_id,
                remote_addr,
                user_agent,
                duration_ms,
                time
            )
        VALUES
            (
                $1,
                $2,
                $3,
                $4,
                $5,
                $6,
                $7,
                $8,
                $9,
                $10,
                $11,
                $12,
                $13,
                $14,
                $15,
                $16,
                $17,
                $18,
                $19,
                $20,
                $21
            )
        "#,
    )
    .bind(Uuid::new_v4())
    .bind(entry.request_id)
    .bind(&entry.client_name)
    .bind(&entry.schema_name)
    .bind(&entry.function_name)
    .bind(&entry.function_signature)
    .bind(&entry.action)
    .bind(&entry.status)
    .bind(&entry.ddl_sql)
    .bind(&entry.previous_definition)
    .bind(&entry.next_definition)
    .bind(&entry.error_message)
    .bind(entry.api_key_id)
    .bind(&entry.api_key_public_id)
    .bind(&entry.user_id)
    .bind(&entry.company_id)
    .bind(&entry.organization_id)
    .bind(&entry.remote_addr)
    .bind(&entry.user_agent)
    .bind(entry.duration_ms)
    .bind(current_epoch_seconds())
    .execute(&mut **tx)
    .await?;

    Ok(())
}