athena_rs 3.11.0

Hyper performant polyglot Database driver
Documentation
use chrono::{DateTime, Utc};
use serde::Serialize;
use serde_json::Value;
use sqlx::Row;
use sqlx::postgres::{PgPool, PgRow};
use uuid::Uuid;

#[derive(Debug, Clone, Serialize)]
pub struct PipelineTemplateRecord {
    pub id: String,
    pub template_name: String,
    pub definition: Value,
    pub metadata: Value,
    pub created_at: DateTime<Utc>,
    pub updated_at: DateTime<Utc>,
}

fn map_pipeline_template_row(row: &PgRow) -> Result<PipelineTemplateRecord, sqlx::Error> {
    Ok(PipelineTemplateRecord {
        id: row.try_get::<Uuid, _>("id")?.to_string(),
        template_name: row.try_get("template_name")?,
        definition: row.try_get("definition")?,
        metadata: row.try_get("metadata")?,
        created_at: row.try_get("created_at")?,
        updated_at: row.try_get("updated_at")?,
    })
}

pub async fn ensure_pipeline_template_table(pool: &PgPool) -> Result<(), sqlx::Error> {
    sqlx::query(
        r#"
        CREATE TABLE IF NOT EXISTS athena_pipeline_templates (
            id bigint NOT NULL GENERATED BY DEFAULT AS IDENTITY,
            pipeline_template_id uuid NOT NULL DEFAULT gen_random_uuid() UNIQUE,
            template_name text NOT NULL,
            definition jsonb NOT NULL,
            metadata jsonb NOT NULL DEFAULT '{}'::jsonb,
            created_at timestamptz NOT NULL DEFAULT now(),
            updated_at timestamptz NOT NULL DEFAULT now(),
            PRIMARY KEY (id)
        )
        "#,
    )
    .execute(pool)
    .await?;

    sqlx::query(
        r#"
        CREATE UNIQUE INDEX IF NOT EXISTS idx_athena_pipeline_templates_name
            ON athena_pipeline_templates (lower(template_name))
        "#,
    )
    .execute(pool)
    .await?;

    Ok(())
}

pub async fn list_pipeline_templates(
    pool: &PgPool,
) -> Result<Vec<PipelineTemplateRecord>, sqlx::Error> {
    let rows: Vec<PgRow> = sqlx::query(
        r#"
        SELECT
            pipeline_template_id AS id,
            template_name,
            definition,
            metadata,
            created_at,
            updated_at
        FROM athena_pipeline_templates
        ORDER BY lower(template_name)
        "#,
    )
    .fetch_all(pool)
    .await?;

    rows.iter().map(map_pipeline_template_row).collect()
}

pub async fn get_pipeline_template_by_name(
    pool: &PgPool,
    template_name: &str,
) -> Result<Option<PipelineTemplateRecord>, sqlx::Error> {
    let row: Option<PgRow> = sqlx::query(
        r#"
        SELECT
            pipeline_template_id AS id,
            template_name,
            definition,
            metadata,
            created_at,
            updated_at
        FROM athena_pipeline_templates
        WHERE lower(template_name) = lower($1)
        LIMIT 1
        "#,
    )
    .bind(template_name)
    .fetch_optional(pool)
    .await?;

    row.as_ref().map(map_pipeline_template_row).transpose()
}

pub async fn upsert_pipeline_template(
    pool: &PgPool,
    template_name: &str,
    definition: Value,
    metadata: Value,
) -> Result<PipelineTemplateRecord, sqlx::Error> {
    let row: PgRow = sqlx::query(
        r#"
        INSERT INTO athena_pipeline_templates (
            pipeline_template_id,
            template_name,
            definition,
            metadata
        )
        VALUES ($1, $2, $3, $4)
        ON CONFLICT (lower(template_name))
        DO UPDATE SET
            definition = EXCLUDED.definition,
            metadata = EXCLUDED.metadata,
            updated_at = now()
        RETURNING
            pipeline_template_id AS id,
            template_name,
            definition,
            metadata,
            created_at,
            updated_at
        "#,
    )
    .bind(Uuid::new_v4())
    .bind(template_name)
    .bind(definition)
    .bind(metadata)
    .fetch_one(pool)
    .await?;

    map_pipeline_template_row(&row)
}

pub async fn delete_pipeline_template_by_name(
    pool: &PgPool,
    template_name: &str,
) -> Result<Option<PipelineTemplateRecord>, sqlx::Error> {
    let row: Option<PgRow> = sqlx::query(
        r#"
        DELETE FROM athena_pipeline_templates
        WHERE lower(template_name) = lower($1)
        RETURNING
            pipeline_template_id AS id,
            template_name,
            definition,
            metadata,
            created_at,
            updated_at
        "#,
    )
    .bind(template_name)
    .fetch_optional(pool)
    .await?;

    row.as_ref().map(map_pipeline_template_row).transpose()
}