use std::collections::VecDeque;
use std::sync::Arc;
use futures::stream::{self, BoxStream};
use schema_core::{ColumnName, TableName};
use sources_core::cdc::{Ack, AckSink, Change, ChangeEvent};
use sources_core::{Result, RowKey, SnapshotTable, SourceError};
use sqlx::pool::PoolConnection;
use sqlx::postgres::{PgPoolOptions, PgRow};
use sqlx::{PgPool, Postgres};
use crate::document::value;
/// Name of the server-side cursor used to stream primary keys. `Backfill::step` closes it
/// before declaring it again for the next table, so only one is open at a time.
const CURSOR: &str = "flusso_backfill_cursor";
/// Reads the next batch of at most 1024 rows from the backfill cursor.
/// NOTE: the cursor name is written out literally (a `const` cannot be composed from
/// `CURSOR` without a macro), so it must be kept in sync with `CURSOR` by hand.
const FETCH_SQL: &str = "FETCH FORWARD 1024 FROM flusso_backfill_cursor";
/// Starts a primary-key backfill over `tables`.
///
/// Opens a dedicated pool of at most two connections to `connection_url` and resolves
/// which tables can be backfilled (tables without a single valid primary key are skipped).
/// It then returns a stream that yields one upsert `Change` per primary-key value.
///
/// # Errors
/// `SourceError::Connection` if the pool cannot connect; any error from resolving the
/// tables' primary keys is propagated unchanged.
pub(crate) async fn snapshot(
    connection_url: &str,
    tables: &[SnapshotTable],
) -> Result<BoxStream<'static, Result<Change>>> {
    let connecting = PgPoolOptions::new()
        .max_connections(2)
        .connect(connection_url);
    let pool = match connecting.await {
        Ok(pool) => pool,
        Err(e) => return Err(SourceError::Connection(e.to_string())),
    };
    let resolved = resolve_tables(&pool, tables).await?;
    Ok(build_stream(pool, resolved))
}
/// A table that passed primary-key validation, with its identifiers pre-quoted for SQL.
struct BackfillTable {
    /// Table name reported on every emitted change.
    table: TableName,
    /// The table's single primary-key column.
    pk: ColumnName,
    /// `"schema"."table"`, quoted with `quote_ident`.
    qualified: String,
    /// The primary-key column, quoted with `quote_ident`.
    pk_quoted: String,
}
/// Keeps only the tables that have exactly one valid primary-key column and prepares
/// their quoted identifiers. Skipped tables are logged by `primary_key`.
async fn resolve_tables(pool: &PgPool, tables: &[SnapshotTable]) -> Result<Vec<BackfillTable>> {
    let mut resolved = Vec::with_capacity(tables.len());
    for entry in tables {
        let schema: &str = entry.db_schema.as_ref();
        let name: &str = entry.table.as_ref();
        if let Some(pk) = primary_key(pool, schema, name).await? {
            resolved.push(BackfillTable {
                qualified: format!("{}.{}", quote_ident(schema), quote_ident(name)),
                pk_quoted: quote_ident(pk.as_ref()),
                table: entry.table.clone(),
                pk,
            });
        }
    }
    Ok(resolved)
}
/// Looks up the primary key of `schema.table` and returns it when it is a single column
/// whose name is a valid `ColumnName`.
///
/// Returns `Ok(None)` (after logging a warning) for these cases:
/// - any primary-key column has an invalid name (checked first, in column order);
/// - the table has no primary key;
/// - the primary key is composite.
async fn primary_key(pool: &PgPool, schema: &str, table: &str) -> Result<Option<ColumnName>> {
    let qualified = format!("{}.{}", quote_ident(schema), quote_ident(table));
    let names = crate::document::primary_key_column_names(pool, qualified).await?;
    // Stops at the first invalid name, keeping it alongside the error for the log line.
    let parsed = names
        .iter()
        .map(|name| ColumnName::try_new(name).map_err(|e| (name, e)))
        .collect::<std::result::Result<Vec<_>, _>>();
    let mut columns = match parsed {
        Ok(columns) => columns,
        Err((name, e)) => {
            tracing::warn!(%table, column = %name, error = %e, "skipping backfill: invalid primary key column");
            return Ok(None);
        }
    };
    match columns.len() {
        1 => Ok(columns.pop()),
        0 => {
            tracing::warn!(%table, "skipping backfill: table has no primary key");
            Ok(None)
        }
        _ => {
            tracing::warn!(%table, "skipping backfill: composite primary key is unsupported");
            Ok(None)
        }
    }
}
fn build_stream(pool: PgPool, tables: Vec<BackfillTable>) -> BoxStream<'static, Result<Change>> {
let phase = if tables.is_empty() {
Phase::Done
} else {
Phase::Pending {
tables: tables.into(),
}
};
let state = Backfill { pool, phase };
Box::pin(stream::unfold(state, |mut state| async move {
state.step().await.map(|item| (item, state))
}))
}
/// State carried by the backfill stream between polls.
struct Backfill {
    /// Pool created by `snapshot` for this backfill; `step` acquires the reading connection from it.
    pool: PgPool,
    /// Current position in the Pending -> Reading -> Done state machine.
    phase: Phase,
}
/// Progress of the backfill state machine driven by `Backfill::step`.
enum Phase {
    /// No connection acquired yet; `tables` holds every table still to be read.
    Pending { tables: VecDeque<BackfillTable> },
    /// A snapshot transaction is open on `conn` with a cursor declared over `current`.
    Reading {
        // Connection holding the open transaction and cursor.
        conn: PoolConnection<Postgres>,
        // Table the open cursor is reading.
        current: BackfillTable,
        // Tables queued after `current`.
        remaining: VecDeque<BackfillTable>,
        // Keys fetched from the cursor but not yet emitted.
        buf: VecDeque<RowKey>,
    },
    /// Terminal state: `step` returns `None` from here on.
    Done,
}
impl Backfill {
    /// Advances the backfill state machine and yields the next stream item.
    ///
    /// Returns:
    /// - `Some(Ok(change))` for every primary key read from the current table;
    /// - `Some(Err(e))` on the first failure. `self.phase` is left as `Phase::Done`, so
    ///   the following call returns `None` and the stream ends;
    /// - `None` once every table has been drained and the snapshot transaction committed.
    async fn step(&mut self) -> Option<Result<Change>> {
        loop {
            // Move the phase out so its fields can be consumed by value. Every early
            // `return` below leaves `Phase::Done` in place, which terminates the stream.
            match std::mem::replace(&mut self.phase, Phase::Done) {
                Phase::Pending { mut tables } => {
                    // Dequeue before touching the database so an empty queue ends the
                    // stream without opening a transaction that would then be abandoned.
                    let current = tables.pop_front()?;
                    let mut conn = match self.pool.acquire().await {
                        Ok(conn) => conn,
                        Err(e) => return Some(Err(SourceError::Connection(e.to_string()))),
                    };
                    // Every table is read inside this one REPEATABLE READ transaction, so
                    // all cursors observe the same snapshot.
                    if let Err(e) = begin_snapshot(&mut conn).await {
                        return Some(Err(e));
                    }
                    if let Err(e) = declare_cursor(&mut conn, &current).await {
                        return Some(Err(e));
                    }
                    self.phase = Phase::Reading {
                        conn,
                        current,
                        remaining: tables,
                        buf: VecDeque::new(),
                    };
                }
                Phase::Reading {
                    mut conn,
                    current,
                    mut remaining,
                    mut buf,
                } => {
                    // Drain buffered keys before going back to the server.
                    if let Some(key) = buf.pop_front() {
                        let change = upsert_change(current.table.clone(), key);
                        self.phase = Phase::Reading {
                            conn,
                            current,
                            remaining,
                            buf,
                        };
                        return Some(Ok(change));
                    }
                    let rows = match fetch_batch(&mut conn).await {
                        Ok(rows) => rows,
                        Err(e) => return Some(Err(e)),
                    };
                    if rows.is_empty() {
                        // Current table exhausted: close its cursor so the same cursor
                        // name can be declared for the next table.
                        if let Err(e) = close_cursor(&mut conn).await {
                            return Some(Err(e));
                        }
                        let Some(next) = remaining.pop_front() else {
                            // Last table done: end the snapshot transaction.
                            return match commit(&mut conn).await {
                                Ok(()) => None,
                                Err(e) => Some(Err(e)),
                            };
                        };
                        if let Err(e) = declare_cursor(&mut conn, &next).await {
                            return Some(Err(e));
                        }
                        self.phase = Phase::Reading {
                            conn,
                            current: next,
                            remaining,
                            buf,
                        };
                    } else {
                        // The cursor selects only the primary-key column, so each row's
                        // first column is the key value.
                        buf.extend(rows.iter().map(|row| {
                            let value = value::first_column_to_generic(row);
                            RowKey(vec![(current.pk.clone(), value)])
                        }));
                        self.phase = Phase::Reading {
                            conn,
                            current,
                            remaining,
                            buf,
                        };
                    }
                }
                Phase::Done => return None,
            }
        }
    }
}
/// Builds an upsert change for `key` in `table`. Backfill changes carry sequence 0 and a
/// no-op ack sink, so confirming them has no effect.
fn upsert_change(table: TableName, key: RowKey) -> Change {
    let event = ChangeEvent::Upsert { table, key };
    Change {
        ack: Ack::new(0, Arc::new(NoopAck)),
        event,
    }
}
/// Ack sink that ignores every confirmation.
#[derive(Debug)]
struct NoopAck;

