athena_rs 3.4.7

Database driver
Documentation
//! Inserts rows into `pipeline_step_log` on the logging Postgres database.

use serde_json::Value;
use sqlx::PgPool;
use uuid::Uuid;

/// One row for a pipeline run step (`source`, `transform`, or `sink`).
///
/// `insert_pipeline_step_log` binds each field to the `pipeline_step_log` column of the
/// same name. Only `pipeline_run_id` and `step` are required; every other field is an
/// `Option` and may be absent.
#[derive(Debug, Clone)]
pub struct PipelineStepLogEntry {
    /// Identifier of the pipeline run this row is recorded for.
    pub pipeline_run_id: Uuid,
    /// Request identifier, if any.
    /// NOTE(review): presumably correlates the row with the originating request — confirm.
    pub request_id: Option<String>,
    /// Name of the pipeline, if known.
    pub pipeline_name: Option<String>,
    /// Step kind. The type-level doc lists `source`, `transform`, and `sink`, but this is a
    /// free-form `String`, so nothing in this struct enforces that set.
    pub step: String,
    /// Athena client value, if any.
    /// NOTE(review): exact meaning is not visible in this file — confirm with callers.
    pub athena_client: Option<String>,
    /// Table name associated with the step, if any.
    pub table_name: Option<String>,
    /// Status code recorded for the step, if any.
    /// NOTE(review): presumably an HTTP-style status — confirm.
    pub status_code: Option<i32>,
    /// Duration of the step, if measured; presumably milliseconds (per the field name) —
    /// confirm units with callers.
    pub duration_ms: Option<i64>,
    /// Error text for the step, if any.
    pub error_message: Option<String>,
    /// Optional JSON value carrying extra detail for the row.
    pub details: Option<Value>,
}

/// Writes one [`PipelineStepLogEntry`] as a new row in `pipeline_step_log`.
///
/// The entry is taken by value and its fields are moved straight into the statement's
/// bind arguments. Callers typically spawn this so the request path stays fast.
///
/// # Errors
///
/// Returns the [`sqlx::Error`] produced while executing the `INSERT` on `pool`.
pub async fn insert_pipeline_step_log(
    pool: &PgPool,
    entry: PipelineStepLogEntry,
) -> Result<(), sqlx::Error> {
    // Placeholder `$n` receives the n-th `bind` call, so the bind order below must
    // follow the column list in the statement exactly.
    let insert_row = sqlx::query(
        r#"
        INSERT INTO pipeline_step_log (
            pipeline_run_id, request_id, pipeline_name, step, athena_client, table_name,
            status_code, duration_ms, error_message, details
        )
        VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10)
        "#,
    )
    .bind(entry.pipeline_run_id)
    .bind(entry.request_id)
    .bind(entry.pipeline_name)
    .bind(entry.step)
    .bind(entry.athena_client)
    .bind(entry.table_name)
    .bind(entry.status_code)
    .bind(entry.duration_ms)
    .bind(entry.error_message)
    .bind(entry.details);

    // Only success or failure matters to the caller; the query result is discarded.
    insert_row.execute(pool).await.map(|_| ())
}