use actix_web::{HttpRequest, HttpResponse, http::StatusCode};
use serde_json::{Map, Value, json};
use sqlx::postgres::PgPool;
use sqlx::{Pool, Postgres};
use std::collections::HashMap;
use std::str::SplitN;
use std::sync::atomic::{AtomicI64, Ordering};
use std::time::{SystemTime, UNIX_EPOCH};
use tracing::error;
use uuid::Uuid;
use crate::AppState;
use crate::api::headers::x_athena_client::x_athena_client;
use crate::api::headers::{x_company_id, x_organization_id, x_user_id};
use crate::data::api_keys::resolve_internal_api_key_id;
use crate::data::clients::touch_athena_client_last_seen;
use crate::drivers::postgresql::schema_cache::get_public_table_column_types;
use crate::drivers::supabase::supabase;
// Unix timestamp (secs) of the last realtime-registry purge; used as a
// process-wide rate limiter in `maybe_purge_inactive_realtime_registry`.
static LAST_REALTIME_REGISTRY_PURGE_TS: AtomicI64 = AtomicI64::new(0);
/// Minimal summary of a logged request, returned by `log_request` so that
/// later per-operation logging (`log_operation_event`) can reference the
/// same request.
pub struct LoggedRequest {
    /// Unique id for this request (supplied upstream or a fresh UUIDv4).
    pub request_id: String,
    /// Calling client, resolved by the `x_athena_client` header helper.
    pub client_name: String,
    pub method: String,
    pub path: String,
    /// HTTP status code; 0 when no response was attached to the request.
    pub status_code: u16,
    /// Unix timestamp (secs) when logged; 0 for skipped uptime-bot traffic.
    pub time: i64,
}
/// API-key authentication outcome attached to a request log entry.
/// `Default` yields the "no key presented" state (all `None`/`false`).
#[derive(Clone, Debug, Default)]
pub struct RequestAuthLogContext {
    /// External (string) id of the matched API key, if any.
    pub api_key_id: Option<String>,
    /// Internal bigint id of the matched API key, if already resolved.
    pub internal_api_key_id: Option<i64>,
    /// Public id portion of the key the caller presented.
    pub presented_api_key_public_id: Option<String>,
    /// Hash of the presented key — redacted before being persisted.
    pub presented_api_key_hash: Option<String>,
    /// Salt of the presented key — redacted before being persisted.
    pub presented_api_key_salt: Option<String>,
    // NOTE(review): the three flags below are presumed to mean
    // matched / allowed-for-route / enforcement-active — confirm with the
    // auth middleware that populates this struct.
    pub api_key_authenticated: bool,
    pub api_key_authorized: bool,
    pub api_key_enforced: bool,
    /// Human-readable reason recorded for the auth decision.
    pub api_key_auth_reason: Option<String>,
}
/// Returns true when a header/field name should be redacted from logs.
///
/// The name is lowercased and stripped of `-`/`_` separators before a
/// substring check, so `Authorization`, `X-Api-Key`, `x_api_key`, and
/// `Set-Cookie` all match. Matching is intentionally broad: "token" also
/// catches e.g. "access_token" and "id-token".
fn is_sensitive_field_name(name: &str) -> bool {
    // Needles are pre-normalized (lowercase, no separators). The former
    // "xapikey" entry was removed: after normalization it is a superstring
    // of "apikey" and therefore redundant.
    const SENSITIVE_NEEDLES: [&str; 10] = [
        "authorization",
        "apikey",
        "xathenakey",
        "cookie",
        "setcookie",
        "token",
        "secret",
        "password",
        "passwd",
        "jwt",
    ];
    let normalized = name.to_ascii_lowercase().replace(['-', '_'], "");
    SENSITIVE_NEEDLES
        .iter()
        .any(|needle| normalized.contains(needle))
}
fn redact_query_string(query: &str) -> String {
if query.is_empty() {
return "NOT_FOUND".to_string();
}
query
.split('&')
.map(|part| {
let mut split: SplitN<'_, char> = part.splitn(2, '=');
let key: &str = split.next().unwrap_or_default();
let value: Option<&str> = split.next();
if is_sensitive_field_name(key) {
if value.is_some() {
format!("{}=[REDACTED]", key)
} else {
format!("{}=", key)
}
} else {
part.to_string()
}
})
.collect::<Vec<String>>()
.join("&")
}
/// Recursively walks a JSON value, replacing the value of every
/// sensitively-named object key with the literal string "[REDACTED]".
/// Arrays are traversed element-wise; scalars pass through unchanged.
fn redact_json_value(value: Value) -> Value {
    match value {
        Value::Object(entries) => Value::Object(
            entries
                .into_iter()
                .map(|(key, nested)| {
                    if is_sensitive_field_name(&key) {
                        (key, Value::String("[REDACTED]".to_string()))
                    } else {
                        (key, redact_json_value(nested))
                    }
                })
                .collect(),
        ),
        Value::Array(items) => {
            Value::Array(items.into_iter().map(redact_json_value).collect())
        }
        scalar => scalar,
    }
}
/// Replaces any present secret with the literal "[REDACTED]"; `None` stays
/// `None`, so logs still record whether a secret was supplied at all.
fn redact_optional_secret(value: Option<String>) -> Option<String> {
    match value {
        Some(_) => Some(String::from("[REDACTED]")),
        None => None,
    }
}
/// Reads an environment variable as a `u64`, falling back to `default`
/// when the variable is unset, non-UTF-8, or fails to parse.
fn parse_u64_env(name: &str, default: u64) -> u64 {
    match std::env::var(name) {
        Ok(raw) => raw.parse().unwrap_or(default),
        Err(_) => default,
    }
}
/// Picks the best client IP available: an explicit `X-Real-IP` value wins,
/// then the first entry of `X-Forwarded-For`, then the raw peer address,
/// and finally the "NOT_FOUND" sentinel.
fn normalize_ip(x_real_ip: &str, req: &HttpRequest) -> String {
    if !(x_real_ip.is_empty() || x_real_ip == "NOT_FOUND") {
        return x_real_ip.to_owned();
    }
    // X-Forwarded-For may hold a comma-separated chain; the first hop is
    // the original client.
    let forwarded = req
        .headers()
        .get("X-Forwarded-For")
        .and_then(|value| value.to_str().ok())
        .and_then(|value| value.split(',').next())
        .map(str::trim)
        .filter(|candidate| !candidate.is_empty())
        .map(str::to_owned);
    match forwarded {
        Some(ip) => ip,
        None => req
            .peer_addr()
            .map(|addr| addr.ip().to_string())
            .unwrap_or_else(|| "NOT_FOUND".to_string()),
    }
}
/// Builds "scheme://host" for the request, preferring the reverse proxy's
/// `X-Forwarded-Proto` header over the connection's own scheme. Returns
/// the "NOT_FOUND" sentinel when no usable host is available.
fn build_athena_url(req: &HttpRequest, host: &str) -> String {
    if host.is_empty() || host == "NOT_FOUND" {
        return "NOT_FOUND".to_string();
    }
    let forwarded_proto = req
        .headers()
        .get("X-Forwarded-Proto")
        .and_then(|value| value.to_str().ok())
        .filter(|value| !value.is_empty())
        .map(str::to_owned);
    let scheme =
        forwarded_proto.unwrap_or_else(|| req.connection_info().scheme().to_string());
    format!("{}://{}", scheme, host)
}
/// Builds a structured, redacted log record for `req` and fans it out
/// asynchronously: to the Postgres `gateway_request_log` (when `state`
/// carries a configured logging client/pool) and to the Supabase
/// `event_log_api` table. Returns a small summary used by subsequent
/// per-operation logging.
///
/// * `request_id` — reused when supplied, otherwise a fresh UUIDv4.
/// * `auth_context` — API-key auth outcome to record; defaults to "no key".
///
/// Requests from the Better Stack uptime bot are summarized (time = 0)
/// but never persisted, keeping monitoring noise out of the logs.
pub fn log_request(
    req: HttpRequest,
    state: Option<&AppState>,
    request_id: Option<String>,
    auth_context: Option<&RequestAuthLogContext>,
) -> LoggedRequest {
    let client_name: String = x_athena_client(&req);
    let user_id: String = x_user_id::get_x_user_id(&req).unwrap_or_else(|| "NOT_FOUND".to_string());
    let company_id: String =
        x_company_id::get_x_company_id(&req).unwrap_or_else(|| "NOT_FOUND".to_string());
    let organization_id: String =
        x_organization_id::get_x_organization_id(&req).unwrap_or_else(|| "NOT_FOUND".to_string());
    let request_id: String = request_id.unwrap_or_else(|| Uuid::new_v4().to_string());
    let auth_context = auth_context.cloned().unwrap_or_default();
    // NOTE(review): the body is read from the `body` path-match parameter,
    // not the request payload — confirm the route actually captures it there.
    let body_json: Value = req
        .match_info()
        .get("body")
        .and_then(|body| serde_json::from_str(body).ok())
        .unwrap_or_else(|| json!({}));
    let body_json = redact_json_value(body_json);
    let query_string: String = redact_query_string(req.query_string());
    // Keys in this map are the headers' lowercased display names.
    let headers: HashMap<String, String> = req
        .headers()
        .iter()
        .map(|(k, v)| (k.to_string(), v.to_str().unwrap_or("").to_string()))
        .collect();
    let redacted_headers: HashMap<String, String> = headers
        .iter()
        .map(|(key, value)| {
            if is_sensitive_field_name(key) {
                (key.clone(), "[REDACTED]".to_string())
            } else {
                (key.clone(), value.clone())
            }
        })
        .collect();
    let mut headers_map: serde_json::Map<String, Value> = serde_json::Map::new();
    for (key, value) in redacted_headers.iter() {
        headers_map.insert(key.clone(), Value::String(value.clone()));
    }
    let headers_json: Value = Value::Object(headers_map);
    let method: String = req.method().as_str().to_string();
    let path: String = req.path().to_string();
    // NOTE(review): relies on an HttpResponse having been stored in
    // app_data; falls back to 0 when absent.
    let status_http_code: u16 = req
        .app_data::<HttpResponse>()
        .map(|resp| resp.status().as_u16())
        .unwrap_or(0);
    // Fix: read X-Real-IP via the case-insensitive header map. The HashMap
    // above keys headers by their lowercased names, so the previous
    // `headers.get("X-Real-IP")` lookup could never match.
    let x_real_ip: String = req
        .headers()
        .get("X-Real-IP")
        .and_then(|v| v.to_str().ok())
        .unwrap_or("NOT_FOUND")
        .to_string();
    let normalized_ip = normalize_ip(&x_real_ip, &req);
    let user_agent: String = req
        .headers()
        .get("User-Agent")
        .and_then(|v| v.to_str().ok())
        .unwrap_or("NOT_FOUND")
        .to_string();
    // Skip persistence entirely for uptime-monitor probes.
    if user_agent
        == "Better Stack Better Uptime Bot Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/130.0.0.0 Safari/537.36"
    {
        return LoggedRequest {
            request_id,
            client_name,
            method,
            path,
            status_code: status_http_code,
            time: 0,
        };
    }
    let time: i64 = SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .unwrap_or_else(|_| std::time::Duration::new(0, 0))
        .as_secs() as i64;
    let host: String = req
        .headers()
        .get("Host")
        .and_then(|v| v.to_str().ok())
        .unwrap_or("NOT_FOUND")
        .to_string();
    let athena_url = build_athena_url(&req, &host);
    let cache_control: String = req
        .headers()
        .get("Cache-Control")
        .and_then(|v| v.to_str().ok())
        .unwrap_or("NOT_FOUND")
        .to_string();
    // NOTE(review): anything other than an explicit "no-cache" (including a
    // missing header) counts as cacheable — confirm that's intended.
    let cached: bool = cache_control != "no-cache";
    let request_data: Value = json!({
        "object": "request",
        "request_id": request_id,
        "user_id": user_id,
        "company_id": company_id,
        "api_key_id": auth_context.api_key_id.clone(),
        "presented_api_key_public_id": auth_context.presented_api_key_public_id.clone(),
        "presented_api_key_hash": redact_optional_secret(auth_context.presented_api_key_hash.clone()),
        "presented_api_key_salt": redact_optional_secret(auth_context.presented_api_key_salt.clone()),
        "api_key_authenticated": auth_context.api_key_authenticated,
        "api_key_authorized": auth_context.api_key_authorized,
        "api_key_enforced": auth_context.api_key_enforced,
        "api_key_auth_reason": auth_context.api_key_auth_reason.clone(),
        "body": body_json.clone(),
        "query_string": query_string,
        "headers": headers_json.clone(),
        "method": method,
        "path": path,
        "http_code": status_http_code,
        "ipv4": normalized_ip,
        "user_agent": user_agent,
        "time": time,
        "host": host,
        "athena_url": athena_url,
        "cached": cached
    });
    if let Some(state) = state
        && let Some(logging_client) = state.logging_client_name.as_ref()
        && let Some(pool) = state.pg_registry.get_pool(logging_client)
    {
        let entry: RequestLogEntry = RequestLogEntry {
            request_id: request_id.clone(),
            client: client_name.clone(),
            method: method.clone(),
            path: path.clone(),
            query_string: query_string.clone(),
            status_code: status_http_code as i32,
            ipv4: normalized_ip.clone(),
            user_agent: user_agent.clone(),
            headers: headers_json.clone(),
            body: body_json.clone(),
            user_id: user_id.clone(),
            company_id: company_id.clone(),
            organization_id: organization_id.clone(),
            api_key_id: auth_context.api_key_id.clone(),
            internal_api_key_id: auth_context.internal_api_key_id,
            presented_api_key_public_id: auth_context.presented_api_key_public_id.clone(),
            presented_api_key_hash: redact_optional_secret(
                auth_context.presented_api_key_hash.clone(),
            ),
            presented_api_key_salt: redact_optional_secret(
                auth_context.presented_api_key_salt.clone(),
            ),
            api_key_authenticated: auth_context.api_key_authenticated,
            api_key_authorized: auth_context.api_key_authorized,
            api_key_enforced: auth_context.api_key_enforced,
            api_key_auth_reason: auth_context.api_key_auth_reason.clone(),
            host: host.clone(),
            athena_url: athena_url.clone(),
            cached,
            time,
        };
        let pool_clone: Pool<Postgres> = pool.clone();
        // Persist off the request path so logging never blocks the caller.
        actix_web::rt::spawn(async move {
            if let Err(err) = insert_request_log(pool_clone, entry).await {
                error!(error = %err, "failed to write gateway request log");
            }
        });
    }
    let request_data_clone: Value = request_data.clone();
    // Mirror the record to Supabase; failures are deliberately ignored.
    actix_web::rt::spawn(async move {
        if let Ok(client) = supabase().await {
            let _ = client
                .insert("event_log_api", request_data_clone.clone())
                .await;
        }
    });
    LoggedRequest {
        request_id,
        client_name,
        method,
        path,
        status_code: status_http_code,
        time,
    }
}
pub fn log_operation_event(
state: Option<&AppState>,
request: &LoggedRequest,
operation: &str,
table_name: Option<&str>,
duration_ms: u128,
status: StatusCode,
details: Option<Value>,
) {
if let Some(state) = state
&& let Some(logging_client) = state.logging_client_name.as_ref()
&& let Some(pool) = state.pg_registry.get_pool(logging_client)
{
let details_value: Value = details.unwrap_or_else(|| json!({}));
let is_error: bool = status.is_client_error() || status.is_server_error();
let message: Option<String> = details_value
.get("message")
.and_then(|v| v.as_str())
.or_else(|| details_value.get("error").and_then(|v| v.as_str()))
.or_else(|| details_value.get("error_code").and_then(|v| v.as_str()))
.map(|s| s.to_string());
let cache_key: Option<String> = details_value
.get("cache_key")
.and_then(|v| v.as_str())
.map(|s| s.to_string());
let entry: OperationLogEntry = OperationLogEntry {
request_id: request.request_id.clone(),
operation: operation.to_string(),
table_name: table_name.map(|value| value.to_string()),
client: request.client_name.clone(),
method: request.method.clone(),
path: request.path.clone(),
status_code: status.as_u16() as i32,
duration_ms: duration_ms as i64,
details: details_value,
time: SystemTime::now()
.duration_since(UNIX_EPOCH)
.unwrap_or_else(|_| std::time::Duration::new(0, 0))
.as_secs() as i64,
error: is_error,
message,
cache_key,
};
let pool_clone: sqlx::Pool<sqlx::Postgres> = pool.clone();
let operation_name: String = operation.to_string();
actix_web::rt::spawn(async move {
if let Err(err) = insert_operation_log(pool_clone, entry).await {
error!(error = %err, operation = operation_name, "failed to write gateway operation log");
}
});
}
}
/// One row destined for the `gateway_request_log` table; mirrors the JSON
/// payload that `log_request` sends to Supabase.
struct RequestLogEntry {
    request_id: String,
    /// Client name (also keys the per-client statistics tables).
    client: String,
    method: String,
    path: String,
    /// Already redacted via `redact_query_string`.
    query_string: String,
    status_code: i32,
    ipv4: String,
    user_agent: String,
    /// Redacted headers as a JSON object.
    headers: Value,
    /// Redacted request body as JSON.
    body: Value,
    user_id: String,
    company_id: String,
    organization_id: String,
    /// External (string) API key id, if any.
    api_key_id: Option<String>,
    /// Internal bigint API key id, if already resolved.
    internal_api_key_id: Option<i64>,
    presented_api_key_public_id: Option<String>,
    /// Already redacted before this struct is built.
    presented_api_key_hash: Option<String>,
    /// Already redacted before this struct is built.
    presented_api_key_salt: Option<String>,
    api_key_authenticated: bool,
    api_key_authorized: bool,
    api_key_enforced: bool,
    api_key_auth_reason: Option<String>,
    host: String,
    athena_url: String,
    cached: bool,
    /// Unix timestamp (secs) of the request.
    time: i64,
}
/// Snapshot of one request used to upsert the realtime connection
/// registry (`gateway_connection_registry_realtime`).
struct RealtimeConnectionEntry {
    request_id: String,
    client_name: String,
    ipv4: String,
    athena_url: String,
    /// Public id of the presented API key, if any (part of registry key).
    api_key_public_id: Option<String>,
    /// External (string) id of the matched API key, if any.
    api_key_external_id: Option<String>,
    /// Internal bigint id of the matched API key, if resolved.
    api_key_internal_id: Option<i64>,
    api_key_authenticated: bool,
    api_key_authorized: bool,
    // The "last seen" request attributes recorded on the registry row.
    method: String,
    path: String,
    user_agent: String,
    status_code: i32,
}
/// Inserts one row into `gateway_request_log`, adapting the INSERT's column
/// list to the table's live schema (via the cached public-schema column
/// types) so the gateway keeps working against older tables that lack the
/// `api_key_*` / `time` columns. Afterwards it updates the realtime
/// connection registry, opportunistically purges stale registry rows, bumps
/// per-client request statistics, and best-effort touches the client's
/// last-seen marker.
///
/// Invariant: every `push_column` call below must have a matching
/// `push_bind` guarded by the same schema flag, in the same order.
async fn insert_request_log(pool: PgPool, entry: RequestLogEntry) -> Result<(), sqlx::Error> {
    // Copied up-front because `entry`'s fields are consumed by the binds below.
    let client_name = entry.client.clone();
    let status_code = entry.status_code;
    let cached = entry.cached;
    let realtime_entry = RealtimeConnectionEntry {
        request_id: entry.request_id.clone(),
        client_name: entry.client.clone(),
        ipv4: entry.ipv4.clone(),
        athena_url: entry.athena_url.clone(),
        api_key_public_id: entry.presented_api_key_public_id.clone(),
        api_key_external_id: entry.api_key_id.clone(),
        api_key_internal_id: entry.internal_api_key_id,
        api_key_authenticated: entry.api_key_authenticated,
        api_key_authorized: entry.api_key_authorized,
        method: entry.method.clone(),
        path: entry.path.clone(),
        user_agent: entry.user_agent.clone(),
        status_code: entry.status_code,
    };
    let column_types = get_public_table_column_types(&pool, "gateway_request_log").await?;
    let mut query_builder =
        sqlx::QueryBuilder::<Postgres>::new("INSERT INTO gateway_request_log (");
    // Appends one column name, comma-separating every column after the first.
    let mut first = true;
    let mut push_column = |query: &mut sqlx::QueryBuilder<'_, Postgres>, column: &str| {
        if !first {
            query.push(", ");
        }
        query.push(column);
        first = false;
    };
    // Core columns: always present.
    push_column(&mut query_builder, "request_id");
    push_column(&mut query_builder, "client");
    push_column(&mut query_builder, "method");
    push_column(&mut query_builder, "path");
    push_column(&mut query_builder, "query_string");
    push_column(&mut query_builder, "status_code");
    push_column(&mut query_builder, "ipv4");
    push_column(&mut query_builder, "user_agent");
    push_column(&mut query_builder, "headers");
    push_column(&mut query_builder, "body");
    push_column(&mut query_builder, "user_id");
    push_column(&mut query_builder, "company_id");
    push_column(&mut query_builder, "organization_id");
    // Optional columns: only included when the live schema has them.
    let api_key_id_type = column_types.get("api_key_id").cloned();
    let has_api_key_id = api_key_id_type.is_some();
    if has_api_key_id {
        push_column(&mut query_builder, "api_key_id");
    }
    let has_presented_public_id = column_types.contains_key("presented_api_key_public_id");
    if has_presented_public_id {
        push_column(&mut query_builder, "presented_api_key_public_id");
    }
    let has_presented_hash = column_types.contains_key("presented_api_key_hash");
    if has_presented_hash {
        push_column(&mut query_builder, "presented_api_key_hash");
    }
    let has_presented_salt = column_types.contains_key("presented_api_key_salt");
    if has_presented_salt {
        push_column(&mut query_builder, "presented_api_key_salt");
    }
    let has_authenticated = column_types.contains_key("api_key_authenticated");
    if has_authenticated {
        push_column(&mut query_builder, "api_key_authenticated");
    }
    let has_authorized = column_types.contains_key("api_key_authorized");
    if has_authorized {
        push_column(&mut query_builder, "api_key_authorized");
    }
    let has_enforced = column_types.contains_key("api_key_enforced");
    if has_enforced {
        push_column(&mut query_builder, "api_key_enforced");
    }
    let has_auth_reason = column_types.contains_key("api_key_auth_reason");
    if has_auth_reason {
        push_column(&mut query_builder, "api_key_auth_reason");
    }
    push_column(&mut query_builder, "host");
    push_column(&mut query_builder, "cached");
    let has_time = column_types.contains_key("time");
    if has_time {
        push_column(&mut query_builder, "time");
    }
    query_builder.push(") VALUES (");
    let mut separated = query_builder.separated(", ");
    separated.push_bind(entry.request_id);
    separated.push_bind(entry.client);
    separated.push_bind(entry.method);
    separated.push_bind(entry.path);
    separated.push_bind(entry.query_string);
    separated.push_bind(entry.status_code);
    separated.push_bind(entry.ipv4);
    separated.push_bind(entry.user_agent);
    separated.push_bind(entry.headers);
    separated.push_bind(entry.body);
    separated.push_bind(entry.user_id);
    separated.push_bind(entry.company_id);
    separated.push_bind(entry.organization_id);
    if let Some(data_type) = api_key_id_type.as_deref() {
        if data_type == "bigint" {
            // bigint column: bind the internal id, resolving it from the
            // external string id when it wasn't supplied up-front.
            let internal_id = match entry.internal_api_key_id {
                Some(value) => Some(value),
                None => match entry.api_key_id.as_deref() {
                    Some(external_id) => resolve_internal_api_key_id(&pool, external_id).await?,
                    None => None,
                },
            };
            separated.push_bind(internal_id);
        } else {
            // Legacy string column: bind the external id directly.
            separated.push_bind(entry.api_key_id);
        }
    }
    if has_presented_public_id {
        separated.push_bind(entry.presented_api_key_public_id);
    }
    if has_presented_hash {
        separated.push_bind(entry.presented_api_key_hash);
    }
    if has_presented_salt {
        separated.push_bind(entry.presented_api_key_salt);
    }
    if has_authenticated {
        separated.push_bind(entry.api_key_authenticated);
    }
    if has_authorized {
        separated.push_bind(entry.api_key_authorized);
    }
    if has_enforced {
        separated.push_bind(entry.api_key_enforced);
    }
    if has_auth_reason {
        separated.push_bind(entry.api_key_auth_reason);
    }
    separated.push_bind(entry.host);
    separated.push_bind(entry.cached);
    if has_time {
        // Bind as timestamptz when the column is one; otherwise as raw
        // epoch seconds.
        match column_types.get("time").map(String::as_str) {
            Some("timestamp with time zone") => {
                separated.push_bind(chrono::DateTime::<chrono::Utc>::from_timestamp(
                    entry.time, 0,
                ));
            }
            _ => {
                separated.push_bind(entry.time);
            }
        };
    }
    // Closes the VALUES list without a preceding comma.
    separated.push_unseparated(")");
    query_builder.build().execute(&pool).await?;
    upsert_realtime_connection_registry(&pool, realtime_entry).await?;
    maybe_purge_inactive_realtime_registry(&pool).await?;
    upsert_client_request_statistics(&pool, &client_name, status_code, cached).await?;
    // Best-effort: failing to touch last-seen must not fail the log write.
    let _ = touch_athena_client_last_seen(&pool, &client_name).await;
    Ok(())
}
/// Upserts one row in `gateway_connection_registry_realtime`, keyed by
/// client + ip + url + api-key identity. New connections start with
/// `total_hits = 1`; repeats bump the counter and refresh the last-seen
/// request attributes.
async fn upsert_realtime_connection_registry(
    pool: &PgPool,
    entry: RealtimeConnectionEntry,
) -> Result<(), sqlx::Error> {
    // Key identity falls back: public key id → external key id → "ANON".
    let key_api = entry
        .api_key_public_id
        .as_deref()
        .or(entry.api_key_external_id.as_deref())
        .unwrap_or("ANON");
    let registry_key = format!(
        "{}|{}|{}|{}",
        entry.client_name, entry.ipv4, entry.athena_url, key_api
    );
    sqlx::query(
        r#"
        INSERT INTO gateway_connection_registry_realtime (
            registry_key,
            client_name,
            ipv4,
            athena_url,
            api_key_public_id,
            api_key_external_id,
            api_key_internal_id,
            api_key_authenticated,
            api_key_authorized,
            last_method,
            last_path,
            user_agent,
            last_request_id,
            last_status_code,
            total_hits,
            first_seen_at,
            last_seen_at,
            updated_at
        )
        VALUES (
            $1,
            $2,
            $3,
            $4,
            $5,
            $6,
            $7,
            $8,
            $9,
            $10,
            $11,
            $12,
            $13,
            $14,
            1,
            now(),
            now(),
            now()
        )
        ON CONFLICT (registry_key) DO UPDATE
        SET api_key_public_id = EXCLUDED.api_key_public_id,
            api_key_external_id = EXCLUDED.api_key_external_id,
            api_key_internal_id = EXCLUDED.api_key_internal_id,
            api_key_authenticated = EXCLUDED.api_key_authenticated,
            api_key_authorized = EXCLUDED.api_key_authorized,
            last_method = EXCLUDED.last_method,
            last_path = EXCLUDED.last_path,
            user_agent = EXCLUDED.user_agent,
            last_request_id = EXCLUDED.last_request_id,
            last_status_code = EXCLUDED.last_status_code,
            total_hits = gateway_connection_registry_realtime.total_hits + 1,
            last_seen_at = now(),
            updated_at = now()
        "#,
    )
    .bind(registry_key)
    .bind(entry.client_name)
    .bind(entry.ipv4)
    .bind(entry.athena_url)
    .bind(entry.api_key_public_id)
    .bind(entry.api_key_external_id)
    .bind(entry.api_key_internal_id)
    .bind(entry.api_key_authenticated)
    .bind(entry.api_key_authorized)
    .bind(entry.method)
    .bind(entry.path)
    .bind(entry.user_agent)
    .bind(entry.request_id)
    .bind(entry.status_code)
    .execute(pool)
    .await?;
    Ok(())
}
/// Deletes realtime-registry rows not seen for
/// `ATHENA_REALTIME_REGISTRY_INACTIVE_AFTER_SECS` (default 300s), but runs
/// at most once every `ATHENA_REALTIME_REGISTRY_PURGE_INTERVAL_SECS`
/// (default 30s). A compare-and-swap on the shared timestamp ensures only
/// one concurrent caller performs the purge for a given window.
async fn maybe_purge_inactive_realtime_registry(pool: &PgPool) -> Result<(), sqlx::Error> {
    let current_ts = SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .unwrap_or_else(|_| std::time::Duration::new(0, 0))
        .as_secs() as i64;
    let purge_interval = parse_u64_env("ATHENA_REALTIME_REGISTRY_PURGE_INTERVAL_SECS", 30) as i64;
    let inactive_after = parse_u64_env("ATHENA_REALTIME_REGISTRY_INACTIVE_AFTER_SECS", 300);
    let last_purge = LAST_REALTIME_REGISTRY_PURGE_TS.load(Ordering::Relaxed);
    if current_ts - last_purge < purge_interval {
        return Ok(());
    }
    // Claim the purge window; if another task already advanced the
    // timestamp, let it do the work instead.
    let claimed = LAST_REALTIME_REGISTRY_PURGE_TS
        .compare_exchange(last_purge, current_ts, Ordering::SeqCst, Ordering::Relaxed)
        .is_ok();
    if !claimed {
        return Ok(());
    }
    sqlx::query(
        r#"
        DELETE FROM gateway_connection_registry_realtime
        WHERE last_seen_at < now() - ($1::bigint * interval '1 second')
        "#,
    )
    .bind(inactive_after as i64)
    .execute(pool)
    .await?;
    Ok(())
}
/// One row destined for the `gateway_operation_log` table.
struct OperationLogEntry {
    request_id: String,
    operation: String,
    /// Table the operation touched, when applicable.
    table_name: Option<String>,
    client: String,
    method: String,
    path: String,
    status_code: i32,
    duration_ms: i64,
    /// Arbitrary JSON payload; `message` / `error` / `error_code` /
    /// `cache_key` keys are mined out by `log_operation_event`.
    details: Value,
    /// Unix timestamp (secs) of the operation.
    time: i64,
    /// Caller-supplied error flag (4xx/5xx also counts as error on insert).
    error: bool,
    message: Option<String>,
    cache_key: Option<String>,
}
/// Writes one row to `gateway_operation_log`, then rolls the operation into
/// the per-client (and, when a table is involved, per-table) statistics and
/// best-effort touches the client's last-seen marker.
async fn insert_operation_log(pool: PgPool, entry: OperationLogEntry) -> Result<(), sqlx::Error> {
    // Copied up-front: `entry`'s fields are consumed by the binds below.
    let client = entry.client.clone();
    let table = entry.table_name.clone();
    let op_name = entry.operation.clone();
    // A 4xx/5xx status counts as an error even if the caller didn't flag it.
    let errored = entry.error || entry.status_code >= 400;
    sqlx::query(
        r#"
        INSERT INTO gateway_operation_log
        (request_id, operation, table_name, client, method, path, status_code,
        duration_ms, details, time, error, message, cache_key)
        VALUES
        ($1, $2, $3, $4, $5, $6, $7,
        $8, $9, $10, $11, $12, $13)
        "#,
    )
    .bind(entry.request_id)
    .bind(entry.operation)
    .bind(entry.table_name)
    .bind(entry.client)
    .bind(entry.method)
    .bind(entry.path)
    .bind(entry.status_code)
    .bind(entry.duration_ms)
    .bind(entry.details)
    .bind(entry.time)
    .bind(entry.error)
    .bind(entry.message)
    .bind(entry.cache_key)
    .execute(&pool)
    .await?;
    upsert_client_operation_statistics(&pool, &client).await?;
    if let Some(table_ref) = table.as_deref() {
        upsert_client_table_statistics(&pool, &client, table_ref, &op_name, errored).await?;
    }
    // Best-effort: a failed last-seen update must not fail the log write.
    let _ = touch_athena_client_last_seen(&pool, &client).await;
    Ok(())
}
/// Insert-or-increment per-client request counters in `client_statistics`.
/// `$2` is the HTTP status (2xx/3xx counts as success, >=400 as failure)
/// and `$3` whether the response counted as cacheable. The inserted row
/// carries 0/1 flags, so the conflict arm can reuse them via EXCLUDED to
/// add at most 1 to each counter.
async fn upsert_client_request_statistics(
    pool: &PgPool,
    client_name: &str,
    status_code: i32,
    cached: bool,
) -> Result<(), sqlx::Error> {
    sqlx::query(
        r#"
        INSERT INTO client_statistics (
            client_name,
            total_requests,
            successful_requests,
            failed_requests,
            total_cached_requests,
            total_operations,
            last_request_at
        )
        VALUES (
            $1,
            1,
            CASE WHEN $2 >= 200 AND $2 < 400 THEN 1 ELSE 0 END,
            CASE WHEN $2 >= 400 THEN 1 ELSE 0 END,
            CASE WHEN $3 THEN 1 ELSE 0 END,
            0,
            now()
        )
        ON CONFLICT (client_name) DO UPDATE
        SET total_requests = client_statistics.total_requests + 1,
            successful_requests = client_statistics.successful_requests
                + CASE WHEN EXCLUDED.successful_requests > 0 THEN 1 ELSE 0 END,
            failed_requests = client_statistics.failed_requests
                + CASE WHEN EXCLUDED.failed_requests > 0 THEN 1 ELSE 0 END,
            total_cached_requests = client_statistics.total_cached_requests
                + CASE WHEN EXCLUDED.total_cached_requests > 0 THEN 1 ELSE 0 END,
            last_request_at = now(),
            updated_at = now()
        "#,
    )
    .bind(client_name)
    .bind(status_code)
    .bind(cached)
    .execute(pool)
    .await?;
    Ok(())
}
/// Insert-or-increment the per-client operation counter in
/// `client_statistics`; the request counters are left untouched (zeros on
/// first insert, not incremented on conflict).
async fn upsert_client_operation_statistics(
    pool: &PgPool,
    client_name: &str,
) -> Result<(), sqlx::Error> {
    sqlx::query(
        r#"
        INSERT INTO client_statistics (
            client_name,
            total_requests,
            successful_requests,
            failed_requests,
            total_cached_requests,
            total_operations,
            last_operation_at
        )
        VALUES ($1, 0, 0, 0, 0, 1, now())
        ON CONFLICT (client_name) DO UPDATE
        SET total_operations = client_statistics.total_operations + 1,
            last_operation_at = now(),
            updated_at = now()
        "#,
    )
    .bind(client_name)
    .execute(pool)
    .await?;
    Ok(())
}
/// Insert-or-increment per-(client, table, operation) counters in
/// `client_table_statistics`; `$4` flags whether this particular operation
/// errored, contributing at most 1 to `error_operations` via EXCLUDED.
async fn upsert_client_table_statistics(
    pool: &PgPool,
    client_name: &str,
    table_name: &str,
    operation: &str,
    is_error: bool,
) -> Result<(), sqlx::Error> {
    sqlx::query(
        r#"
        INSERT INTO client_table_statistics (
            client_name,
            table_name,
            operation,
            total_operations,
            error_operations,
            last_operation_at
        )
        VALUES ($1, $2, $3, 1, CASE WHEN $4 THEN 1 ELSE 0 END, now())
        ON CONFLICT (client_name, table_name, operation) DO UPDATE
        SET total_operations = client_table_statistics.total_operations + 1,
            error_operations = client_table_statistics.error_operations
                + CASE WHEN EXCLUDED.error_operations > 0 THEN 1 ELSE 0 END,
            last_operation_at = now(),
            updated_at = now()
        "#,
    )
    .bind(client_name)
    .bind(table_name)
    .bind(operation)
    .bind(is_error)
    .execute(pool)
    .await?;
    Ok(())
}