impl AckSink for NoopAck {
    /// Intentionally does nothing.
    fn confirm(&self, _: u64) {}
}
/// Opens a read-only REPEATABLE READ transaction on `conn`, so every later query in it
/// reads the same snapshot.
async fn begin_snapshot(conn: &mut PoolConnection<Postgres>) -> Result<()> {
    let outcome = sqlx::query("BEGIN ISOLATION LEVEL REPEATABLE READ READ ONLY")
        .execute(&mut **conn)
        .await;
    outcome.map(|_| ()).map_err(query_err)
}
async fn declare_cursor(conn: &mut PoolConnection<Postgres>, table: &BackfillTable) -> Result<()> {
let sql = format!(
"DECLARE {CURSOR} NO SCROLL CURSOR FOR SELECT {} FROM {}",
table.pk_quoted, table.qualified,
);
sqlx::query(sqlx::AssertSqlSafe(sql))
.execute(&mut **conn)
.await
.map_err(query_err)?;
Ok(())
}
/// Fetches the next batch of rows from the backfill cursor; an empty batch means the
/// cursor is exhausted.
async fn fetch_batch(conn: &mut PoolConnection<Postgres>) -> Result<Vec<PgRow>> {
    let fetched = sqlx::query(FETCH_SQL).fetch_all(&mut **conn).await;
    fetched.map_err(query_err)
}
async fn close_cursor(conn: &mut PoolConnection<Postgres>) -> Result<()> {
sqlx::query("CLOSE flusso_backfill_cursor")
.execute(&mut **conn)
.await
.map_err(query_err)?;
Ok(())
}
/// Commits the snapshot transaction opened by `begin_snapshot`.
async fn commit(conn: &mut PoolConnection<Postgres>) -> Result<()> {
    let outcome = sqlx::query("COMMIT").execute(&mut **conn).await;
    outcome.map(|_| ()).map_err(query_err)
}
/// Quotes `ident` as a PostgreSQL identifier: wraps it in double quotes and doubles
/// every embedded double quote.
fn quote_ident(ident: &str) -> String {
    let mut quoted = String::with_capacity(ident.len() + 2);
    quoted.push('"');
    for ch in ident.chars() {
        if ch == '"' {
            // An embedded quote is escaped by writing it twice.
            quoted.push('"');
        }
        quoted.push(ch);
    }
    quoted.push('"');
    quoted
}
/// Converts a sqlx failure into `SourceError::Query`, keeping its display text.
fn query_err(error: sqlx::Error) -> SourceError {
    let message = error.to_string();
    SourceError::Query(message)
}