use chrono::{DateTime, Utc};
use serde::Serialize;
use serde_json::Value;
use sqlx::postgres::{PgPool, PgRow};
use sqlx::{Postgres, Row, Transaction};
use uuid::Uuid;
/// A client entry read from the `athena_clients` table.
///
/// Produced by `map_client_row`. The queries in this file alias the
/// `athena_client_id` column to `id` and otherwise use the field names as
/// column names.
#[derive(Debug, Clone, Serialize)]
pub struct AthenaClientRecord {
/// String form of the row's UUID identifier.
pub id: String,
/// Client name; lookups in this file compare it case-insensitively via `lower(...)`.
pub client_name: String,
/// Optional description text.
pub description: Option<String>,
/// Optional `pg_uri` value (presumably a Postgres connection URI — confirm).
pub pg_uri: Option<String>,
/// Optional `pg_uri_env_var` value (presumably the name of an environment
/// variable holding the URI — confirm).
pub pg_uri_env_var: Option<String>,
/// Optional `config_uri_template` value; its format is not visible in this file.
pub config_uri_template: Option<String>,
/// Origin label. The upsert query special-cases `'config'` and stores
/// `'merged'` when an existing row is updated from a different source.
pub source: String,
/// Activity flag; cleared by the soft delete and required to be true by the
/// active-client listing.
pub is_active: bool,
/// Freeze flag; the active-client listing only returns rows where it is false.
pub is_frozen: bool,
/// Set to `now()` by the upsert query when the source is `'config'`.
pub last_synced_from_config_at: Option<DateTime<Utc>>,
/// Set to `now()` by `touch_athena_client_last_seen`.
pub last_seen_at: Option<DateTime<Utc>>,
/// JSON metadata stored with the client.
pub metadata: Value,
/// Value of the `created_at` column.
pub created_at: DateTime<Utc>,
/// Set to `now()` by every UPDATE of `athena_clients` in this file.
pub updated_at: DateTime<Utc>,
/// Soft-delete timestamp; the read queries in this file only return rows
/// where it is NULL.
pub deleted_at: Option<DateTime<Utc>>,
}
/// Per-client aggregate counters read from the `client_statistics` table.
///
/// The table is rebuilt by `refresh_client_statistics` from
/// `gateway_request_log` and `gateway_operation_log`.
#[derive(Debug, Clone, Serialize)]
pub struct ClientStatisticsRecord {
/// Client name as it appears in the `client` column of the request log.
pub client_name: String,
/// Number of request-log rows for the client.
pub total_requests: i64,
/// Request-log rows with `status_code` in `200..400`.
pub successful_requests: i64,
/// Request-log rows with `status_code >= 400`.
pub failed_requests: i64,
/// Request-log rows where `cached IS TRUE`.
pub total_cached_requests: i64,
/// Number of operation-log rows for the client (0 when there are none).
pub total_operations: i64,
/// Most recent `created_at` among the client's request-log rows.
pub last_request_at: Option<DateTime<Utc>>,
/// Most recent `created_at` among the client's operation-log rows, if any.
pub last_operation_at: Option<DateTime<Utc>>,
/// Value of the `updated_at` column. The refresh INSERT in this file does
/// not write it, so it presumably comes from a column default — confirm.
pub updated_at: DateTime<Utc>,
}
/// Aggregate counters for one (client, table, operation) combination, read
/// from the `client_table_statistics` table.
///
/// The table is rebuilt by `refresh_client_statistics` from
/// `gateway_operation_log`.
#[derive(Debug, Clone, Serialize)]
pub struct ClientTableStatisticsRecord {
/// Client name taken from the operation log's `client` column.
pub client_name: String,
/// Table name the operations were grouped by.
pub table_name: String,
/// Operation label the operations were grouped by.
pub operation: String,
/// Number of operation-log rows in the group.
pub total_operations: i64,
/// Rows in the group where `error IS TRUE` or `status_code >= 400`.
pub error_operations: i64,
/// Most recent `created_at` within the group.
pub last_operation_at: Option<DateTime<Utc>>,
/// Value of the `updated_at` column. The refresh INSERT in this file does
/// not write it, so it presumably comes from a column default — confirm.
pub updated_at: DateTime<Utc>,
}
/// Selects which operation-log rows a drilldown query returns.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ClientOperationStatusFilter {
    /// Return every row regardless of outcome.
    All,
    /// Return only rows classed as errors.
    Errors,
    /// Return only rows not classed as errors.
    Normal,
}

