use std::collections::{HashSet, VecDeque};
use async_trait::async_trait;
use bytes::Bytes;
use crabka_connect::{ConnectError, ConnectRecord, OffsetValue, Source, SourceOffset};
use tokio_postgres::{Client, NoTls};
use crate::model::Operation;
use crate::pgoutput::{
DecodedMessage, RelationCache, RelationEvent, RowEvent, decode_pgoutput_message,
};
use crate::schema::PostgresProtoEncoder;
use crate::{PgLsn, PostgresSourceConfig};
/// One logical-replication event handled by `PostgresWalSource`.
///
/// Events are either supplied up front (`PostgresWalSource::scripted`) or
/// produced from decoded pgoutput messages (`apply_decoded_message`).
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum LogicalEvent {
/// Opens a transaction; rows that follow are staged until the matching `Commit`.
Begin {
// Carried with the event but ignored by `apply_logical_event`.
final_lsn: PgLsn,
// Stamped onto every staged row as `txid` when the transaction commits.
xid: i64,
},
/// Closes the open transaction and releases its staged rows.
Commit {
// Carried with the event but ignored by `apply_logical_event`.
commit_lsn: PgLsn,
// Stamped onto every staged row as `commit_lsn`, and compared against the
// resume/checkpoint LSNs to decide whether the transaction is skipped.
end_lsn: PgLsn,
// Stamped onto every staged row as `commit_timestamp_ms`.
commit_timestamp_ms: i64,
},
/// Relation metadata; `poll` feeds it into the relation cache.
Relation(RelationEvent),
/// A single row change.
Row(RowEvent),
}
/// State of the transaction currently being staged.
///
/// Created when a `LogicalEvent::Begin` is applied and consumed by
/// `commit_transaction`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
struct TransactionState {
// Transaction id taken from the `Begin` event; copied onto each row at commit.
xid: i64,
}
/// Source that turns PostgreSQL logical-replication events into connect records.
#[derive(Debug)]
pub struct PostgresWalSource {
// Slot, publication, schema, table and poll-size settings.
config: PostgresSourceConfig,
// Database name that offsets passed to `seek`/`acknowledge` must match; also
// embedded in the offsets returned by `checkpoint`.
database_name: String,
// `None` for scripted sources; `Some` after `connect`.
client: Option<Client>,
// Updated by `Relation` events in `poll` and used to translate rows.
relation_cache: RelationCache,
// Encodes record keys and values.
encoder: PostgresProtoEncoder,
// Events waiting to be processed by `poll`, in arrival order.
pending: VecDeque<LogicalEvent>,
// The open transaction, if a `Begin` has been applied without a `Commit` yet.
transaction: Option<TransactionState>,
// Rows staged for the open transaction; released to `pending` at commit.
transaction_rows: Vec<RowEvent>,
// LSN of the last record emitted by `poll`, or the LSN passed to `seek`.
checkpoint: Option<PgLsn>,
// Set by `seek` and `acknowledge`; events at or below this LSN are skipped.
resume_lsn: Option<PgLsn>,
}
impl PostgresWalSource {
    /// Builds a source without a database connection that replays `events`.
    ///
    /// The events are queued in iteration order and consumed by `poll`.
    ///
    /// # Errors
    ///
    /// Propagates any error returned by `PostgresProtoEncoder::new`.
    pub fn scripted(
        config: PostgresSourceConfig,
        database_name: impl Into<String>,
        events: impl IntoIterator<Item = LogicalEvent>,
    ) -> Result<Self, ConnectError> {
        let database_name = database_name.into();
        let encoder = PostgresProtoEncoder::new()?;
        let pending: VecDeque<LogicalEvent> = events.into_iter().collect();
        Ok(Self {
            config,
            database_name,
            client: None,
            relation_cache: RelationCache::default(),
            encoder,
            pending,
            transaction: None,
            transaction_rows: Vec::new(),
            checkpoint: None,
            resume_lsn: None,
        })
    }

    /// Connects to PostgreSQL and prepares the publication and replication slot.
    ///
    /// When `config.table_names` is non-empty the publication is created if it
    /// is missing and then validated (table coverage and publish flags). The
    /// replication slot is created or validated in every case.
    ///
    /// # Errors
    ///
    /// Returns `ConnectError::Backend` for connection or query failures, and
    /// propagates errors from the validation helpers and the encoder.
    pub async fn connect(config: PostgresSourceConfig) -> Result<Self, ConnectError> {
        let (client, connection) =
            tokio_postgres::connect(config.database_url.expose_secret(), NoTls)
                .await
                .map_err(|error| ConnectError::Backend(error.to_string()))?;

        // The connection future drives the socket; run it in the background and
        // only log if it ends with an error.
        tokio::spawn(async move {
            match connection.await {
                Ok(_) => {}
                Err(error) => {
                    tracing::warn!(%error, "postgres source connection task failed");
                }
            }
        });

        let database_name = current_database(&client).await?;

        let has_configured_tables = !config.table_names.is_empty();
        if has_configured_tables {
            let publication_sql = create_publication_sql(
                &config.publication_name,
                &config.schema,
                &config.table_names,
            );
            client
                .batch_execute(&publication_sql)
                .await
                .map_err(|error| ConnectError::Backend(error.to_string()))?;
            validate_publication_tables(
                &client,
                &config.publication_name,
                &config.schema,
                &config.table_names,
            )
            .await?;
            validate_publication_settings(&client, &config.publication_name).await?;
        }

        ensure_slot(&client, &config.slot_name, &database_name).await?;

        let encoder = PostgresProtoEncoder::new()?;
        Ok(Self {
            config,
            database_name,
            client: Some(client),
            relation_cache: RelationCache::default(),
            encoder,
            pending: VecDeque::new(),
            transaction: None,
            transaction_rows: Vec::new(),
            checkpoint: None,
            resume_lsn: None,
        })
    }

