use anyhow::{Context, Result};
use async_trait::async_trait;
use serde_json::Value;
use std::{collections::HashMap, path::Path};
use tokio::time::Duration;
pub use athena_cdc::postgres::{
CdcConfig, CdcState, CdcTableConfig, SequinAction, SequinEvent, build_delete_sql,
build_insert_sql, build_update_sql, infer_pk_columns, load_table_configs,
sanitize_table_reference, value_to_sql_literal,
};
use crate::client::AthenaClient;
struct AthenaClientExecutor<'a> {
client: &'a AthenaClient,
}
#[async_trait]
impl athena_cdc::postgres::CdcExecutor for AthenaClientExecutor<'_> {
async fn execute_sql(&self, sql: &str) -> Result<Vec<Value>> {
let result = self
.client
.execute_sql(sql)
.await
.map_err(|err| anyhow::anyhow!(err.to_string()))?;
Ok(result.rows)
}
}
pub struct AuditLogger {
client: AthenaClient,
source: String,
user: String,
}
impl AuditLogger {
pub fn new(client: AthenaClient, source: impl Into<String>, user: impl Into<String>) -> Self {
Self {
client,
source: source.into(),
user: user.into(),
}
}
}
#[async_trait]
impl athena_cdc::postgres::CdcAuditSink for AuditLogger {
async fn log_event(&self, event: &SequinEvent, dry_run: bool) -> Result<()> {
let payload =
athena_cdc::postgres::build_audit_log_payload(event, dry_run, &self.source, &self.user);
self.client
.insert("audit_logs")
.payload(payload)
.execute()
.await
.context("writing audit log entry")?;
Ok(())
}
}
pub async fn backfill_from_csv(
client: &AthenaClient,
csv_path: &Path,
table_configs: &HashMap<String, CdcTableConfig>,
state_path: &Path,
dry_run: bool,
audit_logger: Option<&AuditLogger>,
) -> Result<CdcState> {
let executor = AthenaClientExecutor { client };
let audit_sink = audit_logger
.map(|logger| logger as &(dyn athena_cdc::postgres::CdcAuditSink + Send + Sync));
athena_cdc::postgres::backfill_from_csv(
&executor,
csv_path,
table_configs,
state_path,
dry_run,
audit_sink,
)
.await
}
#[expect(
clippy::too_many_arguments,
reason = "wrapper preserves the existing athena_rs CLI surface"
)]
pub async fn stream_events(
client: &AthenaClient,
table_configs: &HashMap<String, CdcTableConfig>,
sequin_table: &str,
batch_size: usize,
poll_interval: Duration,
state_path: &Path,
dry_run: bool,
audit_logger: Option<&AuditLogger>,
) -> Result<()> {
let executor = AthenaClientExecutor { client };
let audit_sink = audit_logger
.map(|logger| logger as &(dyn athena_cdc::postgres::CdcAuditSink + Send + Sync));
athena_cdc::postgres::stream_events(
&executor,
table_configs,
sequin_table,
batch_size,
poll_interval,
state_path,
dry_run,
audit_sink,
)
.await
}