rustcdc 0.6.3

Embeddable Rust CDC library focused on correctness-first capture primitives
Documentation
#![cfg(feature = "postgres")]

use rustcdc::{
    checkpoint::{Checkpoint, FileCheckpoint, PostgresOffset},
    schema_history::InMemorySchemaHistory,
    AckMode, CdcRuntime, PostgresSourceConfig, RuntimeConfig, RuntimeSourceConfig,
};
use testcontainers::{
    core::{IntoContainerPort, WaitFor},
    runners::AsyncRunner,
    GenericImage, ImageExt,
};

/// End-to-end scenario: run a `CdcRuntime` against a throwaway Postgres container,
/// commit a partial ack into a `FileCheckpoint`, then build a second runtime over the
/// same checkpoint directory and verify it can start, receive further events and
/// commit again.
///
/// The test is opt-in: it returns `Ok(())` immediately unless the environment variable
/// `CDC_RS_RUN_DOCKER_TESTS` is set to `1`.
///
/// # Errors
///
/// Container, connection and SQL failures are converted to
/// `rustcdc::Error::SourceError`; temp-dir creation failures to
/// `rustcdc::Error::IoError`; errors from the runtime and checkpoint APIs are
/// propagated unchanged with `?`.
#[tokio::test]
async fn runtime_postgres_stream_resume_from_checkpoint() -> rustcdc::Result<()> {
    // Opt-in guard: only the exact value "1" enables the test; unset or any other
    // value skips it and reports success.
    if std::env::var("CDC_RS_RUN_DOCKER_TESTS").as_deref() != Ok("1") {
        eprintln!("skipping postgres runtime integration test (set CDC_RS_RUN_DOCKER_TESTS=1)");
        return Ok(());
    }

    // Start postgres:16-alpine with logical WAL enabled and room for up to 8
    // replication slots / WAL senders. Startup waits for the readiness line on stderr.
    // NOTE(review): the official image may print this message more than once during
    // first-time initialisation — confirm the wait condition cannot fire too early.
    let container = GenericImage::new("postgres", "16-alpine")
        .with_exposed_port(5432.tcp())
        .with_wait_for(WaitFor::message_on_stderr(
            "database system is ready to accept connections",
        ))
        .with_env_var("POSTGRES_USER", "postgres")
        .with_env_var("POSTGRES_PASSWORD", "postgres")
        .with_env_var("POSTGRES_DB", "cdc")
        .with_cmd(vec![
            "postgres",
            "-c",
            "wal_level=logical",
            "-c",
            "max_replication_slots=8",
            "-c",
            "max_wal_senders=8",
        ])
        .start()
        .await
        .map_err(|error| rustcdc::Error::SourceError(error.to_string()))?;

    // Resolve the host and the host-side port mapped to the container's 5432/tcp.
    let host = container
        .get_host()
        .await
        .map_err(|error| rustcdc::Error::SourceError(error.to_string()))?;
    let port = container
        .get_host_port_ipv4(5432.tcp())
        .await
        .map_err(|error| rustcdc::Error::SourceError(error.to_string()))?;

    // Separate plain (non-TLS) admin connection used by the test itself for DDL,
    // inserts and slot manipulation.
    let admin_dsn = format!(
        "host={host} port={port} user=postgres password=postgres dbname=cdc connect_timeout=30"
    );
    let (admin_client, admin_conn) = tokio_postgres::connect(&admin_dsn, tokio_postgres::NoTls)
        .await
        .map_err(|error| rustcdc::Error::SourceError(error.to_string()))?;
    // The connection future must be driven for `admin_client` to make progress; its
    // result is deliberately ignored (best-effort background task).
    tokio::spawn(async move {
        let _ = admin_conn.await;
    });

    // Create the captured table, set REPLICA IDENTITY FULL on it, and (re)create the
    // publication that the source config below refers to by name.
    admin_client
        .batch_execute(
            "
            CREATE TABLE IF NOT EXISTS public.runtime_users (
              id BIGINT PRIMARY KEY,
              payload TEXT NOT NULL
            );
            ALTER TABLE public.runtime_users REPLICA IDENTITY FULL;
            DROP PUBLICATION IF EXISTS rustcdc_runtime_pub;
            CREATE PUBLICATION rustcdc_runtime_pub FOR TABLE public.runtime_users;
            ",
        )
        .await
        .map_err(|error| rustcdc::Error::SourceError(error.to_string()))?;

    // Temporary directory shared by every `FileCheckpoint` instance in this test
    // (both runtimes and the read-only readers).
    let checkpoint_dir = tempfile::tempdir().map_err(rustcdc::Error::IoError)?;

    // Source settings; fields not listed here take `PostgresSourceConfig::default()`.
    let source_cfg = PostgresSourceConfig {
        host: host.to_string(),
        port,
        user: "postgres".to_string(),
        password: "postgres".into(),
        database: "cdc".to_string(),
        replication_slot_name: "rustcdc_runtime_slot".to_string(),
        publication_name: "rustcdc_runtime_pub".to_string(),
        conn_timeout_secs: 30,
        stream_poll_interval_ms: 50,
        max_events_per_poll: 1_000,
        ..PostgresSourceConfig::default()
    };

    // First runtime. `source_cfg` is cloned because the same config is reused for the
    // resumed runtime further down.
    let mut runtime = CdcRuntime::new(
        RuntimeConfig::new(
            RuntimeSourceConfig::Postgres(source_cfg.clone()),
            FileCheckpoint::new(checkpoint_dir.path()),
            InMemorySchemaHistory::default(),
        )
        .with_max_buffer_size(256)
        .with_max_poll_wait_ms(150),
    )?;

    // The runtime is started before any rows are written.
    // NOTE(review): presumably so the replication slot exists before the changes
    // below are produced — confirm against `CdcRuntime::start`.
    runtime.start().await?;

    admin_client
        .batch_execute("TRUNCATE TABLE public.runtime_users;")
        .await
        .map_err(|error| rustcdc::Error::SourceError(error.to_string()))?;

    // Phase 1: 100 single-row inserts (ids 1..=100), one statement each.
    for id in 1_i64..=100_i64 {
        admin_client
            .execute(
                "INSERT INTO public.runtime_users (id, payload) VALUES ($1, $2)",
                &[&id, &format!("payload-{id}")],
            )
            .await
            .map_err(|error| rustcdc::Error::SourceError(error.to_string()))?;
    }

    // NOTE(review): this takes the FIRST non-empty batch and requires it to hold at
    // least 100 events, i.e. it assumes all phase-1 inserts are delivered in a single
    // batch — confirm this cannot be split across polls.
    let first_batch = poll_non_empty_batch(&mut runtime, 40).await?;
    assert!(first_batch.len() >= 100);

    // Partially acknowledge the batch: split the ack token at 50 and commit only the
    // first part; the remainder is intentionally left uncommitted.
    let AckMode::Required(token) = first_batch.ack_mode() else {
        panic!("non-empty batch should include ack token");
    };
    let (accepted, _remaining) = token.split_at(50)?;
    runtime.commit_ack(accepted).await?;

    // Inspect the checkpoint through an independent `FileCheckpoint` over the same
    // directory: exactly 50 events must be recorded as committed.
    let reader = FileCheckpoint::new(checkpoint_dir.path());
    assert_eq!(reader.get_committed_count().await?, 50);
    let saved = reader
        .load()
        .await?
        .ok_or_else(|| rustcdc::Error::StateError("checkpoint should exist after commit".into()))?;
    // Round-trip the saved checkpoint through its encoded bytes to obtain a
    // `PostgresOffset` and read its `lsn`.
    let saved_offset = PostgresOffset::from_bytes(&saved.encode()?)?;
    // Move the server-side replication slot to the checkpointed LSN before resuming.
    // NOTE(review): presumably this aligns the slot with the committed position so the
    // resumed runtime starts from there — confirm, and confirm the slot is not held
    // active by `runtime` at this point (it is only dropped afterwards).
    let target_lsn = format_pg_lsn(saved_offset.lsn);
    let advance_sql = format!(
        "SELECT end_lsn::text FROM pg_replication_slot_advance('rustcdc_runtime_slot', '{target_lsn}'::pg_lsn)"
    );
    admin_client
        .query_one(&advance_sql, &[])
        .await
        .map_err(|error| rustcdc::Error::SourceError(error.to_string()))?;

    // Discard the first runtime before constructing its replacement.
    drop(runtime);

    // Second runtime: same source config and checkpoint directory, fresh in-memory
    // schema history.
    let mut resumed = CdcRuntime::new(
        RuntimeConfig::new(
            RuntimeSourceConfig::Postgres(source_cfg),
            FileCheckpoint::new(checkpoint_dir.path()),
            InMemorySchemaHistory::default(),
        )
        .with_max_buffer_size(256)
        .with_max_poll_wait_ms(150),
    )?;

    resumed.start().await?;

    // Phase 2: 50 more single-row inserts (ids 101..=150).
    for id in 101_i64..=150_i64 {
        admin_client
            .execute(
                "INSERT INTO public.runtime_users (id, payload) VALUES ($1, $2)",
                &[&id, &format!("payload-{id}")],
            )
            .await
            .map_err(|error| rustcdc::Error::SourceError(error.to_string()))?;
    }

    // Lower bounds only: the batch must hold at least the 50 new events.
    // NOTE(review): the uncommitted half of phase 1 may presumably be redelivered
    // here as well, which would explain `>=` rather than `==` — confirm.
    let second_batch = poll_non_empty_batch(&mut resumed, 40).await?;
    assert!(second_batch.len() >= 50);

    // Commit the whole second batch; the committed count must now be at least
    // 100 (50 from the first commit plus at least 50 from this one).
    resumed.commit_ack(second_batch.ack_mode()).await?;
    let reader_after = FileCheckpoint::new(checkpoint_dir.path());
    assert!(reader_after.get_committed_count().await? >= 100);

    Ok(())
}

async fn poll_non_empty_batch(
    runtime: &mut CdcRuntime,
    rounds: usize,
) -> rustcdc::Result<rustcdc::EventBatch> {
    for _ in 0..rounds {
        let chunk = runtime.poll_event_batch().await?;
        if !chunk.is_empty() {
            return Ok(chunk);
        }
    }

    Err(rustcdc::Error::TimeoutError(
        "timed out waiting for a non-empty event batch".to_string(),
    ))
}

/// Renders a 64-bit LSN as `HIGH/LOW` hexadecimal text.
///
/// `HIGH` is the upper 32 bits in uppercase hex without padding; `LOW` is the lower
/// 32 bits in uppercase hex, zero-padded to 8 digits (e.g. `0x16_B374_D848` becomes
/// `"16/B374D848"`).
fn format_pg_lsn(lsn: u64) -> String {
    let high = lsn >> 32;
    let low = lsn & u64::from(u32::MAX);
    format!("{high:X}/{low:08X}")
}