    /// Number of queued events; test-only helper.
    #[cfg(test)]
    fn pending_len(&self) -> usize {
        self.pending.len()
    }

    /// Peeks changes from the replication slot and applies each decoded message.
    ///
    /// Does nothing for sources without a client (scripted sources).
    async fn fill_pending_from_slot(&mut self) -> Result<(), ConnectError> {
        let Some(client) = self.client.as_ref() else {
            return Ok(());
        };
        let peek_sql = peek_binary_changes_sql(
            &self.config.slot_name,
            self.config.max_messages_per_poll,
            &self.config.publication_name,
        );
        let rows = client
            .query(&peek_sql, &[])
            .await
            .map_err(|error| ConnectError::Backend(error.to_string()))?;
        for row in rows {
            let lsn_text: String = row.get("lsn");
            let payload: Vec<u8> = row.get("data");
            let lsn = lsn_text.parse::<PgLsn>()?;
            let message = decode_pgoutput_message(&payload, lsn, None)?;
            self.apply_decoded_message(message);
        }
        Ok(())
    }

    /// Converts a decoded pgoutput message into a `LogicalEvent` and applies it.
    ///
    /// Keepalive messages are dropped.
    fn apply_decoded_message(&mut self, message: DecodedMessage) {
        let event = match message {
            DecodedMessage::Keepalive => return,
            DecodedMessage::Begin { final_lsn, xid } => LogicalEvent::Begin { final_lsn, xid },
            DecodedMessage::Commit {
                commit_lsn,
                end_lsn,
                commit_timestamp_ms,
            } => LogicalEvent::Commit {
                commit_lsn,
                end_lsn,
                commit_timestamp_ms,
            },
            DecodedMessage::Relation(relation) => LogicalEvent::Relation(relation),
            DecodedMessage::Row(row) => LogicalEvent::Row(row),
        };
        self.apply_logical_event(event);
    }

    /// Applies one event to the transaction state and the pending queue.
    ///
    /// `Begin` opens a transaction and discards previously staged rows,
    /// `Commit` releases the staged rows, `Relation` is queued for `poll`, and
    /// `Row` is staged or queued by `enqueue_row`.
    fn apply_logical_event(&mut self, event: LogicalEvent) {
        match event {
            LogicalEvent::Row(row) => self.enqueue_row(row),
            relation_event @ LogicalEvent::Relation(_) => {
                self.pending.push_back(relation_event);
            }
            LogicalEvent::Begin { xid, .. } => {
                self.transaction = Some(TransactionState { xid });
                self.transaction_rows.clear();
            }
            LogicalEvent::Commit {
                end_lsn,
                commit_timestamp_ms,
                ..
            } => self.commit_transaction(end_lsn, commit_timestamp_ms),
        }
    }

    /// Stages `row` in the open transaction, or queues it when none is open.
    ///
    /// Rows rejected by `should_skip_row` are dropped, and a row equal to one
    /// already staged in the open transaction is not staged twice.
    fn enqueue_row(&mut self, row: RowEvent) {
        if self.should_skip_row(&row) {
            return;
        }
        if self.transaction.is_none() {
            self.pending.push_back(LogicalEvent::Row(row));
            return;
        }
        let already_staged = self.transaction_rows.iter().any(|staged| *staged == row);
        if !already_staged {
            self.transaction_rows.push(row);
        }
    }

    /// Closes the open transaction and moves its rows to the pending queue.
    ///
    /// Each released row is stamped with `end_lsn`, the transaction id and
    /// the commit timestamp. If `end_lsn` is at or below the resume or
    /// checkpoint LSN the staged rows are discarded. Without an open
    /// transaction this is a no-op.
    fn commit_transaction(&mut self, end_lsn: PgLsn, commit_timestamp_ms: i64) {
        let Some(TransactionState { xid }) = self.transaction.take() else {
            return;
        };
        if self.should_skip_lsn(end_lsn) {
            self.transaction_rows.clear();
            return;
        }
        let committed_rows = self.transaction_rows.drain(..).map(|mut row| {
            row.commit_lsn = Some(end_lsn);
            row.txid = Some(xid);
            row.commit_timestamp_ms = Some(commit_timestamp_ms);
            LogicalEvent::Row(row)
        });
        self.pending.extend(committed_rows);
    }

    /// Decides whether `row` has already been delivered.
    ///
    /// Rows that carry a commit LSN are only compared against the resume
    /// LSN; rows without one are compared against both the resume and the
    /// checkpoint LSN using their own LSN.
    fn should_skip_row(&self, row: &RowEvent) -> bool {
        match row.commit_lsn {
            Some(commit_lsn) => self.should_skip_resume_lsn(commit_lsn),
            None => {
                self.should_skip_resume_lsn(row.lsn) || self.should_skip_checkpoint_lsn(row.lsn)
            }
        }
    }

    /// True when `lsn` is at or below the resume LSN or the checkpoint LSN.
    fn should_skip_lsn(&self, lsn: PgLsn) -> bool {
        if self.should_skip_resume_lsn(lsn) {
            return true;
        }
        self.should_skip_checkpoint_lsn(lsn)
    }

