use chrono::{DateTime, Utc};
use serde::Serialize;
use serde_json::Value;
use sqlx::Row;
use sqlx::postgres::{PgPool, PgRow};
use uuid::Uuid;
#[derive(Debug, Clone, Serialize)]
pub struct PipelineTemplateRecord {
pub id: String,
pub template_name: String,
pub definition: Value,
pub metadata: Value,
pub created_at: DateTime<Utc>,
pub updated_at: DateTime<Utc>,
}
fn map_pipeline_template_row(row: &PgRow) -> Result<PipelineTemplateRecord, sqlx::Error> {
Ok(PipelineTemplateRecord {
id: row.try_get::<Uuid, _>("id")?.to_string(),
template_name: row.try_get("template_name")?,
definition: row.try_get("definition")?,
metadata: row.try_get("metadata")?,
created_at: row.try_get("created_at")?,
updated_at: row.try_get("updated_at")?,
})
}
pub async fn ensure_pipeline_template_table(pool: &PgPool) -> Result<(), sqlx::Error> {
sqlx::query(
r#"
CREATE TABLE IF NOT EXISTS athena_pipeline_templates (
id bigint NOT NULL GENERATED BY DEFAULT AS IDENTITY,
pipeline_template_id uuid NOT NULL DEFAULT gen_random_uuid() UNIQUE,
template_name text NOT NULL,
definition jsonb NOT NULL,
metadata jsonb NOT NULL DEFAULT '{}'::jsonb,
created_at timestamptz NOT NULL DEFAULT now(),
updated_at timestamptz NOT NULL DEFAULT now(),
PRIMARY KEY (id)
)
"#,
)
.execute(pool)
.await?;
sqlx::query(
r#"
CREATE UNIQUE INDEX IF NOT EXISTS idx_athena_pipeline_templates_name
ON athena_pipeline_templates (lower(template_name))
"#,
)
.execute(pool)
.await?;
Ok(())
}
pub async fn list_pipeline_templates(
pool: &PgPool,
) -> Result<Vec<PipelineTemplateRecord>, sqlx::Error> {
let rows: Vec<PgRow> = sqlx::query(
r#"
SELECT
pipeline_template_id AS id,
template_name,
definition,
metadata,
created_at,
updated_at
FROM athena_pipeline_templates
ORDER BY lower(template_name)
"#,
)
.fetch_all(pool)
.await?;
rows.iter().map(map_pipeline_template_row).collect()
}
pub async fn get_pipeline_template_by_name(
pool: &PgPool,
template_name: &str,
) -> Result<Option<PipelineTemplateRecord>, sqlx::Error> {
let row: Option<PgRow> = sqlx::query(
r#"
SELECT
pipeline_template_id AS id,
template_name,
definition,
metadata,
created_at,
updated_at
FROM athena_pipeline_templates
WHERE lower(template_name) = lower($1)
LIMIT 1
"#,
)
.bind(template_name)
.fetch_optional(pool)
.await?;
row.as_ref().map(map_pipeline_template_row).transpose()
}
pub async fn upsert_pipeline_template(
pool: &PgPool,
template_name: &str,
definition: Value,
metadata: Value,
) -> Result<PipelineTemplateRecord, sqlx::Error> {
let row: PgRow = sqlx::query(
r#"
INSERT INTO athena_pipeline_templates (
pipeline_template_id,
template_name,
definition,
metadata
)
VALUES ($1, $2, $3, $4)
ON CONFLICT (lower(template_name))
DO UPDATE SET
definition = EXCLUDED.definition,
metadata = EXCLUDED.metadata,
updated_at = now()
RETURNING
pipeline_template_id AS id,
template_name,
definition,
metadata,
created_at,
updated_at
"#,
)
.bind(Uuid::new_v4())
.bind(template_name)
.bind(definition)
.bind(metadata)
.fetch_one(pool)
.await?;
map_pipeline_template_row(&row)
}
pub async fn delete_pipeline_template_by_name(
pool: &PgPool,
template_name: &str,
) -> Result<Option<PipelineTemplateRecord>, sqlx::Error> {
let row: Option<PgRow> = sqlx::query(
r#"
DELETE FROM athena_pipeline_templates
WHERE lower(template_name) = lower($1)
RETURNING
pipeline_template_id AS id,
template_name,
definition,
metadata,
created_at,
updated_at
"#,
)
.bind(template_name)
.fetch_optional(pool)
.await?;
row.as_ref().map(map_pipeline_template_row).transpose()
}