athena_rs 3.22.1

Hyper-performant polyglot database driver
Documentation
//! Athena runtime adapters for the extracted `athena-cdc` Sequin engine.

use anyhow::{Context, Result};
use async_trait::async_trait;
use serde_json::Value;
use std::{collections::HashMap, path::Path};
use tokio::time::Duration;

pub use athena_cdc::postgres::{
    CdcConfig, CdcState, CdcTableConfig, SequinAction, SequinEvent, build_delete_sql,
    build_insert_sql, build_update_sql, infer_pk_columns, load_table_configs,
    sanitize_table_reference, value_to_sql_literal,
};

use crate::client::AthenaClient;

/// Borrowed view of an [`AthenaClient`] that is handed to the `athena-cdc` engine
/// wherever it needs an `athena_cdc::postgres::CdcExecutor`.
///
/// The wrapper holds only a reference, so it cannot outlive the client it was
/// built from.
struct AthenaClientExecutor<'a> {
    /// Client that SQL statements are forwarded to.
    client: &'a AthenaClient,
}

#[async_trait]
impl athena_cdc::postgres::CdcExecutor for AthenaClientExecutor<'_> {
    /// Forwards `sql` to the wrapped client and returns the `rows` of its result.
    ///
    /// # Errors
    ///
    /// A client failure is returned as an `anyhow` error whose message is the
    /// client error's `to_string()` output.
    async fn execute_sql(&self, sql: &str) -> Result<Vec<Value>> {
        match self.client.execute_sql(sql).await {
            Ok(outcome) => Ok(outcome.rows),
            // Only the rendered message is carried over into the `anyhow` error.
            Err(failure) => Err(anyhow::anyhow!(failure.to_string())),
        }
    }
}

/// Sends CDC audit rows through the Athena client surface.
///
/// Each logged event is turned into a payload by
/// `athena_cdc::postgres::build_audit_log_payload` and inserted into the
/// `audit_logs` table via the owned client.
pub struct AuditLogger {
    /// Client used to insert the audit rows.
    client: AthenaClient,
    /// Value passed as the `source` argument when building each audit payload.
    source: String,
    /// Value passed as the `user` argument when building each audit payload.
    user: String,
}

impl AuditLogger {
    /// Builds an audit logger that writes through `client`.
    ///
    /// `source` and `user` are converted to owned `String`s and stored; they are
    /// later supplied to the audit payload builder for every logged event.
    pub fn new(client: AthenaClient, source: impl Into<String>, user: impl Into<String>) -> Self {
        let source: String = source.into();
        let user: String = user.into();
        Self {
            client,
            source,
            user,
        }
    }
}

#[async_trait]
impl athena_cdc::postgres::CdcAuditSink for AuditLogger {
    /// Builds the audit payload for `event` and inserts it into `audit_logs`.
    ///
    /// `dry_run` is forwarded to the payload builder together with this logger's
    /// `source` and `user`.
    ///
    /// # Errors
    ///
    /// Returns the insert failure wrapped with the context
    /// `"writing audit log entry"`.
    async fn log_event(&self, event: &SequinEvent, dry_run: bool) -> Result<()> {
        let audit_row =
            athena_cdc::postgres::build_audit_log_payload(event, dry_run, &self.source, &self.user);
        // The insert's success value is not needed; only the outcome matters.
        self.client
            .insert("audit_logs")
            .payload(audit_row)
            .execute()
            .await
            .context("writing audit log entry")
            .map(|_| ())
    }
}

/// Backfills Sequin events from a CSV export using the Athena client runtime.
pub async fn backfill_from_csv(
    client: &AthenaClient,
    csv_path: &Path,
    table_configs: &HashMap<String, CdcTableConfig>,
    state_path: &Path,
    dry_run: bool,
    audit_logger: Option<&AuditLogger>,
) -> Result<CdcState> {
    let executor = AthenaClientExecutor { client };
    let audit_sink = audit_logger
        .map(|logger| logger as &(dyn athena_cdc::postgres::CdcAuditSink + Send + Sync));
    athena_cdc::postgres::backfill_from_csv(
        &executor,
        csv_path,
        table_configs,
        state_path,
        dry_run,
        audit_sink,
    )
    .await
}

/// Streams live Sequin events using the Athena client runtime.
#[expect(
    clippy::too_many_arguments,
    reason = "wrapper preserves the existing athena_rs CLI surface"
)]
pub async fn stream_events(
    client: &AthenaClient,
    table_configs: &HashMap<String, CdcTableConfig>,
    sequin_table: &str,
    batch_size: usize,
    poll_interval: Duration,
    state_path: &Path,
    dry_run: bool,
    audit_logger: Option<&AuditLogger>,
) -> Result<()> {
    let executor = AthenaClientExecutor { client };
    let audit_sink = audit_logger
        .map(|logger| logger as &(dyn athena_cdc::postgres::CdcAuditSink + Send + Sync));
    athena_cdc::postgres::stream_events(
        &executor,
        table_configs,
        sequin_table,
        batch_size,
        poll_interval,
        state_path,
        dry_run,
        audit_sink,
    )
    .await
}