    /// True when a resume LSN is set and `lsn` is at or below it.
    fn should_skip_resume_lsn(&self, lsn: PgLsn) -> bool {
        matches!(self.resume_lsn, Some(resume_lsn) if lsn <= resume_lsn)
    }

    /// True when a checkpoint LSN is set and `lsn` is at or below it.
    fn should_skip_checkpoint_lsn(&self, lsn: PgLsn) -> bool {
        match self.checkpoint {
            Some(checkpoint_lsn) => lsn <= checkpoint_lsn,
            None => false,
        }
    }
}
#[async_trait]
impl Source<Bytes, Bytes> for PostgresWalSource {
async fn poll(&mut self) -> Result<Option<ConnectRecord<Bytes, Bytes>>, ConnectError> {
if self.pending.is_empty() {
self.fill_pending_from_slot().await?;
}
while let Some(event) = self.pending.front().cloned() {
match event {
LogicalEvent::Begin { .. } | LogicalEvent::Commit { .. } => {
self.pending.pop_front();
self.apply_logical_event(event);
}
LogicalEvent::Relation(relation) => {
self.pending.pop_front();
self.relation_cache.apply_relation(relation);
}
LogicalEvent::Row(row) => {
if self.transaction.is_some() && row.commit_lsn.is_none() {
self.pending.pop_front();
self.apply_logical_event(LogicalEvent::Row(row));
continue;
}
if self.should_skip_row(&row) {
self.pending.pop_front();
continue;
}
let diff = self.relation_cache.translate(row)?;
let key = self.encoder.encode_key(&diff.key)?;
let value = if diff.op == Operation::Delete {
None
} else {
Some(self.encoder.encode_value(&diff)?)
};
self.checkpoint = Some(diff.lsn);
let mut record = ConnectRecord::new(Some(key), value)
.with_header("crabka.pg.table", Some(Bytes::from(diff.table.clone())))
.with_header("crabka.pg.lsn", Some(Bytes::from(diff.lsn.to_string())))
.with_header(
"crabka.pg.operation",
Some(Bytes::from_static(operation_header(diff.op).as_bytes())),
);
if let Some(commit_timestamp_ms) = diff.commit_timestamp_ms {
record = record.with_timestamp(commit_timestamp_ms);
}
self.pending.pop_front();
return Ok(Some(record));
}
}
}
Ok(None)
}
fn checkpoint(&self) -> Option<SourceOffset> {
self.checkpoint
.map(|lsn| lsn.to_source_offset(&self.database_name, &self.config.slot_name))
}
async fn seek(&mut self, offset: SourceOffset) -> Result<(), ConnectError> {
validate_database(&offset, &self.database_name)?;
let lsn = PgLsn::from_source_offset(&offset, &self.config.slot_name)?;
self.checkpoint = Some(lsn);
self.resume_lsn = Some(lsn);
Ok(())
}
async fn acknowledge(&mut self, offset: &SourceOffset) -> Result<(), ConnectError> {
validate_database(offset, &self.database_name)?;
let lsn = PgLsn::from_source_offset(offset, &self.config.slot_name)?;
if let Some(client) = &self.client {
let lsn_text = lsn.to_string();
client
.execute(advance_slot_sql(), &[&self.config.slot_name, &lsn_text])
.await
.map_err(|error| ConnectError::Backend(error.to_string()))?;
}
self.resume_lsn = Some(lsn);
Ok(())
}
}
/// Wraps `value` in double quotes for use as a SQL identifier, doubling any
/// embedded double quote.
fn quote_ident(value: &str) -> String {
    let mut quoted = String::with_capacity(value.len() + 2);
    quoted.push('"');
    for ch in value.chars() {
        if ch == '"' {
            quoted.push('"');
        }
        quoted.push(ch);
    }
    quoted.push('"');
    quoted
}
/// Wraps `value` in single quotes for use as a SQL string literal, doubling
/// any embedded single quote.
fn sql_string(value: &str) -> String {
    let mut literal = String::with_capacity(value.len() + 2);
    literal.push('\'');
    for ch in value.chars() {
        if ch == '\'' {
            literal.push('\'');
        }
        literal.push(ch);
    }
    literal.push('\'');
    literal
}
/// Builds a `DO` block that creates `publication` for the given tables unless
/// a publication with that name already exists.
///
/// Every table is qualified with `schema`, and identifiers and string
/// literals are quoted with `quote_ident` / `sql_string`. The publication
/// publishes insert, update and delete.
///
/// The block is normally wrapped in `$$ … $$`. If any name makes the body
/// contain `$$` (legal inside a quoted identifier), that would end the
/// dollar-quoted string early, so a tagged delimiter that does not occur in
/// the body is used instead.
fn create_publication_sql(publication: &str, schema: &str, tables: &[String]) -> String {
    let schema_ident = quote_ident(schema);
    let table_list = tables
        .iter()
        .map(|table| format!("{schema_ident}.{}", quote_ident(table)))
        .collect::<Vec<_>>()
        .join(", ");
    let create_sql = format!(
        "CREATE PUBLICATION {} FOR TABLE {} WITH (publish = 'insert, update, delete')",
        quote_ident(publication),
        table_list
    );
    // The leading and trailing spaces keep the delimiter separated from the
    // block's keywords.
    let body = format!(
        " BEGIN IF NOT EXISTS (SELECT 1 FROM pg_publication WHERE pubname = {}) THEN EXECUTE {}; END IF; END ",
        sql_string(publication),
        sql_string(&create_sql)
    );
    let delimiter = dollar_quote_delimiter(&body);
    format!("DO {delimiter}{body}{delimiter}")
}

