use actix_web::HttpRequest;
use serde_json::Value;
use sqlx::{Postgres, Row, Transaction};
use std::time::{SystemTime, UNIX_EPOCH};
use uuid::Uuid;
use crate::utils::request_logging::RequestAuthLogContext;
/// One audit record destined for the `database_audit_log` table.
///
/// Instances are assembled by `build_audit_entry` and persisted by
/// `insert_database_audit_log`, which binds each field to the column of the
/// same name.
#[derive(Debug, Clone)]
pub struct DatabaseAuditLogEntry {
/// Request identifier. `build_audit_entry` substitutes a random v4 UUID when
/// the caller-supplied string does not parse as a UUID.
pub request_id: Uuid,
/// Client name exactly as supplied by the caller.
pub client_name: String,
/// Schema name exactly as supplied by the caller.
pub schema_name: String,
/// Table name, when the caller supplied one.
pub table_name: Option<String>,
/// Free-form object type label; the accepted values are not constrained here.
pub object_type: String,
/// Free-form operation label; the accepted values are not constrained here.
pub operation: String,
/// Free-form status label; the accepted values are not constrained here.
pub status: String,
/// JSON payload supplied by the caller for the request.
pub request_payload: Value,
/// JSON array of strings; `build_audit_entry` builds it from a `Vec<String>`.
pub executed_sql: Value,
/// JSON payload supplied by the caller for the result.
pub result_payload: Value,
/// Error text, when the caller supplied one.
pub error_message: Option<String>,
/// Duration supplied by the caller (presumably milliseconds, per the name).
pub duration_ms: i64,
/// API key id parsed from the auth context; `None` when it is absent or not
/// a valid UUID.
pub api_key_id: Option<Uuid>,
/// Copy of `presented_api_key_public_id` from the auth context.
pub api_key_public_id: Option<String>,
/// Value of the `X-User-Id` request header, if present and readable as text.
pub user_id: Option<String>,
/// Value of the `X-Company-Id` request header, if present and readable as text.
pub company_id: Option<String>,
/// Value of the `X-Organization-Id` request header, if present and readable
/// as text.
pub organization_id: Option<String>,
/// Client address as returned by `remote_addr` (the `X-Real-IP` header, else
/// the peer socket IP).
pub remote_addr: Option<String>,
/// Value of the `User-Agent` request header as returned by `user_agent`.
pub user_agent: Option<String>,
}
/// Returns the current wall-clock time as whole seconds since the Unix epoch.
///
/// A system clock set before the epoch yields `0` (the default `Duration`)
/// instead of an error. The `u64` second count is converted with a checked
/// conversion and saturates at `i64::MAX` rather than wrapping to a negative
/// value, which a plain `as i64` cast would do.
fn current_epoch_seconds() -> i64 {
    let since_epoch = SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .unwrap_or_default();

    i64::try_from(since_epoch.as_secs()).unwrap_or(i64::MAX)
}
/// Determines the client address to record for `req`.
///
/// The `X-Real-IP` header is preferred. Its value is trimmed, and a header
/// that is missing, not readable as text, or blank after trimming is ignored
/// so that the connection's peer IP is used instead. Returns `None` only when
/// neither source yields an address.
///
/// NOTE(review): the header value is taken as-is and is not validated as an
/// IP address; presumably a trusted reverse proxy sets it — confirm.
pub fn remote_addr(req: &HttpRequest) -> Option<String> {
    req.headers()
        .get("X-Real-IP")
        .and_then(|value| value.to_str().ok())
        .map(str::trim)
        // A blank header must not shadow the real peer address.
        .filter(|value| !value.is_empty())
        .map(str::to_string)
        .or_else(|| req.peer_addr().map(|addr| addr.ip().to_string()))
}
/// Returns the request's `User-Agent` header as an owned string.
///
/// Yields `None` when the header is absent or its value cannot be read as
/// text.
pub fn user_agent(req: &HttpRequest) -> Option<String> {
    let raw = req.headers().get("User-Agent")?;
    match raw.to_str() {
        Ok(text) => Some(text.to_string()),
        Err(_) => None,
    }
}
/// Reports whether `public.database_audit_log` can be resolved by PostgreSQL.
///
/// Executes one `to_regclass(...) IS NOT NULL` probe on the supplied
/// transaction and returns the boolean it produces.
///
/// # Errors
///
/// Returns the `sqlx::Error` raised while running the probe or while decoding
/// its `exists` column.
pub async fn audit_table_exists(tx: &mut Transaction<'_, Postgres>) -> Result<bool, sqlx::Error> {
    // Kept flush-left: leading whitespace would become part of the SQL text.
    const PROBE_SQL: &str = r#"
SELECT to_regclass('public.database_audit_log') IS NOT NULL AS exists
"#;

    let probe = sqlx::query(PROBE_SQL).fetch_one(&mut **tx).await?;
    let present: bool = probe.try_get("exists")?;
    Ok(present)
}
/// Reports whether `public.function_ddl_audit_log` can be resolved by
/// PostgreSQL.
///
/// Executes one `to_regclass(...) IS NOT NULL` probe on the supplied
/// transaction and returns the boolean it produces.
///
/// # Errors
///
/// Returns the `sqlx::Error` raised while running the probe or while decoding
/// its `exists` column.
pub async fn function_ddl_audit_table_exists(
    tx: &mut Transaction<'_, Postgres>,
) -> Result<bool, sqlx::Error> {
    // Kept flush-left: leading whitespace would become part of the SQL text.
    const PROBE_SQL: &str = r#"
SELECT to_regclass('public.function_ddl_audit_log') IS NOT NULL AS exists
"#;

    let probe = sqlx::query(PROBE_SQL).fetch_one(&mut **tx).await?;
    let present: bool = probe.try_get("exists")?;
    Ok(present)
}
/// One audit record destined for the `function_ddl_audit_log` table.
///
/// Instances are assembled by `build_function_ddl_audit_entry` and persisted
/// by `insert_function_ddl_audit_log`, which binds each field to the column of
/// the same name.
#[derive(Debug, Clone)]
pub struct FunctionDdlAuditLogEntry {
/// Request identifier. `build_function_ddl_audit_entry` substitutes a random
/// v4 UUID when the caller-supplied string does not parse as a UUID.
pub request_id: Uuid,
/// Client name exactly as supplied by the caller.
pub client_name: String,
/// Schema name exactly as supplied by the caller.
pub schema_name: String,
/// Function name exactly as supplied by the caller.
pub function_name: String,
/// Function signature, when the caller supplied one.
pub function_signature: Option<String>,
/// Free-form action label; the accepted values are not constrained here.
pub action: String,
/// Free-form status label; the accepted values are not constrained here.
pub status: String,
/// DDL text supplied by the caller.
pub ddl_sql: String,
/// JSON value supplied by the caller (presumably the definition before the
/// change, per the name).
pub previous_definition: Value,
/// JSON value supplied by the caller (presumably the definition after the
/// change, per the name).
pub next_definition: Value,
/// Error text, when the caller supplied one.
pub error_message: Option<String>,
/// Duration supplied by the caller (presumably milliseconds, per the name).
pub duration_ms: i64,
/// API key id parsed from the auth context; `None` when it is absent or not
/// a valid UUID.
pub api_key_id: Option<Uuid>,
/// Copy of `presented_api_key_public_id` from the auth context.
pub api_key_public_id: Option<String>,
/// Value of the `X-User-Id` request header, if present and readable as text.
pub user_id: Option<String>,
/// Value of the `X-Company-Id` request header, if present and readable as text.
pub company_id: Option<String>,
/// Value of the `X-Organization-Id` request header, if present and readable
/// as text.
pub organization_id: Option<String>,
/// Client address as returned by `remote_addr` (the `X-Real-IP` header, else
/// the peer socket IP).
pub remote_addr: Option<String>,
/// Value of the `User-Agent` request header as returned by `user_agent`.
pub user_agent: Option<String>,
}
/// Assembles a [`FunctionDdlAuditLogEntry`] from the HTTP request, the auth
/// context and the caller-supplied details of a function DDL action.
///
/// Derived fields:
/// * `request_id` is parsed as a UUID; a value that does not parse is replaced
///   by a freshly generated v4 UUID.
/// * `api_key_id` comes from `auth.api_key_id` and is `None` when that value
///   is absent or not a valid UUID.
/// * `user_id`, `company_id` and `organization_id` are read from the
///   `X-User-Id`, `X-Company-Id` and `X-Organization-Id` headers; each is
///   `None` when its header is missing or cannot be read as text.
/// * `remote_addr` and `user_agent` come from the helpers of the same name.
///
/// All remaining arguments are stored unchanged (string slices are copied
/// into owned strings).
// Every recorded value is passed in separately, hence the long parameter list.
#[allow(clippy::too_many_arguments)]
pub fn build_function_ddl_audit_entry(
    req: &HttpRequest,
    auth: &RequestAuthLogContext,
    request_id: &str,
    client_name: &str,
    schema_name: &str,
    function_name: &str,
    function_signature: Option<String>,
    action: &str,
    status: &str,
    ddl_sql: String,
    previous_definition: Value,
    next_definition: Value,
    error_message: Option<String>,
    duration_ms: i64,
) -> FunctionDdlAuditLogEntry {
    // Shared lookup for the identity headers below.
    let header_text = |name: &str| -> Option<String> {
        let raw = req.headers().get(name)?;
        raw.to_str().ok().map(str::to_string)
    };

    let parsed_request_id = match Uuid::parse_str(request_id) {
        Ok(id) => id,
        Err(_) => Uuid::new_v4(),
    };
    let parsed_api_key_id = match auth.api_key_id.as_deref() {
        Some(raw) => Uuid::parse_str(raw).ok(),
        None => None,
    };

    FunctionDdlAuditLogEntry {
        request_id: parsed_request_id,
        client_name: client_name.to_owned(),
        schema_name: schema_name.to_owned(),
        function_name: function_name.to_owned(),
        function_signature,
        action: action.to_owned(),
        status: status.to_owned(),
        ddl_sql,
        previous_definition,
        next_definition,
        error_message,
        duration_ms,
        api_key_id: parsed_api_key_id,
        api_key_public_id: auth.presented_api_key_public_id.clone(),
        user_id: header_text("X-User-Id"),
        company_id: header_text("X-Company-Id"),
        organization_id: header_text("X-Organization-Id"),
        remote_addr: remote_addr(req),
        user_agent: user_agent(req),
    }
}
/// Assembles a [`DatabaseAuditLogEntry`] from the HTTP request, the auth
/// context and the caller-supplied details of a database operation.
///
/// Derived fields:
/// * `request_id` is parsed as a UUID; a value that does not parse is replaced
///   by a freshly generated v4 UUID.
/// * `executed_sql` is converted into a JSON array with one string element per
///   statement, in the order given.
/// * `api_key_id` comes from `auth.api_key_id` and is `None` when that value
///   is absent or not a valid UUID.
/// * `user_id`, `company_id` and `organization_id` are read from the
///   `X-User-Id`, `X-Company-Id` and `X-Organization-Id` headers; each is
///   `None` when its header is missing or cannot be read as text.
/// * `remote_addr` and `user_agent` come from the helpers of the same name.
///
/// All remaining arguments are stored unchanged (string slices are copied
/// into owned strings).
// Every recorded value is passed in separately, hence the long parameter list.
#[allow(clippy::too_many_arguments)]
pub fn build_audit_entry(
    req: &HttpRequest,
    auth: &RequestAuthLogContext,
    client_name: &str,
    schema_name: &str,
    table_name: Option<&str>,
    object_type: &str,
    operation: &str,
    status: &str,
    request_payload: Value,
    executed_sql: Vec<String>,
    result_payload: Value,
    error_message: Option<String>,
    duration_ms: i64,
    request_id: &str,
) -> DatabaseAuditLogEntry {
    // Shared lookup for the identity headers below.
    let header_text = |name: &str| -> Option<String> {
        let raw = req.headers().get(name)?;
        raw.to_str().ok().map(str::to_string)
    };

    let parsed_request_id = match Uuid::parse_str(request_id) {
        Ok(id) => id,
        Err(_) => Uuid::new_v4(),
    };
    let parsed_api_key_id = match auth.api_key_id.as_deref() {
        Some(raw) => Uuid::parse_str(raw).ok(),
        None => None,
    };
    let statements: Vec<Value> = executed_sql.into_iter().map(Value::String).collect();

    DatabaseAuditLogEntry {
        request_id: parsed_request_id,
        client_name: client_name.to_owned(),
        schema_name: schema_name.to_owned(),
        table_name: table_name.map(str::to_owned),
        object_type: object_type.to_owned(),
        operation: operation.to_owned(),
        status: status.to_owned(),
        request_payload,
        executed_sql: Value::Array(statements),
        result_payload,
        error_message,
        duration_ms,
        api_key_id: parsed_api_key_id,
        api_key_public_id: auth.presented_api_key_public_id.clone(),
        user_id: header_text("X-User-Id"),
        company_id: header_text("X-Company-Id"),
        organization_id: header_text("X-Organization-Id"),
        remote_addr: remote_addr(req),
        user_agent: user_agent(req),
    }
}
/// Writes `entry` as a new row of `database_audit_log` inside `tx`.
///
/// The row's `database_audit_log_id` is a freshly generated v4 UUID and its
/// `time` column receives the current time in whole seconds since the Unix
/// epoch; every other column is taken from the field of the same name on
/// `entry`.
///
/// NOTE(review): `audit_table_exists` probes `public.database_audit_log`,
/// whereas this statement names the table unqualified and so resolves it
/// through the session's schema search path — confirm the two always agree.
///
/// # Errors
///
/// Returns the `sqlx::Error` raised while executing the statement.
pub async fn insert_database_audit_log(
    tx: &mut Transaction<'_, Postgres>,
    entry: &DatabaseAuditLogEntry,
) -> Result<(), sqlx::Error> {
    // Kept flush-left: leading whitespace would become part of the SQL text.
    const INSERT_SQL: &str = r#"
INSERT INTO database_audit_log
(
database_audit_log_id,
request_id,
client_name,
schema_name,
table_name,
object_type,
operation,
status,
api_key_id,
api_key_public_id,
user_id,
company_id,
organization_id,
request_payload,
executed_sql,
result_payload,
error_message,
remote_addr,
user_agent,
duration_ms,
time
)
VALUES
(
$1,
$2,
$3,
$4,
$5,
$6,
$7,
$8,
$9,
$10,
$11,
$12,
$13,
$14,
$15,
$16,
$17,
$18,
$19,
$20,
$21
)
"#;

    // Bind order must follow the column list above ($1 through $21).
    let statement = sqlx::query(INSERT_SQL)
        .bind(Uuid::new_v4())
        .bind(entry.request_id)
        .bind(&entry.client_name)
        .bind(&entry.schema_name)
        .bind(&entry.table_name)
        .bind(&entry.object_type)
        .bind(&entry.operation)
        .bind(&entry.status)
        .bind(entry.api_key_id)
        .bind(&entry.api_key_public_id)
        .bind(&entry.user_id)
        .bind(&entry.company_id)
        .bind(&entry.organization_id)
        .bind(&entry.request_payload)
        .bind(&entry.executed_sql)
        .bind(&entry.result_payload)
        .bind(&entry.error_message)
        .bind(&entry.remote_addr)
        .bind(&entry.user_agent)
        .bind(entry.duration_ms)
        .bind(current_epoch_seconds());

    // The query result itself (row count) is not needed by callers.
    statement.execute(&mut **tx).await.map(|_| ())
}
/// Writes `entry` as a new row of `function_ddl_audit_log` inside `tx`.
///
/// The row's `function_ddl_audit_log_id` is a freshly generated v4 UUID and
/// its `time` column receives the current time in whole seconds since the
/// Unix epoch; every other column is taken from the field of the same name on
/// `entry`.
///
/// NOTE(review): `function_ddl_audit_table_exists` probes
/// `public.function_ddl_audit_log`, whereas this statement names the table
/// unqualified and so resolves it through the session's schema search path —
/// confirm the two always agree.
///
/// # Errors
///
/// Returns the `sqlx::Error` raised while executing the statement.
pub async fn insert_function_ddl_audit_log(
    tx: &mut Transaction<'_, Postgres>,
    entry: &FunctionDdlAuditLogEntry,
) -> Result<(), sqlx::Error> {
    // Kept flush-left: leading whitespace would become part of the SQL text.
    const INSERT_SQL: &str = r#"
INSERT INTO function_ddl_audit_log
(
function_ddl_audit_log_id,
request_id,
client_name,
schema_name,
function_name,
function_signature,
action,
status,
ddl_sql,
previous_definition,
next_definition,
error_message,
api_key_id,
api_key_public_id,
user_id,
company_id,
organization_id,
remote_addr,
user_agent,
duration_ms,
time
)
VALUES
(
$1,
$2,
$3,
$4,
$5,
$6,
$7,
$8,
$9,
$10,
$11,
$12,
$13,
$14,
$15,
$16,
$17,
$18,
$19,
$20,
$21
)
"#;

    // Bind order must follow the column list above ($1 through $21).
    let statement = sqlx::query(INSERT_SQL)
        .bind(Uuid::new_v4())
        .bind(entry.request_id)
        .bind(&entry.client_name)
        .bind(&entry.schema_name)
        .bind(&entry.function_name)
        .bind(&entry.function_signature)
        .bind(&entry.action)
        .bind(&entry.status)
        .bind(&entry.ddl_sql)
        .bind(&entry.previous_definition)
        .bind(&entry.next_definition)
        .bind(&entry.error_message)
        .bind(entry.api_key_id)
        .bind(&entry.api_key_public_id)
        .bind(&entry.user_id)
        .bind(&entry.company_id)
        .bind(&entry.organization_id)
        .bind(&entry.remote_addr)
        .bind(&entry.user_agent)
        .bind(entry.duration_ms)
        .bind(current_epoch_seconds());

    // The query result itself (row count) is not needed by callers.
    statement.execute(&mut **tx).await.map(|_| ())
}