use chrono::{DateTime, Utc};
use serde::Serialize;
use serde_json::Value;
use sqlx::Row;
use sqlx::postgres::{PgPool, PgRow};
use uuid::Uuid;
/// One row of the `athena_client_configs` table in the shape this module
/// hands back to callers (and serializes via `serde`).
#[derive(Debug, Clone, Serialize)]
pub struct AthenaClientConfigRecord {
/// String form of the row's `athena_client_config_id` UUID (selected as
/// `id` by the queries in this module), not the numeric primary key.
pub id: String,
/// Client name as stored; lookups in this module compare it
/// case-insensitively via `lower(client_name)`.
pub client_name: String,
/// Contents of the `config` jsonb column.
pub config: Value,
/// Contents of the `metadata` jsonb column.
pub metadata: Value,
/// Value of the `created_at` timestamptz column.
pub created_at: DateTime<Utc>,
/// Value of the `updated_at` timestamptz column.
pub updated_at: DateTime<Utc>,
}
fn map_client_config_row(row: &PgRow) -> Result<AthenaClientConfigRecord, sqlx::Error> {
Ok(AthenaClientConfigRecord {
id: row.try_get::<Uuid, _>("id")?.to_string(),
client_name: row.try_get("client_name")?,
config: row.try_get("config")?,
metadata: row.try_get("metadata")?,
created_at: row.try_get("created_at")?,
updated_at: row.try_get("updated_at")?,
})
}
/// Creates the `athena_client_configs` table and its case-insensitive unique
/// index on `lower(client_name)` when they do not already exist.
///
/// Both statements use `IF NOT EXISTS`, and they are executed one after the
/// other directly against `pool`.
///
/// # Errors
///
/// Returns the `sqlx::Error` of the first statement that fails; later
/// statements are not attempted.
pub async fn ensure_athena_client_config_table(pool: &PgPool) -> Result<(), sqlx::Error> {
    // Executed in order: the index statement depends on the table existing.
    const SCHEMA_STATEMENTS: &[&str] = &[
        r#"
CREATE TABLE IF NOT EXISTS athena_client_configs (
id bigint NOT NULL GENERATED BY DEFAULT AS IDENTITY,
created_at timestamptz NOT NULL DEFAULT now(),
athena_client_config_id uuid NOT NULL DEFAULT gen_random_uuid() UNIQUE,
client_name text NOT NULL,
config jsonb NOT NULL DEFAULT '{}'::jsonb,
metadata jsonb NOT NULL DEFAULT '{}'::jsonb,
updated_at timestamptz NOT NULL DEFAULT now(),
PRIMARY KEY (id)
)
"#,
        r#"
CREATE UNIQUE INDEX IF NOT EXISTS idx_athena_client_configs_client_name
ON athena_client_configs (lower(client_name))
"#,
    ];

    for &statement in SCHEMA_STATEMENTS {
        sqlx::query(statement).execute(pool).await?;
    }

    Ok(())
}
pub async fn list_athena_client_configs(
pool: &PgPool,
) -> Result<Vec<AthenaClientConfigRecord>, sqlx::Error> {
let rows: Vec<PgRow> = sqlx::query(
r#"
SELECT
athena_client_config_id AS id,
client_name,
config,
metadata,
created_at,
updated_at
FROM athena_client_configs
ORDER BY lower(client_name)
"#,
)
.fetch_all(pool)
.await?;
rows.iter().map(map_client_config_row).collect()
}
pub async fn get_athena_client_config_by_name(
pool: &PgPool,
client_name: &str,
) -> Result<Option<AthenaClientConfigRecord>, sqlx::Error> {
let row: Option<PgRow> = sqlx::query(
r#"
SELECT
athena_client_config_id AS id,
client_name,
config,
metadata,
created_at,
updated_at
FROM athena_client_configs
WHERE lower(client_name) = lower($1)
LIMIT 1
"#,
)
.bind(client_name)
.fetch_optional(pool)
.await?;
row.as_ref().map(map_client_config_row).transpose()
}
/// Inserts a client configuration, or updates the existing one whose name
/// matches case-insensitively.
///
/// A new row gets a freshly generated UUID as its `athena_client_config_id`.
/// When a row with the same `lower(client_name)` already exists, only
/// `config`, `metadata` and `updated_at` are overwritten; the stored
/// `client_name` and identifier are left as they were. The resulting row is
/// returned either way.
///
/// # Errors
///
/// Returns a `sqlx::Error` if the statement fails or the returned row cannot
/// be decoded into an [`AthenaClientConfigRecord`].
pub async fn upsert_athena_client_config(
    pool: &PgPool,
    client_name: &str,
    config: Value,
    metadata: Value,
) -> Result<AthenaClientConfigRecord, sqlx::Error> {
    const UPSERT_SQL: &str = r#"
INSERT INTO athena_client_configs (
athena_client_config_id,
client_name,
config,
metadata
)
VALUES ($1, $2, $3, $4)
ON CONFLICT (lower(client_name))
DO UPDATE SET
config = EXCLUDED.config,
metadata = EXCLUDED.metadata,
updated_at = now()
RETURNING
athena_client_config_id AS id,
client_name,
config,
metadata,
created_at,
updated_at
"#;

    // Bind order must match the $1..$4 placeholders above.
    let statement = sqlx::query(UPSERT_SQL)
        .bind(Uuid::new_v4())
        .bind(client_name)
        .bind(config)
        .bind(metadata);

    let stored: PgRow = statement.fetch_one(pool).await?;
    map_client_config_row(&stored)
}
pub async fn delete_athena_client_config_by_name(
pool: &PgPool,
client_name: &str,
) -> Result<Option<AthenaClientConfigRecord>, sqlx::Error> {
let row: Option<PgRow> = sqlx::query(
r#"
DELETE FROM athena_client_configs
WHERE lower(client_name) = lower($1)
RETURNING
athena_client_config_id AS id,
client_name,
config,
metadata,
created_at,
updated_at
"#,
)
.bind(client_name)
.fetch_optional(pool)
.await?;
row.as_ref().map(map_client_config_row).transpose()
}