/// Picks a dollar-quote delimiter that does not occur inside `body`.
///
/// Plain `$$` is preferred; `$crabka0$`, `$crabka1$`, … are tried in turn
/// when the previous candidate appears in `body`.
fn dollar_quote_delimiter(body: &str) -> String {
    let mut delimiter = String::from("$$");
    let mut suffix = 0u32;
    while body.contains(delimiter.as_str()) {
        delimiter = format!("$crabka{suffix}$");
        suffix += 1;
    }
    delimiter
}
/// Builds the query that peeks up to `max_messages` binary changes from
/// `slot` for `publication`.
///
/// The slot and publication names are embedded as escaped string literals.
fn peek_binary_changes_sql(slot: &str, max_messages: u32, publication: &str) -> String {
    let slot_literal = sql_string(slot);
    let publication_literal = sql_string(publication);
    format!(
        "SELECT lsn::text AS lsn, data FROM pg_logical_slot_peek_binary_changes({slot_literal}, NULL, {max_messages}, 'proto_version', '1', 'publication_names', {publication_literal})"
    )
}
/// Query listing the tables of publication `$1` that live in schema `$2`.
fn publication_tables_sql() -> &'static str {
    concat!(
        "SELECT tablename FROM pg_publication_tables ",
        "WHERE pubname = $1 AND schemaname = $2"
    )
}
/// Query reading the insert/update/delete/truncate publish flags of
/// publication `$1`.
fn publication_settings_sql() -> &'static str {
    concat!(
        "SELECT pubinsert, pubupdate, pubdelete, pubtruncate ",
        "FROM pg_publication WHERE pubname = $1"
    )
}
/// Query reading the name, plugin, type and database of replication slot `$1`.
fn replication_slot_sql() -> &'static str {
    concat!(
        "SELECT slot_name, plugin, slot_type, database ",
        "FROM pg_replication_slots WHERE slot_name = $1"
    )
}
/// Statement advancing replication slot `$1` to the LSN given as text in `$2`.
fn advance_slot_sql() -> &'static str {
    const ADVANCE_SLOT: &str = "SELECT pg_replication_slot_advance($1, $2::pg_lsn)";
    ADVANCE_SLOT
}
async fn validate_publication_tables(
client: &Client,
publication: &str,
schema: &str,
tables: &[String],
) -> Result<(), ConnectError> {
let rows = client
.query(publication_tables_sql(), &[&publication, &schema])
.await
.map_err(|error| ConnectError::Backend(error.to_string()))?;
let published_tables = rows
.into_iter()
.map(|row| row.get::<_, String>("tablename"))
.collect::<Vec<_>>();
let missing_tables = missing_publication_tables(tables, published_tables);
if missing_tables.is_empty() {
Ok(())
} else {
Err(ConnectError::Backend(format!(
"publication {publication:?} does not cover configured tables: {}",
missing_tables.join(", ")
)))
}
}
/// Returns the entries of `tables` that do not appear in `published_tables`,
/// in their original order.
fn missing_publication_tables(
    tables: &[String],
    published_tables: impl IntoIterator<Item = String>,
) -> Vec<String> {
    let published: HashSet<String> = published_tables.into_iter().collect();
    let mut missing = Vec::new();
    for table in tables {
        if !published.contains(table) {
            missing.push(table.clone());
        }
    }
    missing
}
async fn validate_publication_settings(
client: &Client,
publication: &str,
) -> Result<(), ConnectError> {
let Some(row) = client
.query_opt(publication_settings_sql(), &[&publication])
.await
.map_err(|error| ConnectError::Backend(error.to_string()))?
else {
return Ok(());
};
let pubinsert: bool = row.get("pubinsert");
let pubupdate: bool = row.get("pubupdate");
let pubdelete: bool = row.get("pubdelete");
let pubtruncate: bool = row.get("pubtruncate");
if publication_settings_are_compatible([pubinsert, pubupdate, pubdelete, pubtruncate]) {
Ok(())
} else {
Err(ConnectError::Backend(format!(
"publication {publication:?} must publish insert, update, and delete, and must not publish truncate"
)))
}
}
/// True when the `[insert, update, delete, truncate]` publish flags are
/// exactly insert, update and delete without truncate.
fn publication_settings_are_compatible(flags: [bool; 4]) -> bool {
    matches!(flags, [true, true, true, false])
}
/// Makes sure a usable replication slot called `slot_name` exists.
///
/// A missing slot is created with the `pgoutput` plugin. An existing slot is
/// validated against the expected plugin, slot type and `database_name`.
///
/// # Errors
///
/// Returns `ConnectError::Backend` if a query fails or the existing slot is
/// not compatible.
async fn ensure_slot(
    client: &Client,
    slot_name: &str,
    database_name: &str,
) -> Result<(), ConnectError> {
    let existing = client
        .query_opt(replication_slot_sql(), &[&slot_name])
        .await
        .map_err(|error| ConnectError::Backend(error.to_string()))?;
    match existing {
        Some(slot) => {
            let plugin: Option<String> = slot.get("plugin");
            let slot_type: String = slot.get("slot_type");
            let database: Option<String> = slot.get("database");
            validate_slot_metadata(
                slot_name,
                plugin.as_deref(),
                &slot_type,
                database.as_deref(),
                database_name,
            )
        }
        None => {
            client
                .query(
                    "SELECT * FROM pg_create_logical_replication_slot($1, 'pgoutput')",
                    &[&slot_name],
                )
                .await
                .map_err(|error| ConnectError::Backend(error.to_string()))?;
            Ok(())
        }
    }
}
/// Checks that a slot uses the `pgoutput` plugin, is a logical slot and
/// belongs to `database_name`.
///
/// # Errors
///
/// Returns `ConnectError::Backend` listing every mismatching field, in the
/// order plugin, slot type, database.
fn validate_slot_metadata(
    slot_name: &str,
    plugin: Option<&str>,
    slot_type: &str,
    database: Option<&str>,
    database_name: &str,
) -> Result<(), ConnectError> {
    let findings = [
        (plugin != Some("pgoutput")).then(|| format!("plugin is {plugin:?}")),
        (slot_type != "logical").then(|| format!("slot_type is {slot_type:?}")),
        (database != Some(database_name)).then(|| format!("database is {database:?}")),
    ];
    let mismatches: Vec<String> = IntoIterator::into_iter(findings).flatten().collect();
    if mismatches.is_empty() {
        return Ok(());
    }
    Err(ConnectError::Backend(format!(
        "replication slot {slot_name:?} is not compatible: {}",
        mismatches.join(", ")
    )))
}
async fn current_database(client: &Client) -> Result<String, ConnectError> {
client
.query_one("SELECT current_database()", &[])
.await
.map(|row| row.get(0))
.map_err(|error| ConnectError::Backend(error.to_string()))
}
/// Checks that the `database` entry of the offset's partition is a string
/// equal to `expected_database`.
///
/// # Errors
///
/// Returns `ConnectError::Offset` if the entry is missing, is not a string,
/// or names a different database.
fn validate_database(offset: &SourceOffset, expected_database: &str) -> Result<(), ConnectError> {
    let Some(OffsetValue::String(database)) = offset.partition.get("database") else {
        return Err(ConnectError::Offset(
            "source offset missing string database".to_owned(),
        ));
    };
    if database == expected_database {
        Ok(())
    } else {
        Err(ConnectError::Offset(format!(
            "source offset database {database:?} does not match expected database {expected_database:?}"
        )))
    }
}
/// Maps an operation to the value used for the `crabka.pg.operation` header.
fn operation_header(operation: Operation) -> &'static str {
    const INSERT: &str = "insert";
    const UPDATE: &str = "update";
    const DELETE: &str = "delete";
    match operation {
        Operation::Delete => DELETE,
        Operation::Update => UPDATE,
        Operation::Insert => INSERT,
    }
}
// Unit tests for the SQL builders and the pure validation helpers; none of
// them needs a database connection.
#[cfg(test)]
mod sql_tests {
use assert2::check;
use super::{
advance_slot_sql, create_publication_sql, missing_publication_tables,
peek_binary_changes_sql, publication_settings_are_compatible, publication_settings_sql,
publication_tables_sql, replication_slot_sql, validate_slot_metadata,
};
// Double quotes inside publication, schema and table names are doubled in
// identifiers, while the publication name used as a string literal keeps them.
#[test]
fn publication_sql_quotes_identifiers() {
let sql = create_publication_sql(
"pub\"name",
"sch\"ema",
&["orders".to_owned(), "line\"items".to_owned()],
);
check!(sql.contains("CREATE PUBLICATION \"pub\"\"name\""));
check!(
sql.contains("FOR TABLE \"sch\"\"ema\".\"orders\", \"sch\"\"ema\".\"line\"\"items\"")
);
check!(sql.contains("WHERE pubname = 'pub\"name'"));
}
// Single quotes in the slot and publication names are doubled, and the query
// uses the peek function rather than a consuming one.
#[test]
fn peek_binary_changes_sql_escapes_string_literals_without_advancing_slot() {
let sql = peek_binary_changes_sql("slot'name", 25, "pub'name");
check!(
sql == "SELECT lsn::text AS lsn, data FROM pg_logical_slot_peek_binary_changes('slot''name', NULL, 25, 'proto_version', '1', 'publication_names', 'pub''name')"
);
}
// The publish list names insert, update and delete only; its quotes appear
// doubled because the CREATE statement is itself embedded as a string literal.
#[test]
fn publication_sql_excludes_truncate_events() {
let sql = create_publication_sql("pub", "public", &["orders".to_owned()]);
check!(sql.contains("WITH (publish = ''insert, update, delete'')"));
}
// Pins the exact text of the publication-tables lookup.
#[test]
fn publication_validation_sql_reads_publication_tables() {
check!(
publication_tables_sql()
== "SELECT tablename FROM pg_publication_tables WHERE pubname = $1 AND schemaname = $2"
);
}
// Pins the exact text of the publish-flags lookup.
#[test]
fn publication_settings_sql_reads_publish_flags() {
check!(
publication_settings_sql()
== "SELECT pubinsert, pubupdate, pubdelete, pubtruncate FROM pg_publication WHERE pubname = $1"
);
}
// Pins the exact text of the replication-slot lookup.
#[test]
fn replication_slot_validation_sql_reads_slot_metadata() {
check!(
replication_slot_sql()
== "SELECT slot_name, plugin, slot_type, database FROM pg_replication_slots WHERE slot_name = $1"
);
}
// The LSN parameter is cast to `pg_lsn` inside the statement.
#[test]
fn advance_slot_sql_casts_lsn_parameter() {
check!(advance_slot_sql() == "SELECT pg_replication_slot_advance($1, $2::pg_lsn)");
}
// Only configured tables that are absent from the publication are reported;
// published tables that were not configured are ignored.
#[test]
fn publication_table_validation_reports_only_missing_configured_tables() {
let missing = missing_publication_tables(
&["orders".to_owned(), "accounts".to_owned()],
["orders".to_owned(), "ignored".to_owned()],
);
check!(missing == vec!["accounts".to_owned()]);
}
// Flags are ordered [insert, update, delete, truncate]; only
// [true, true, true, false] is accepted.
#[test]
fn publication_settings_require_insert_update_delete_without_truncate() {
check!(publication_settings_are_compatible([
true, true, true, false
]));
check!(!publication_settings_are_compatible([
false, true, true, false
]));
check!(!publication_settings_are_compatible([
true, false, true, false
]));
check!(!publication_settings_are_compatible([
true, true, false, false
]));
check!(!publication_settings_are_compatible([
true, true, true, true
]));
}
// A logical pgoutput slot on the expected database validates cleanly.
#[test]
fn slot_metadata_accepts_pgoutput_logical_slot_for_current_database() {
validate_slot_metadata("slot_a", Some("pgoutput"), "logical", Some("app"), "app")
.expect("matching slot should validate");
}
// Every mismatching field (plugin, slot type, database) shows up in a single
// error message.
#[test]
fn slot_metadata_reports_all_incompatible_fields() {
let error =
validate_slot_metadata("slot_a", Some("test_decoding"), "physical", None, "app")
.expect_err("incompatible slot should fail");
match error {
crabka_connect::ConnectError::Backend(message) => {
check!(message.contains("replication slot \"slot_a\" is not compatible"));
check!(message.contains("plugin is Some(\"test_decoding\")"));
check!(message.contains("slot_type is \"physical\""));
check!(message.contains("database is None"));
}
error => panic!("expected backend error, got {error:?}"),
}
}
}
// Behavioural tests for `PostgresWalSource`, driven through scripted sources
// so that no database connection is needed.
#[cfg(test)]
mod tests {
use assert2::check;
use crabka_connect::{SecretString, Source as _};
use crabka_schema_serde::wire::MAGIC;
use super::{LogicalEvent, PostgresWalSource, validate_database};
use crate::model::{ColumnSchema, ColumnValue, ScalarValue};
use crate::pgoutput::{RelationEvent, RowEvent, RowEventKind, RowTupleKind};
use crate::{PgLsn, PostgresSourceConfig};
// Returns the value of header `key` on `record`; panics if the header is
// missing or has no value.
fn header_value(
record: &crabka_connect::ConnectRecord<bytes::Bytes, bytes::Bytes>,
key: &str,
) -> bytes::Bytes {
record
.headers
.iter()
.find(|header| header.key == key)
.and_then(|header| header.value.clone())
.unwrap_or_else(|| panic!("missing header {key}"))
}
// Source configuration for the `public.orders` table using `slot_name`.
fn config(slot_name: &str) -> PostgresSourceConfig {
PostgresSourceConfig {
database_url: SecretString::new("postgres://localhost/app"),
slot_name: slot_name.to_owned(),
publication_name: "crabka_connect".to_owned(),
schema: "public".to_owned(),
table_names: vec!["orders".to_owned()],
max_messages_per_poll: 1000,
}
}
// Relation 7: `public.orders` with key column `id` (int8) and `status` (text).
fn orders_relation() -> RelationEvent {
RelationEvent {
relation_id: 7,
schema: "public".to_owned(),
table: "orders".to_owned(),
columns: vec![
ColumnSchema {
name: "id".to_owned(),
type_name: "int8".to_owned(),
key: true,
},
ColumnSchema {
name: "status".to_owned(),
type_name: "text".to_owned(),
key: false,
},
],
}
}
// Column value for `id`.
fn id(value: i64) -> ColumnValue {
ColumnValue {
name: "id".to_owned(),
value: ScalarValue::Int(value),
}
}
// Column value for `status`.
fn status(value: &str) -> ColumnValue {
ColumnValue {
name: "status".to_owned(),
value: ScalarValue::Text(value.to_owned()),
}
}
// Insert into relation 7 at `lsn`, with a transaction id and commit timestamp
// but no commit LSN.
fn insert_event(lsn: PgLsn) -> RowEvent {
RowEvent {
relation_id: 7,
lsn,
commit_lsn: None,
txid: Some(99),
commit_timestamp_ms: Some(1_700_000_000_000),
kind: RowEventKind::Insert,
values: vec![id(42), status("paid")],
}
}
// Full-tuple delete from relation 7 at `lsn`, without any commit metadata.
fn delete_event(lsn: PgLsn) -> RowEvent {
RowEvent {
relation_id: 7,
lsn,
commit_lsn: None,
txid: None,
commit_timestamp_ms: None,
kind: RowEventKind::Delete {
tuple_kind: RowTupleKind::Full,
},
values: vec![id(42), status("cancelled")],
}
}
// An insert yields a record whose key and value both start with the MAGIC
// byte, carries table/LSN/operation headers and the row timestamp, and moves
// the checkpoint to the row's LSN.
#[tokio::test]
async fn insert_poll_emits_framed_record_and_checkpoint() {
let mut source = PostgresWalSource::scripted(
config("slot_a"),
"app",
[
LogicalEvent::Relation(orders_relation()),
LogicalEvent::Row(insert_event(PgLsn(0x2a))),
],
)
.expect("source builds");
let record = source
.poll()
.await
.expect("poll succeeds")
.expect("row emits");
check!(record.key.is_some());
check!(record.value.is_some());
check!(record.key.as_ref().expect("key")[0] == MAGIC);
check!(record.value.as_ref().expect("value")[0] == MAGIC);
check!(record.timestamp == Some(1_700_000_000_000));
check!(header_value(&record, "crabka.pg.table").as_ref() == b"public.orders");
check!(header_value(&record, "crabka.pg.lsn").as_ref() == b"0/2A");
check!(header_value(&record, "crabka.pg.operation").as_ref() == b"insert");
check!(source.checkpoint() == Some(PgLsn(0x2a).to_source_offset("app", "slot_a")));
}
// A delete yields a keyed record with no value and no timestamp.
#[tokio::test]
async fn delete_poll_emits_tombstone() {
let mut source = PostgresWalSource::scripted(
config("slot_a"),
"app",
[
LogicalEvent::Relation(orders_relation()),
LogicalEvent::Row(delete_event(PgLsn(0x2b))),
],
)
.expect("source builds");
let record = source
.poll()
.await
.expect("poll succeeds")
.expect("row emits");
check!(record.key.is_some());
check!(record.value.is_none());
check!(record.timestamp.is_none());
check!(header_value(&record, "crabka.pg.table").as_ref() == b"public.orders");
check!(header_value(&record, "crabka.pg.lsn").as_ref() == b"0/2B");
check!(header_value(&record, "crabka.pg.operation").as_ref() == b"delete");
}
// After `seek`, `checkpoint` reports the offset that was sought.
#[tokio::test]
async fn seek_restores_lsn_checkpoint() {
let mut source = PostgresWalSource::scripted(config("slot_a"), "app", []).unwrap();
let offset = PgLsn(0x2a).to_source_offset("app", "slot_a");
source.seek(offset).await.expect("seek succeeds");
check!(source.checkpoint() == Some(PgLsn(0x2a).to_source_offset("app", "slot_a")));
}
// `should_skip_lsn` skips LSNs at or below either the resume LSN or the
// checkpoint, whichever applies; the comparison is inclusive.
#[test]
fn skip_lsn_checks_resume_and_checkpoint_offsets_independently() {
let mut source = PostgresWalSource::scripted(config("slot_a"), "app", []).unwrap();
source.resume_lsn = Some(PgLsn(0x20));
check!(source.should_skip_lsn(PgLsn(0x20)));
check!(!source.should_skip_lsn(PgLsn(0x21)));
source.resume_lsn = None;
source.checkpoint = Some(PgLsn(0x30));
check!(source.should_skip_lsn(PgLsn(0x30)));
check!(!source.should_skip_lsn(PgLsn(0x31)));
source.resume_lsn = Some(PgLsn(0x20));
source.checkpoint = Some(PgLsn(0x30));
check!(source.should_skip_lsn(PgLsn(0x20)));
check!(source.should_skip_lsn(PgLsn(0x30)));
check!(!source.should_skip_lsn(PgLsn(0x31)));
}
// Seeking to the first row's LSN skips that row; only the later row is
// emitted and the checkpoint moves forward to it.
#[tokio::test]
async fn seek_past_first_row_poll_emits_only_later_row_without_regressing_checkpoint() {
let mut source = PostgresWalSource::scripted(
config("slot_a"),
"app",
[
LogicalEvent::Relation(orders_relation()),
LogicalEvent::Row(insert_event(PgLsn(0x2a))),
LogicalEvent::Row(insert_event(PgLsn(0x2b))),
],
)
.expect("source builds");
source
.seek(PgLsn(0x2a).to_source_offset("app", "slot_a"))
.await
.expect("seek succeeds");
let record = source
.poll()
.await
.expect("poll succeeds")
.expect("later row emits");
check!(header_value(&record, "crabka.pg.lsn").as_ref() == b"0/2B");
check!(source.checkpoint() == Some(PgLsn(0x2b).to_source_offset("app", "slot_a")));
check!(source.poll().await.expect("poll succeeds").is_none());
}
// A second row at an LSN that has already been checkpointed is dropped.
#[tokio::test]
async fn checkpointed_duplicate_row_is_skipped_without_regressing_checkpoint() {
let mut source = PostgresWalSource::scripted(
config("slot_a"),
"app",
[
LogicalEvent::Relation(orders_relation()),
LogicalEvent::Row(insert_event(PgLsn(0x2a))),
LogicalEvent::Row(insert_event(PgLsn(0x2a))),
],
)
.expect("source builds");
let record = source
.poll()
.await
.expect("poll succeeds")
.expect("first row emits");
check!(header_value(&record, "crabka.pg.lsn").as_ref() == b"0/2A");
check!(source.poll().await.expect("poll succeeds").is_none());
check!(source.checkpoint() == Some(PgLsn(0x2a).to_source_offset("app", "slot_a")));
}
// No relation event is scripted before the row, so `poll` fails; the row must
// stay queued and no checkpoint may be recorded.
#[tokio::test]
async fn translation_failure_does_not_drop_pending_row() {
let mut source = PostgresWalSource::scripted(
config("slot_a"),
"app",
[LogicalEvent::Row(insert_event(PgLsn(0x2a)))],
)
.expect("source builds");
check!(source.poll().await.is_err());
check!(source.pending_len() == 1);
check!(source.checkpoint().is_none());
}
// An offset for another database is rejected and leaves the checkpoint unset.
#[tokio::test]
async fn seek_rejects_offset_for_different_database() {
let mut source = PostgresWalSource::scripted(config("slot_a"), "app", []).unwrap();
let offset = PgLsn(0x2a).to_source_offset("other_app", "slot_a");
let error = source.seek(offset).await.expect_err("database mismatch");
check!(matches!(error, crabka_connect::ConnectError::Offset(_)));
check!(source.checkpoint().is_none());
}
// A missing `database` entry and a non-string one both produce an offset error.
#[test]
fn validate_database_rejects_missing_or_non_string_database_partition() {
let missing = crabka_connect::SourceOffset::default();
let mut non_string = crabka_connect::SourceOffset::default();
non_string
.partition
.insert("database".to_owned(), crabka_connect::OffsetValue::Long(7));
check!(matches!(
validate_database(&missing, "app"),
Err(crabka_connect::ConnectError::Offset(_))
));
check!(matches!(
validate_database(&non_string, "app"),
Err(crabka_connect::ConnectError::Offset(_))
));
}
// The mismatch error names both the offset's database and the expected one.
#[test]
fn validate_database_reports_database_mismatch() {
let offset = PgLsn(0x2a).to_source_offset("other_app", "slot_a");
let error = validate_database(&offset, "app").expect_err("database mismatch should fail");
match error {
crabka_connect::ConnectError::Offset(message) => {
check!(message.contains("does not match expected database"));
check!(message.contains("other_app"));
check!(message.contains("app"));
}
error => panic!("expected offset error, got {error:?}"),
}
}
// Without a client, `acknowledge` still validates the offset's database.
#[tokio::test]
async fn scripted_acknowledge_accepts_matching_offset_and_rejects_database_mismatch() {
let mut source = PostgresWalSource::scripted(config("slot_a"), "app", []).unwrap();
source
.acknowledge(&PgLsn(0x2a).to_source_offset("app", "slot_a"))
.await
.expect("matching offset acknowledged");
let error = source
.acknowledge(&PgLsn(0x2b).to_source_offset("other_app", "slot_a"))
.await
.expect_err("database mismatch rejected");
check!(matches!(error, crabka_connect::ConnectError::Offset(_)));
}
// A relation event on its own produces no record and no checkpoint.
#[tokio::test]
async fn relation_only_poll_returns_none_without_checkpoint() {
let mut source = PostgresWalSource::scripted(
config("slot_a"),
"app",
[LogicalEvent::Relation(orders_relation())],
)
.expect("source builds");
check!(source.poll().await.expect("poll succeeds").is_none());
check!(source.checkpoint().is_none());
}
// Rows between Begin and Commit are emitted with the commit's end LSN and
// timestamp rather than their own.
#[tokio::test]
async fn transaction_commit_metadata_is_applied_to_all_rows() {
let mut second = insert_event(PgLsn(0x2b));
second.values = vec![id(43), status("pending")];
let mut source = PostgresWalSource::scripted(
config("slot_a"),
"app",
[
LogicalEvent::Relation(orders_relation()),
LogicalEvent::Begin {
final_lsn: PgLsn(0x40),
xid: 123,
},
LogicalEvent::Row(insert_event(PgLsn(0x2a))),
LogicalEvent::Row(second),
LogicalEvent::Commit {
commit_lsn: PgLsn(0x41),
end_lsn: PgLsn(0x42),
commit_timestamp_ms: 1_700_000_000_123,
},
],
)
.expect("source builds");
let first = source
.poll()
.await
.expect("first poll succeeds")
.expect("first row emits");
let second = source
.poll()
.await
.expect("second poll succeeds")
.expect("second row emits");
check!(header_value(&first, "crabka.pg.lsn").as_ref() == b"0/42");
check!(header_value(&second, "crabka.pg.lsn").as_ref() == b"0/42");
check!(first.timestamp == Some(1_700_000_000_123));
check!(second.timestamp == Some(1_700_000_000_123));
check!(source.checkpoint() == Some(PgLsn(0x42).to_source_offset("app", "slot_a")));
check!(source.poll().await.expect("poll succeeds").is_none());
}
// Decoded Begin/Row/Commit messages stage the row and release it at commit
// with the end LSN, xid and commit timestamp filled in; the row keeps its own
// LSN.
#[test]
fn decoded_transaction_messages_stage_rows_with_commit_metadata() {
let mut source =
PostgresWalSource::scripted(config("slot_a"), "app", []).expect("source builds");
let mut row = insert_event(PgLsn(0x2a));
row.txid = None;
row.commit_timestamp_ms = None;
source.apply_decoded_message(crate::pgoutput::DecodedMessage::Begin {
final_lsn: PgLsn(0x40),
xid: 123,
});
source.apply_decoded_message(crate::pgoutput::DecodedMessage::Row(row));
source.apply_decoded_message(crate::pgoutput::DecodedMessage::Commit {
commit_lsn: PgLsn(0x41),
end_lsn: PgLsn(0x42),
commit_timestamp_ms: 1_700_000_000_123,
});
let Some(LogicalEvent::Row(row)) = source.pending.pop_front() else {
panic!("committed row should be pending");
};
check!(row.lsn == PgLsn(0x2a));
check!(row.commit_lsn == Some(PgLsn(0x42)));
check!(row.txid == Some(123));
check!(row.commit_timestamp_ms == Some(1_700_000_000_123));
}
}