use actix_web::{HttpRequest, HttpResponse, http::StatusCode};
use serde_json::{Value, json, Map};
use sqlx::postgres::PgPool;
use std::collections::HashMap;
use std::time::{SystemTime, UNIX_EPOCH};
use tracing::error;
use uuid::Uuid;
use crate::AppState;
use crate::api::headers::x_athena_client::x_athena_client;
use crate::api::headers::{x_company_id, x_organization_id, x_user_id};
use crate::drivers::supabase::supabase;
/// Summary of a request that has been passed through [`log_request`].
///
/// It carries the identifying fields that later log calls (for example
/// [`log_operation_event`]) copy into their own records.
#[derive(Debug, Clone)]
pub struct LoggedRequest {
    /// Identifier generated for this request (a UUID v4 rendered as a string).
    pub request_id: String,
    /// Client name as returned by the `x_athena_client` header helper.
    pub client_name: String,
    /// HTTP method of the request, e.g. `"GET"`.
    pub method: String,
    /// Request path, without the query string.
    pub path: String,
    /// HTTP status code known when the request was logged; `0` when none was available.
    pub status_code: u16,
    /// Time the request was logged, in seconds since the Unix epoch
    /// (`0` for requests that were deliberately not logged).
    pub time: i64,
}
pub fn log_request(req: HttpRequest, state: Option<&AppState>) -> LoggedRequest {
let client_name: String = x_athena_client(&req);
let user_id: String = x_user_id::get_x_user_id(&req).unwrap_or_else(|| "NOT_FOUND".to_string());
let company_id: String =
x_company_id::get_x_company_id(&req).unwrap_or_else(|| "NOT_FOUND".to_string());
let organization_id: String =
x_organization_id::get_x_organization_id(&req).unwrap_or_else(|| "NOT_FOUND".to_string());
let request_id: String = Uuid::new_v4().to_string();
let body_json: Value = req
.match_info()
.get("body")
.and_then(|body| serde_json::from_str(body).ok())
.unwrap_or_else(|| json!({}));
let query_string: &str = req.query_string();
let query_string: String = if query_string.is_empty() {
"NOT_FOUND".to_string()
} else {
query_string.to_string()
};
let headers: HashMap<String, String> = req
.headers()
.iter()
.map(|(k, v)| (k.to_string(), v.to_str().unwrap_or("").to_string()))
.collect();
let mut headers_map: serde_json::Map<String, Value> = serde_json::Map::new();
for (key, value) in headers.iter() {
headers_map.insert(key.clone(), Value::String(value.clone()));
}
let headers_json: Value = Value::Object(headers_map);
let method: String = req.method().as_str().to_string();
let path: String = req.path().to_string();
let status_http_code: u16 = req
.app_data::<HttpResponse>()
.map(|resp| resp.status().as_u16())
.unwrap_or(0);
let x_real_ip: String = headers
.get("X-Real-IP")
.unwrap_or(&"NOT_FOUND".to_string())
.to_string();
let user_agent: String = req
.headers()
.get("User-Agent")
.and_then(|v| v.to_str().ok())
.unwrap_or_else(|| "NOT_FOUND")
.to_string();
if user_agent
== "Better Stack Better Uptime Bot Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/130.0.0.0 Safari/537.36"
{
return LoggedRequest {
request_id,
client_name,
method,
path,
status_code: status_http_code,
time: 0,
};
}
let time: i64 = SystemTime::now()
.duration_since(UNIX_EPOCH)
.unwrap_or_else(|_| std::time::Duration::new(0, 0))
.as_secs() as i64;
let host: String = req
.headers()
.get("Host")
.and_then(|v| v.to_str().ok())
.unwrap_or_else(|| "NOT_FOUND")
.to_string();
let cache_control: String = req
.headers()
.get("Cache-Control")
.and_then(|v| v.to_str().ok())
.unwrap_or_else(|| "NOT_FOUND")
.to_string();
let cached: bool = cache_control != "no-cache";
let request_data: Value = json!({
"object": "request",
"request_id": request_id,
"user_id": user_id,
"company_id": company_id,
"body": body_json.clone(),
"query_string": query_string,
"headers": headers_json.clone(),
"method": method,
"path": path,
"http_code": status_http_code,
"ipv4": x_real_ip,
"user_agent": user_agent,
"time": time,
"host": host,
"cached": cached
});
if let Some(state) = state {
if let Some(logging_client) = state.logging_client_name.as_ref() {
if let Some(pool) = state.pg_registry.get_pool(logging_client) {
let entry: RequestLogEntry = RequestLogEntry {
request_id: request_id.clone(),
client: client_name.clone(),
method: method.clone(),
path: path.clone(),
query_string: query_string.clone(),
status_code: status_http_code as i32,
ipv4: x_real_ip.clone(),
user_agent: user_agent.clone(),
headers: headers_json.clone(),
body: body_json.clone(),
user_id: user_id.clone(),
company_id: company_id.clone(),
organization_id: organization_id.clone(),
host: host.clone(),
cached,
time,
};
let pool_clone: sqlx::Pool<sqlx::Postgres> = pool.clone();
actix_web::rt::spawn(async move {
if let Err(err) = insert_request_log(pool_clone, entry).await {
error!(error = %err, "failed to write gateway request log");
}
});
}
}
}
let request_data_clone: Value = request_data.clone();
actix_web::rt::spawn(async move {
if let Ok(client) = supabase().await {
let _ = client
.insert("event_log_api", request_data_clone.clone())
.await;
}
});
LoggedRequest {
request_id,
client_name,
method,
path,
status_code: status_http_code,
time,
}
}
pub fn log_operation_event(
state: Option<&AppState>,
request: &LoggedRequest,
operation: &str,
table_name: Option<&str>,
duration_ms: u128,
status: StatusCode,
details: Option<Value>,
) {
if let Some(state) = state {
if let Some(logging_client) = state.logging_client_name.as_ref() {
if let Some(pool) = state.pg_registry.get_pool(logging_client) {
let details_value = details.unwrap_or_else(|| json!({}));
let is_error = status.is_client_error() || status.is_server_error();
let message = details_value.get("message")
.and_then(|v| v.as_str())
.or_else(|| details_value.get("error").and_then(|v| v.as_str()))
.or_else(|| details_value.get("error_code").and_then(|v| v.as_str()))
.map(|s| s.to_string());
let cache_key = details_value.get("cache_key")
.and_then(|v| v.as_str())
.map(|s| s.to_string());
let entry = OperationLogEntry {
request_id: request.request_id.clone(),
operation: operation.to_string(),
table_name: table_name.map(|value| value.to_string()),
client: request.client_name.clone(),
method: request.method.clone(),
path: request.path.clone(),
status_code: status.as_u16() as i32,
duration_ms: duration_ms as i64,
details: details_value,
time: SystemTime::now()
.duration_since(UNIX_EPOCH)
.unwrap_or_else(|_| std::time::Duration::new(0, 0))
.as_secs() as i64,
error: is_error,
message,
cache_key,
};
let pool_clone = pool.clone();
let operation_name = operation.to_string();
actix_web::rt::spawn(async move {
if let Err(err) = insert_operation_log(pool_clone, entry).await {
error!(error = %err, operation = operation_name, "failed to write gateway operation log");
}
});
}
}
}
}
/// One row destined for the `gateway_request_log` table.
///
/// Built by `log_request` and consumed by `insert_request_log`, which binds
/// the fields to the insert statement in declaration order.
struct RequestLogEntry {
    /// Identifier generated for the request.
    request_id: String,
    /// Client name reported for the request.
    client: String,
    /// HTTP method.
    method: String,
    /// Request path without the query string.
    path: String,
    /// Raw query string, or `"NOT_FOUND"` when the request had none.
    query_string: String,
    /// HTTP status code; `0` when none was available at logging time.
    status_code: i32,
    /// Result of the `X-Real-IP` header lookup, or `"NOT_FOUND"`.
    ipv4: String,
    /// `User-Agent` header value, or `"NOT_FOUND"`.
    user_agent: String,
    /// All request headers as a JSON object of name to string value.
    headers: Value,
    /// JSON body captured for the request (`{}` when none was captured).
    body: Value,
    /// User id header value, or `"NOT_FOUND"`.
    user_id: String,
    /// Company id header value, or `"NOT_FOUND"`.
    company_id: String,
    /// Organization id header value, or `"NOT_FOUND"`.
    organization_id: String,
    /// `Host` header value, or `"NOT_FOUND"`.
    host: String,
    /// `false` only when `Cache-Control` was exactly `no-cache`.
    cached: bool,
    /// Log time in seconds since the Unix epoch.
    time: i64,
}
async fn insert_request_log(pool: PgPool, entry: RequestLogEntry) -> Result<(), sqlx::Error> {
sqlx::query(
r#"
INSERT INTO gateway_request_log
(request_id, client, method, path, query_string, status_code, ipv4,
user_agent, headers, body, user_id, company_id, organization_id, host,
cached, time)
VALUES
($1, $2, $3, $4, $5, $6, $7,
$8, $9, $10, $11, $12, $13, $14,
$15, $16)
"#,
)
.bind(entry.request_id)
.bind(entry.client)
.bind(entry.method)
.bind(entry.path)
.bind(entry.query_string)
.bind(entry.status_code)
.bind(entry.ipv4)
.bind(entry.user_agent)
.bind(entry.headers)
.bind(entry.body)
.bind(entry.user_id)
.bind(entry.company_id)
.bind(entry.organization_id)
.bind(entry.host)
.bind(entry.cached)
.bind(entry.time)
.execute(&pool)
.await?;
Ok(())
}
/// One row destined for the `gateway_operation_log` table.
///
/// Built by `log_operation_event` and consumed by `insert_operation_log`,
/// which binds the fields to the insert statement in declaration order.
struct OperationLogEntry {
    /// Identifier of the request this operation belongs to.
    request_id: String,
    /// Name of the operation.
    operation: String,
    /// Table the operation touched, if any.
    table_name: Option<String>,
    /// Client name copied from the originating request.
    client: String,
    /// HTTP method copied from the originating request.
    method: String,
    /// Path copied from the originating request.
    path: String,
    /// HTTP status code of the operation result.
    status_code: i32,
    /// Duration of the operation in milliseconds.
    duration_ms: i64,
    /// Free-form JSON details (`{}` when the caller supplied none).
    details: Value,
    /// Log time in seconds since the Unix epoch.
    time: i64,
    /// `true` when the status was a client (4xx) or server (5xx) error.
    error: bool,
    /// First string found under `message`, `error` or `error_code` in `details`.
    message: Option<String>,
    /// String found under `cache_key` in `details`, if any.
    cache_key: Option<String>,
}
async fn insert_operation_log(pool: PgPool, entry: OperationLogEntry) -> Result<(), sqlx::Error> {
sqlx::query(
r#"
INSERT INTO gateway_operation_log
(request_id, operation, table_name, client, method, path, status_code,
duration_ms, details, time, error, message, cache_key)
VALUES
($1, $2, $3, $4, $5, $6, $7,
$8, $9, $10, $11, $12, $13)
"#,
)
.bind(entry.request_id)
.bind(entry.operation)
.bind(entry.table_name)
.bind(entry.client)
.bind(entry.method)
.bind(entry.path)
.bind(entry.status_code)
.bind(entry.duration_ms)
.bind(entry.details)
.bind(entry.time)
.bind(entry.error)
.bind(entry.message)
.bind(entry.cache_key)
.execute(&pool)
.await?;
Ok(())
}