rustcdc 0.6.2

Embeddable Rust CDC library focused on correctness-first capture primitives
Documentation
#![cfg(feature = "postgres")]

use rustcdc::TransportConfig;
use rustcdc::{source::Source, PostgresConnection, PostgresSourceConfig};
use testcontainers::{
    core::{IntoContainerPort, WaitFor},
    runners::AsyncRunner,
    GenericImage, ImageExt,
};
use tokio_postgres::NoTls;

fn skip_postgres_version_matrix_case() -> bool {
    if std::env::var("CDC_RS_RUN_DOCKER_TESTS").as_deref() == Ok("1") {
        return false;
    }

    eprintln!("skipping postgres version matrix test (set CDC_RS_RUN_DOCKER_TESTS=1)");
    true
}

/// Returns `true` when the error message indicates that the container image
/// could not be pulled from the registry (rate-limit, access denial, network
/// partition, or retired tag).
fn is_image_pull_failure(err: &rustcdc::Error) -> bool {
    let msg = err.to_string().to_ascii_lowercase();
    msg.contains("failed to pull")
        || msg.contains("pull access denied")
        || msg.contains("does not exist or may require")
        || msg.contains("status code 404")
        || msg.contains("request is blocked")
        || msg.contains("toomanyrequests")
}

macro_rules! postgres_version_test {
    ($name:ident, $image_tag:literal, $slot_name:literal, $publication_name:literal, $table_name:literal, $label:literal) => {
        #[tokio::test]
        async fn $name() -> rustcdc::Result<()> {
            if skip_postgres_version_matrix_case() {
                return Ok(());
            }

            match run_postgres_version_connection_test(
                $image_tag,
                $slot_name,
                $publication_name,
                $table_name,
            )
            .await
            {
                Ok(()) => {}
                Err(ref e) if is_image_pull_failure(e) => {
                    eprintln!(
                        "skipping postgres version matrix test for {}: image pull failed — \
                         Docker Hub may be unavailable or the tag may be retired.",
                        $image_tag
                    );
                    return Ok(());
                }
                Err(e) => return Err(e),
            }

            println!($label);
            Ok(())
        }
    };
}

async fn run_postgres_version_connection_test(
    image_tag: &str,
    slot_name: &str,
    publication_name: &str,
    table_name: &str,
) -> rustcdc::Result<()> {
    let container = GenericImage::new("postgres", image_tag)
        .with_exposed_port(5432.tcp())
        .with_wait_for(WaitFor::message_on_stderr(
            "database system is ready to accept connections",
        ))
        .with_env_var("POSTGRES_USER", "postgres")
        .with_env_var("POSTGRES_PASSWORD", "postgres")
        .with_env_var("POSTGRES_DB", "cdc")
        .with_cmd(vec![
            "postgres",
            "-c",
            "wal_level=logical",
            "-c",
            "max_replication_slots=8",
            "-c",
            "max_wal_senders=8",
        ])
        .start()
        .await
        .map_err(|error| rustcdc::Error::SourceError(error.to_string()))?;

    let host = container
        .get_host()
        .await
        .map_err(|error| rustcdc::Error::SourceError(error.to_string()))?;
    let port = container
        .get_host_port_ipv4(5432.tcp())
        .await
        .map_err(|error| rustcdc::Error::SourceError(error.to_string()))?;

    let admin_dsn = format!(
        "host={host} port={port} user=postgres password=postgres dbname=cdc connect_timeout=30"
    );
    let (admin_client, admin_conn) = tokio_postgres::connect(&admin_dsn, NoTls)
        .await
        .map_err(|error| rustcdc::Error::SourceError(error.to_string()))?;
    tokio::spawn(async move {
        let _ = admin_conn.await;
    });

    let setup_sql = format!(
        "
        CREATE TABLE IF NOT EXISTS public.{table_name} (
          id BIGINT PRIMARY KEY,
          value TEXT
        );
        ALTER TABLE public.{table_name} REPLICA IDENTITY FULL;
        DROP PUBLICATION IF EXISTS {publication_name};
        CREATE PUBLICATION {publication_name} FOR TABLE public.{table_name};
        "
    );
    admin_client
        .batch_execute(&setup_sql)
        .await
        .map_err(|error| rustcdc::Error::SourceError(error.to_string()))?;

    let config = PostgresSourceConfig {
        host: host.to_string(),
        port,
        user: "postgres".to_string(),
        password: "postgres".to_string().into(),
        database: "cdc".to_string(),
        replication_slot_name: slot_name.to_string(),
        publication_name: publication_name.to_string(),
        conn_timeout_secs: 30,
        stream_poll_interval_ms: 50,
        max_events_per_poll: 1_000,
        transport: TransportConfig::tls(),
        ..Default::default()
    };

    let connection = PostgresConnection::new(config);
    connection.connect().await?;
    assert_eq!(connection.source_type(), "postgres");
    connection.close().await;

    Ok(())
}

postgres_version_test!(
    postgres_version_12_connection,
    "12-alpine",
    "pg12_test_slot",
    "pg12_test_pub",
    "pg12_test_table",
    "✓ PostgreSQL 12 compatibility verified"
);

postgres_version_test!(
    postgres_version_14_connection,
    "14-alpine",
    "pg14_test_slot",
    "pg14_test_pub",
    "pg14_test_table",
    "✓ PostgreSQL 14 compatibility verified"
);

postgres_version_test!(
    postgres_version_15_connection,
    "15-alpine",
    "pg15_test_slot",
    "pg15_test_pub",
    "pg15_test_table",
    "✓ PostgreSQL 15 compatibility verified"
);

postgres_version_test!(
    postgres_version_16_connection,
    "16-alpine",
    "pg16_test_slot",
    "pg16_test_pub",
    "pg16_test_table",
    "✓ PostgreSQL 16 compatibility verified"
);