impl ClientOperationStatusFilter {
    /// Returns the lowercase label for this filter.
    ///
    /// The label is what the drilldown SQL compares its filter parameter
    /// against, so the three strings must stay in sync with that query.
    fn as_str(self) -> &'static str {
        match self {
            ClientOperationStatusFilter::Normal => "normal",
            ClientOperationStatusFilter::Errors => "errors",
            ClientOperationStatusFilter::All => "all",
        }
    }
}
/// One row of `gateway_operation_log` as returned by
/// `list_client_operation_drilldown`.
///
/// Field names match the selected column names, except where noted below.
#[derive(Debug, Clone, Serialize)]
pub struct ClientOperationDrilldownRecord {
/// Timestamp the drilldown results are ordered by (newest first).
pub created_at: DateTime<Utc>,
/// Value of the `request_id` column.
pub request_id: String,
/// Operation label, if recorded.
pub operation: Option<String>,
/// Table name, if recorded.
pub table_name: Option<String>,
/// Selected from the log's `client` column (aliased to `client_name`).
pub client_name: Option<String>,
/// Value of the `method` column, if recorded.
pub method: Option<String>,
/// Value of the `path` column, if recorded.
pub path: Option<String>,
/// Status code; the drilldown query treats values `>= 400` as errors.
pub status_code: Option<i32>,
/// Value of the `duration_ms` column, if recorded.
pub duration_ms: Option<i64>,
/// The log's `time` column cast to `double precision`.
pub operation_time_ms: Option<f64>,
/// JSON `details` column, if recorded.
pub details: Option<Value>,
/// Error flag; the drilldown query treats `TRUE` as an error row.
pub error: Option<bool>,
/// Value of the `message` column, if recorded.
pub message: Option<String>,
/// Value of the `cache_key` column, if recorded.
pub cache_key: Option<String>,
}
/// Input values for `upsert_athena_client`.
///
/// Each field is bound to the column of the same name in `athena_clients`.
#[derive(Debug, Clone)]
pub struct SaveAthenaClientParams {
/// Client name; the upsert detects conflicts on `lower(client_name)`.
pub client_name: String,
/// Optional description text.
pub description: Option<String>,
/// Optional `pg_uri` value (presumably a Postgres connection URI — confirm).
pub pg_uri: Option<String>,
/// Optional `pg_uri_env_var` value (presumably an environment variable name — confirm).
pub pg_uri_env_var: Option<String>,
/// Optional `config_uri_template` value; its format is not visible in this file.
pub config_uri_template: Option<String>,
/// Origin label; `'config'` makes the upsert stamp `last_synced_from_config_at`.
pub source: String,
/// Activity flag to store.
pub is_active: bool,
/// Freeze flag to store. A `'config'` upsert does not unfreeze a row that
/// is already frozen.
pub is_frozen: bool,
/// JSON metadata to store.
pub metadata: Value,
}
fn map_client_row(row: &PgRow) -> Result<AthenaClientRecord, sqlx::Error> {
Ok(AthenaClientRecord {
id: row.try_get::<Uuid, _>("id")?.to_string(),
client_name: row.try_get("client_name")?,
description: row.try_get("description")?,
pg_uri: row.try_get("pg_uri")?,
pg_uri_env_var: row.try_get("pg_uri_env_var")?,
config_uri_template: row.try_get("config_uri_template")?,
source: row.try_get("source")?,
is_active: row.try_get("is_active")?,
is_frozen: row.try_get("is_frozen")?,
last_synced_from_config_at: row.try_get("last_synced_from_config_at")?,
last_seen_at: row.try_get("last_seen_at")?,
metadata: row.try_get("metadata")?,
created_at: row.try_get("created_at")?,
updated_at: row.try_get("updated_at")?,
deleted_at: row.try_get("deleted_at")?,
})
}
fn map_client_statistics_row(row: &PgRow) -> Result<ClientStatisticsRecord, sqlx::Error> {
Ok(ClientStatisticsRecord {
client_name: row.try_get("client_name")?,
total_requests: row.try_get("total_requests")?,
successful_requests: row.try_get("successful_requests")?,
failed_requests: row.try_get("failed_requests")?,
total_cached_requests: row.try_get("total_cached_requests")?,
total_operations: row.try_get("total_operations")?,
last_request_at: row.try_get("last_request_at")?,
last_operation_at: row.try_get("last_operation_at")?,
updated_at: row.try_get("updated_at")?,
})
}
fn map_client_table_statistics_row(
row: &PgRow,
) -> Result<ClientTableStatisticsRecord, sqlx::Error> {
Ok(ClientTableStatisticsRecord {
client_name: row.try_get("client_name")?,
table_name: row.try_get("table_name")?,
operation: row.try_get("operation")?,
total_operations: row.try_get("total_operations")?,
error_operations: row.try_get("error_operations")?,
last_operation_at: row.try_get("last_operation_at")?,
updated_at: row.try_get("updated_at")?,
})
}
fn map_client_operation_drilldown_row(
row: &PgRow,
) -> Result<ClientOperationDrilldownRecord, sqlx::Error> {
Ok(ClientOperationDrilldownRecord {
created_at: row.try_get("created_at")?,
request_id: row.try_get("request_id")?,
operation: row.try_get("operation")?,
table_name: row.try_get("table_name")?,
client_name: row.try_get("client_name")?,
method: row.try_get("method")?,
path: row.try_get("path")?,
status_code: row.try_get("status_code")?,
duration_ms: row.try_get("duration_ms")?,
operation_time_ms: row.try_get("operation_time_ms")?,
details: row.try_get("details")?,
error: row.try_get("error")?,
message: row.try_get("message")?,
cache_key: row.try_get("cache_key")?,
})
}
pub async fn list_athena_clients(pool: &PgPool) -> Result<Vec<AthenaClientRecord>, sqlx::Error> {
let rows = sqlx::query(
r#"
SELECT
athena_client_id AS id,
client_name,
description,
pg_uri,
pg_uri_env_var,
config_uri_template,
source,
is_active,
is_frozen,
last_synced_from_config_at,
last_seen_at,
metadata,
created_at,
updated_at,
deleted_at
FROM athena_clients
WHERE deleted_at IS NULL
ORDER BY lower(client_name)
"#,
)
.fetch_all(pool)
.await?;
rows.iter().map(map_client_row).collect()
}
pub async fn list_active_athena_clients(
pool: &PgPool,
) -> Result<Vec<AthenaClientRecord>, sqlx::Error> {
let rows = sqlx::query(
r#"
SELECT
athena_client_id AS id,
client_name,
description,
pg_uri,
pg_uri_env_var,
config_uri_template,
source,
is_active,
is_frozen,
last_synced_from_config_at,
last_seen_at,
metadata,
created_at,
updated_at,
deleted_at
FROM athena_clients
WHERE deleted_at IS NULL
AND is_active = true
AND is_frozen = false
ORDER BY lower(client_name)
"#,
)
.fetch_all(pool)
.await?;
rows.iter().map(map_client_row).collect()
}
pub async fn get_athena_client_by_name(
pool: &PgPool,
client_name: &str,
) -> Result<Option<AthenaClientRecord>, sqlx::Error> {
let row: Option<PgRow> = sqlx::query(
r#"
SELECT
athena_client_id AS id,
client_name,
description,
pg_uri,
pg_uri_env_var,
config_uri_template,
source,
is_active,
is_frozen,
last_synced_from_config_at,
last_seen_at,
metadata,
created_at,
updated_at,
deleted_at
FROM athena_clients
WHERE lower(client_name) = lower($1)
AND deleted_at IS NULL
LIMIT 1
"#,
)
.bind(client_name)
.fetch_optional(pool)
.await?;
row.as_ref().map(map_client_row).transpose()
}
/// Inserts a client, or updates the existing non-deleted client whose name
/// matches case-insensitively, and returns the stored row.
///
/// A fresh UUID is generated for the insert; on conflict the existing row
/// keeps its identifier. When updating an existing row:
///
/// * `source` becomes `'merged'` if the stored and incoming sources differ;
/// * a row that is already frozen stays frozen when the incoming source is
///   `'config'`, otherwise `is_frozen` takes the incoming value;
/// * `last_synced_from_config_at` is stamped with `now()` only when the
///   incoming source is `'config'`, and is left untouched otherwise.
///
/// # Errors
///
/// Propagates any `sqlx::Error` from running the statement or decoding the
/// returned row.
pub async fn upsert_athena_client(
    pool: &PgPool,
    params: SaveAthenaClientParams,
) -> Result<AthenaClientRecord, sqlx::Error> {
    let row: PgRow = sqlx::query(
        r#"
INSERT INTO athena_clients (
athena_client_id,
client_name,
description,
pg_uri,
pg_uri_env_var,
config_uri_template,
source,
is_active,
is_frozen,
last_synced_from_config_at,
metadata
)
VALUES (
$1,
$2,
$3,
$4,
$5,
$6,
$7,
$8,
$9,
CASE WHEN $7 = 'config' THEN now() ELSE NULL END,
$10
)
ON CONFLICT (lower(client_name)) WHERE deleted_at IS NULL
DO UPDATE SET
description = EXCLUDED.description,
pg_uri = EXCLUDED.pg_uri,
pg_uri_env_var = EXCLUDED.pg_uri_env_var,
config_uri_template = EXCLUDED.config_uri_template,
source = CASE
WHEN athena_clients.source <> EXCLUDED.source THEN 'merged'
ELSE EXCLUDED.source
END,
is_active = EXCLUDED.is_active,
is_frozen = CASE
WHEN EXCLUDED.source = 'config' AND athena_clients.is_frozen = true THEN true
ELSE EXCLUDED.is_frozen
END,
metadata = EXCLUDED.metadata,
last_synced_from_config_at = CASE
WHEN EXCLUDED.source = 'config' THEN now()
ELSE athena_clients.last_synced_from_config_at
END,
deleted_at = NULL,
updated_at = now()
RETURNING
athena_client_id AS id,
client_name,
description,
pg_uri,
pg_uri_env_var,
config_uri_template,
source,
is_active,
is_frozen,
last_synced_from_config_at,
last_seen_at,
metadata,
created_at,
updated_at,
deleted_at
"#,
    )
    // Bind order must match the $1..$10 placeholders above.
    .bind(Uuid::new_v4())
    .bind(&params.client_name)
    .bind(&params.description)
    .bind(&params.pg_uri)
    .bind(&params.pg_uri_env_var)
    .bind(&params.config_uri_template)
    .bind(&params.source)
    .bind(params.is_active)
    .bind(params.is_frozen)
    .bind(&params.metadata)
    .fetch_one(pool)
    .await?;

    map_client_row(&row)
}
pub async fn set_client_frozen_state(
pool: &PgPool,
client_name: &str,
is_frozen: bool,
) -> Result<Option<AthenaClientRecord>, sqlx::Error> {
let row = sqlx::query(
r#"
UPDATE athena_clients
SET is_frozen = $2,
updated_at = now()
WHERE lower(client_name) = lower($1)
AND deleted_at IS NULL
RETURNING
athena_client_id AS id,
client_name,
description,
pg_uri,
pg_uri_env_var,
config_uri_template,
source,
is_active,
is_frozen,
last_synced_from_config_at,
last_seen_at,
metadata,
created_at,
updated_at,
deleted_at
"#,
)
.bind(client_name)
.bind(is_frozen)
.fetch_optional(pool)
.await?;
row.as_ref().map(map_client_row).transpose()
}
pub async fn delete_athena_client(
pool: &PgPool,
client_name: &str,
) -> Result<Option<AthenaClientRecord>, sqlx::Error> {
let row = sqlx::query(
r#"
UPDATE athena_clients
SET deleted_at = now(),
is_active = false,
updated_at = now()
WHERE lower(client_name) = lower($1)
AND deleted_at IS NULL
RETURNING
athena_client_id AS id,
client_name,
description,
pg_uri,
pg_uri_env_var,
config_uri_template,
source,
is_active,
is_frozen,
last_synced_from_config_at,
last_seen_at,
metadata,
created_at,
updated_at,
deleted_at
"#,
)
.bind(client_name)
.fetch_optional(pool)
.await?;
row.as_ref().map(map_client_row).transpose()
}
/// Stamps `last_seen_at` (and `updated_at`) with `now()` for the non-deleted
/// client matched by name, ignoring case.
///
/// Succeeds even when no row matches; the number of affected rows is not
/// inspected.
///
/// # Errors
///
/// Propagates any `sqlx::Error` from running the statement.
pub async fn touch_athena_client_last_seen(
    pool: &PgPool,
    client_name: &str,
) -> Result<(), sqlx::Error> {
    const SQL: &str = r#"
UPDATE athena_clients
SET last_seen_at = now(),
updated_at = now()
WHERE lower(client_name) = lower($1)
AND deleted_at IS NULL
"#;

    sqlx::query(SQL)
        .bind(client_name)
        .execute(pool)
        .await
        .map(|_outcome| ())
}
/// Rebuilds `client_statistics` and `client_table_statistics` from the
/// gateway logs inside a single transaction.
///
/// Both tables are emptied and then repopulated:
///
/// * `client_statistics` gets one row per non-empty `client` in
///   `gateway_request_log`, left-joined with per-client operation counts from
///   `gateway_operation_log`;
/// * `client_table_statistics` gets one row per (client, table, operation)
///   group in `gateway_operation_log` where all three values are non-empty.
///
/// # Errors
///
/// Propagates any `sqlx::Error` from beginning the transaction, running a
/// statement, or committing. On an early return the transaction is dropped
/// without being committed.
pub async fn refresh_client_statistics(pool: &PgPool) -> Result<(), sqlx::Error> {
    const CLEAR_TABLE_STATISTICS: &str = "DELETE FROM client_table_statistics";
    const CLEAR_CLIENT_STATISTICS: &str = "DELETE FROM client_statistics";
    const REBUILD_CLIENT_STATISTICS: &str = r#"
INSERT INTO client_statistics (
client_name,
total_requests,
successful_requests,
failed_requests,
total_cached_requests,
total_operations,
last_request_at,
last_operation_at
)
SELECT
request_stats.client_name,
request_stats.total_requests,
request_stats.successful_requests,
request_stats.failed_requests,
request_stats.total_cached_requests,
COALESCE(operation_stats.total_operations, 0),
request_stats.last_request_at,
operation_stats.last_operation_at
FROM (
SELECT
client AS client_name,
COUNT(*)::bigint AS total_requests,
COUNT(*) FILTER (WHERE status_code >= 200 AND status_code < 400)::bigint AS successful_requests,
COUNT(*) FILTER (WHERE status_code >= 400)::bigint AS failed_requests,
COUNT(*) FILTER (WHERE cached IS TRUE)::bigint AS total_cached_requests,
MAX(created_at) AS last_request_at
FROM gateway_request_log
WHERE client IS NOT NULL AND client <> ''
GROUP BY client
) AS request_stats
LEFT JOIN (
SELECT
client AS client_name,
COUNT(*)::bigint AS total_operations,
MAX(created_at) AS last_operation_at
FROM gateway_operation_log
WHERE client IS NOT NULL AND client <> ''
GROUP BY client
) AS operation_stats
ON operation_stats.client_name = request_stats.client_name
"#;
    const REBUILD_TABLE_STATISTICS: &str = r#"
INSERT INTO client_table_statistics (
client_name,
table_name,
operation,
total_operations,
error_operations,
last_operation_at
)
SELECT
client_name,
table_name,
operation,
total_operations,
error_operations,
last_operation_at
FROM (
SELECT
client AS client_name,
table_name,
operation,
COUNT(*)::bigint AS total_operations,
COUNT(*) FILTER (WHERE error IS TRUE OR status_code >= 400)::bigint AS error_operations,
MAX(created_at) AS last_operation_at
FROM gateway_operation_log
WHERE client IS NOT NULL
AND client <> ''
AND table_name IS NOT NULL
AND table_name <> ''
AND operation IS NOT NULL
AND operation <> ''
GROUP BY client, table_name, operation
) stats
"#;

    // Execution order matters: per-table statistics are cleared before the
    // per-client ones, and the per-client ones are rebuilt first.
    let ordered_statements: [&str; 4] = [
        CLEAR_TABLE_STATISTICS,
        CLEAR_CLIENT_STATISTICS,
        REBUILD_CLIENT_STATISTICS,
        REBUILD_TABLE_STATISTICS,
    ];

    let mut txn: Transaction<'_, Postgres> = pool.begin().await?;
    for statement in ordered_statements {
        sqlx::query(statement).execute(&mut *txn).await?;
    }
    txn.commit().await?;
    Ok(())
}
pub async fn list_client_statistics(
pool: &PgPool,
) -> Result<Vec<ClientStatisticsRecord>, sqlx::Error> {
let rows = sqlx::query(
r#"
SELECT
client_name,
total_requests,
successful_requests,
failed_requests,
total_cached_requests,
total_operations,
last_request_at,
last_operation_at,
updated_at
FROM client_statistics
ORDER BY total_requests DESC, lower(client_name)
"#,
)
.fetch_all(pool)
.await?;
rows.iter().map(map_client_statistics_row).collect()
}
pub async fn get_client_statistics(
pool: &PgPool,
client_name: &str,
) -> Result<Option<ClientStatisticsRecord>, sqlx::Error> {
let row = sqlx::query(
r#"
SELECT
client_name,
total_requests,
successful_requests,
failed_requests,
total_cached_requests,
total_operations,
last_request_at,
last_operation_at,
updated_at
FROM client_statistics
WHERE lower(client_name) = lower($1)
"#,
)
.bind(client_name)
.fetch_optional(pool)
.await?;
row.as_ref().map(map_client_statistics_row).transpose()
}
pub async fn list_client_table_statistics(
pool: &PgPool,
client_name: &str,
) -> Result<Vec<ClientTableStatisticsRecord>, sqlx::Error> {
let rows: Vec<PgRow> = sqlx::query(
r#"
SELECT
client_name,
table_name,
operation,
total_operations,
error_operations,
last_operation_at,
updated_at
FROM client_table_statistics
WHERE lower(client_name) = lower($1)
ORDER BY total_operations DESC, lower(table_name), lower(operation)
"#,
)
.bind(client_name)
.fetch_all(pool)
.await?;
rows.iter().map(map_client_table_statistics_row).collect()
}
pub async fn list_client_operation_drilldown(
pool: &PgPool,
client_name: &str,
table_name: &str,
operation: &str,
status_filter: ClientOperationStatusFilter,
limit: i64,
offset: i64,
) -> Result<Vec<ClientOperationDrilldownRecord>, sqlx::Error> {
let rows: Vec<PgRow> = sqlx::query(
r#"
SELECT
created_at,
request_id,
operation,
table_name,
client AS client_name,
method,
path,
status_code,
duration_ms,
time::double precision AS operation_time_ms,
details,
error,
message,
cache_key
FROM gateway_operation_log
WHERE lower(client) = lower($1)
AND lower(table_name) = lower($2)
AND lower(operation) = lower($3)
AND (
$4 = 'all'
OR (
$4 = 'errors'
AND (error IS TRUE OR COALESCE(status_code, 0) >= 400)
)
OR (
$4 = 'normal'
AND NOT (error IS TRUE OR COALESCE(status_code, 0) >= 400)
)
)
ORDER BY created_at DESC
LIMIT $5
OFFSET $6
"#,
)
.bind(client_name)
.bind(table_name)
.bind(operation)
.bind(status_filter.as_str())
.bind(limit)
.bind(offset)
.fetch_all(pool)
.await?;
rows.iter()
.map(map_client_operation_drilldown_row)
.collect()
}