rustcdc 0.6.4

Embeddable Rust CDC library focused on correctness-first capture primitives
rustcdc

rustcdc is an embeddable CDC library for Rust with a correctness-first design. The repository includes canonical event contracts, checkpoint safety primitives, schema history abstractions, an embedded runtime, and PostgreSQL/MySQL/MariaDB/SQL Server source connectors.

Status 🚀

Under active development. The core connector and runtime library paths are implemented and validated by unit and integration test suites.

Current crate release: 0.6.4.

MSRV 🛠️

This crate targets Rust 1.92 or newer, matching the rust-version declared in Cargo.toml.

Build 📦

cargo build
cargo build --features postgres

Default profile enables postgres + tls. WASM transforms are opt-in (--features wasm).

Feature Profiles ⚙️

  • default profile: postgres + tls (lean, no JIT runtime in the default binary)
  • --features wasm: WASM transform sandbox via wasmtime (~15 MB release binary overhead; opt-in by design)
  • --features postgres: PostgreSQL connector profile (TLS transport is required and enabled transitively)
  • --features mysql: MySQL connector profile (TLS transport is required and enabled transitively)
  • --features mariadb: MariaDB connector profile (reuses the MySQL transport stack with MariaDB source identity)
  • --features sqlserver: SQL Server connector profile (TLS transport is required and enabled transitively)
  • --features tls: explicit TLS transport surface (already included by relational connector features)
  • --features outbox: enables outbox helpers and transforms
  • --features encryption: enables encryption-oriented transforms and helpers
  • --features metrics: enables OpenTelemetry metrics/tracing integrations
  • --no-default-features: foundation-only validation without source connectors
  • --all-features: validates the full additive feature surface

For self-signed or private-CA deployments, configure TLS directly with TransportConfig::tls_with_ca_cert_path(...) or TransportConfig::mtls(...). No Cargo feature is required for those production-safe paths.

License

Licensed under either of:

Run local quality checks:

cargo check --all-targets --all-features
cargo clippy --all-targets --all-features -- -D warnings
bash scripts/ci-policy-gate.sh

Run full connector-backed evidence locally (requires Docker daemon):

bash scripts/ci-benchmark-gate.sh
bash scripts/run_full_integration_matrix_evidence.sh

To validate the foundation profile without source-specific features:

cargo test --lib --no-default-features

Benchmark Evidence Policy

Benchmark evidence is produced via scripts/ci-benchmark-gate.sh. Local runs are allowed, but are classified as non-release evidence unless strict release-policy inputs are satisfied.

Example local run (non-release classification expected):

bash scripts/ci-benchmark-gate.sh

Release-grade benchmark classification now requires commit-pinned metadata plus a named Criterion baseline:

BENCHMARK_STRICT=1 \
BENCHMARK_MAX_REGRESSION_PERCENT=5 \
BENCHMARK_BASELINE_COMMIT="$(git rev-parse HEAD)" \
BENCHMARK_BASELINE_ARTIFACT="BENCHMARK_REPORT.md" \
CRITERION_BASELINE="ci-baseline" \
bash scripts/ci-benchmark-gate.sh

Use the same CRITERION_BASELINE=ci-baseline value in CI so release evidence and local reports compare against the same named baseline.

Quick Start ✅

use rustcdc::{checkpoint::InMemoryCheckpoint, schema_history::InMemorySchemaHistory, RuntimeConfig, RuntimeSourceConfig};

let checkpoint = InMemoryCheckpoint::default();
let schema_history = InMemorySchemaHistory::default();
let config = RuntimeConfig::new(RuntimeSourceConfig::Disabled, checkpoint, schema_history);

let _config = config;

Delivery Guarantees 🔁

  • Runtime delivery contract is at-least-once.
  • Duplicate event delivery is possible after crashes, restart boundaries, and partial ack/commit windows.
  • Ordering is preserved within committed ack prefixes, but consumers must still tolerate duplicates.
  • Downstream systems should apply idempotency using stable keys (for example: source + table + primary key + source offset/transaction metadata).

Operational expectation:

  • Treat rustcdc as a correctness-first, at-least-once transport, not an exactly-once one.
  • Validate sink-side deduplication in staging before production rollout.
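
The idempotency advice above can be sketched on the consumer side as follows. This is a minimal illustration using only the standard library: `EventKey` and `DedupSink` are hypothetical names, not rustcdc types, and the in-memory `HashSet` stands in for whatever durable uniqueness mechanism the real sink provides (for example a unique constraint).

```rust
use std::collections::HashSet;

/// Hypothetical stable identity for a change event (not a rustcdc type).
/// Built from source + table + primary key + source offset, as suggested above.
#[derive(Clone, Debug, PartialEq, Eq, Hash)]
pub struct EventKey {
    pub source: String,
    pub table: String,
    pub primary_key: String,
    pub source_offset: u64,
}

/// In-memory sink that applies each event key at most once.
#[derive(Default)]
pub struct DedupSink {
    seen: HashSet<EventKey>,
    applied: Vec<EventKey>,
}

impl DedupSink {
    /// Returns true if the event was applied, false if it was a duplicate.
    pub fn apply(&mut self, key: EventKey) -> bool {
        // `HashSet::insert` returns false when the key is already present.
        if !self.seen.insert(key.clone()) {
            return false;
        }
        self.applied.push(key);
        true
    }

    /// Number of distinct events applied so far.
    pub fn applied_count(&self) -> usize {
        self.applied.len()
    }
}
```

Redelivering the same key after a restart leaves the sink unchanged, which is the property worth checking in staging.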

Runtime Transform Error Policy 🧯

RuntimeConfig defaults to halting on transform failures via TransformErrorPolicy::Halt. For best-effort pipelines, switch to TransformErrorPolicy::Skip:

use rustcdc::{
	checkpoint::InMemoryCheckpoint,
	schema_history::InMemorySchemaHistory,
	PostgresSourceConfig,
	RuntimeConfig,
	RuntimeSourceConfig,
	TransformErrorPolicy,
};
let checkpoint = InMemoryCheckpoint::default();
let schema_history = InMemorySchemaHistory::default();
let source = PostgresSourceConfig {
	host: "localhost".into(),
	port: 5432,
	user: "postgres".into(),
	password: "postgres".into(),
	database: "app".into(),
	replication_slot_name: "rustcdc_slot".into(),
	publication_name: "rustcdc_publication".into(),
	conn_timeout_secs: 30,
	..PostgresSourceConfig::default()
};

let config = RuntimeConfig::new(RuntimeSourceConfig::Postgres(source), checkpoint, schema_history)
	.with_transform_error_policy(TransformErrorPolicy::Skip);

Halt is the safe default because it preserves strict failure visibility.
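
The difference between the two policies can be illustrated with a standalone sketch. It does not use the crate's API: `ErrorPolicy`, `run_transforms`, and `double_non_negative` are hypothetical names, and the sketch assumes that skipping means the failing event is dropped from the output, which the text above does not spell out.

```rust
/// Hypothetical stand-in for the two policies described above (not the crate's enum).
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum ErrorPolicy {
    Halt,
    Skip,
}

/// Applies `transform` to every input under the given policy.
/// Returns the transformed outputs, or the first error when the policy is `Halt`.
pub fn run_transforms<F>(
    inputs: &[i64],
    policy: ErrorPolicy,
    transform: F,
) -> Result<Vec<i64>, String>
where
    F: Fn(i64) -> Result<i64, String>,
{
    let mut out = Vec::with_capacity(inputs.len());
    for &input in inputs {
        match transform(input) {
            Ok(value) => out.push(value),
            Err(err) => match policy {
                // Halt: surface the failure immediately and stop processing.
                ErrorPolicy::Halt => return Err(err),
                // Skip (assumed semantics): drop the failing input and keep going.
                ErrorPolicy::Skip => continue,
            },
        }
    }
    Ok(out)
}

/// Example transform: doubles the value and rejects negative inputs.
pub fn double_non_negative(value: i64) -> Result<i64, String> {
    if value < 0 {
        Err(format!("negative input: {value}"))
    } else {
        Ok(value * 2)
    }
}
```

With inputs `[1, -2, 3]`, the skipping run silently loses one event while the halting run reports it, which is why halting keeps failures visible.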

Post-Commit Confirmation Policy

RuntimeConfig now defaults to PostCommitSourceConfirmPolicy::FailFast. If source confirmation fails after a durable checkpoint commit, the runtime returns an error by default so that the divergence between checkpoint and source is surfaced immediately.

For availability-biased pipelines, opt into continue behavior explicitly:

use rustcdc::PostCommitSourceConfirmPolicy;

let config = config.with_post_commit_source_confirm_policy(
	PostCommitSourceConfirmPolicy::Continue,
);
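
As a rough model of the decision this policy controls, the sketch below takes the result of the source confirmation step (after the checkpoint is already durable) and maps it to an outcome. All names here (`ConfirmPolicy`, `CommitOutcome`, `finish_commit`) are hypothetical and only mirror the behavior described above; the runtime's actual internals may differ.

```rust
/// Hypothetical stand-in for the two confirmation policies (not the crate's enum).
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum ConfirmPolicy {
    FailFast,
    Continue,
}

/// Outcome of one commit cycle in this sketch.
#[derive(Debug, PartialEq, Eq)]
pub enum CommitOutcome {
    /// Checkpoint committed and the source acknowledged the position.
    Confirmed,
    /// Checkpoint committed, but the source did not confirm; carries the reason.
    CommittedUnconfirmed(String),
}

/// Decides what to do once the checkpoint is durable and the source
/// confirmation has either succeeded or failed.
pub fn finish_commit(
    confirm_result: Result<(), String>,
    policy: ConfirmPolicy,
) -> Result<CommitOutcome, String> {
    match (confirm_result, policy) {
        (Ok(()), _) => Ok(CommitOutcome::Confirmed),
        // FailFast: return the error so the caller sees the divergence.
        (Err(err), ConfirmPolicy::FailFast) => Err(err),
        // Continue: keep running, but record that confirmation is outstanding.
        (Err(err), ConfirmPolicy::Continue) => Ok(CommitOutcome::CommittedUnconfirmed(err)),
    }
}
```

In either case the checkpoint itself is already durable; the policy only decides whether an unconfirmed source position stops the pipeline.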

TRUNCATE Event Support

PostgreSQL TRUNCATE statements are surfaced as Operation::Truncate events. before and after are both None for truncate events. Connectors that support truncate events advertise ConnectorCapabilities::truncate.
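
A consumer that materializes a table has to handle the truncate case without relying on row images. The sketch below shows one way to do that. The `Operation` and `ChangeEvent` types here are simplified, hypothetical mirrors of the description above (the crate's real event type is richer), and `apply` is an illustrative helper.

```rust
use std::collections::BTreeMap;

/// Simplified, hypothetical operation kinds mirroring the text above.
#[derive(Debug, PartialEq, Eq)]
pub enum Operation {
    Insert,
    Update,
    Delete,
    Truncate,
}

/// Simplified, hypothetical event: row images are (primary key, payload) pairs.
pub struct ChangeEvent {
    pub op: Operation,
    pub before: Option<(u64, String)>,
    pub after: Option<(u64, String)>,
}

/// Applies one event to an in-memory copy of the table.
pub fn apply(table: &mut BTreeMap<u64, String>, event: &ChangeEvent) {
    match event.op {
        Operation::Insert | Operation::Update => {
            // Upsert using the `after` image when it is present.
            if let Some((key, row)) = &event.after {
                table.insert(*key, row.clone());
            }
        }
        Operation::Delete => {
            // Deletes identify the row through the `before` image.
            if let Some((key, _)) = &event.before {
                table.remove(key);
            }
        }
        // Truncate carries no row images, so the whole table is cleared.
        Operation::Truncate => table.clear(),
    }
}
```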

Connection Retry 🔄

Configure ConnectionRetryPolicy for automatic reconnection on transient source failures:

use rustcdc::core::ConnectionRetryPolicy;

let config = config.with_connection_retry(ConnectionRetryPolicy {
    max_retries: Some(5),    // None = retry indefinitely
    initial_delay_ms: 300,
    max_delay_ms: 10_000,
});

Only recoverable errors (SourceError, TimeoutError) trigger a retry. Fatal errors propagate immediately.
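
To get a feel for how the three fields interact, here is a standalone sketch of a delay schedule. It is not the crate's implementation: `RetryPolicy` and `delay_ms` are hypothetical, and the sketch assumes the delay doubles on each attempt and is capped at `max_delay_ms`, which the text above does not confirm.

```rust
/// Hypothetical mirror of the three fields shown above (not the crate's type).
pub struct RetryPolicy {
    pub max_retries: Option<u32>,
    pub initial_delay_ms: u64,
    pub max_delay_ms: u64,
}

impl RetryPolicy {
    /// Delay before retry number `attempt` (0-based), or `None` once the
    /// retry budget is exhausted.
    ///
    /// ASSUMPTION: exponential backoff (doubling) capped at `max_delay_ms`.
    pub fn delay_ms(&self, attempt: u32) -> Option<u64> {
        if let Some(max) = self.max_retries {
            if attempt >= max {
                return None;
            }
        }
        // 2^attempt; falls back to u64::MAX when the shift would overflow.
        let factor = 1u64.checked_shl(attempt).unwrap_or(u64::MAX);
        Some(
            self.initial_delay_ms
                .saturating_mul(factor)
                .min(self.max_delay_ms),
        )
    }
}
```

Under these assumptions, the values from the snippet above (5 retries, 300 ms initial, 10 000 ms cap) give delays of 300, 600, 1200, 2400, and 4800 ms before the policy gives up.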

Transport Configuration 🔒

All connectors default to TLS. For trusted private networks or local testing only, use the explicit plaintext escape hatch:

use rustcdc::TransportConfig;

let transport = TransportConfig::plaintext(); // ⚠️ never use in production

PostgreSQL Example 🐘

Build and run the PostgreSQL example:

cargo build --example pg_to_stdout --features postgres
./target/debug/examples/pg_to_stdout --host localhost --port 5432 --database testdb --snapshot-tables public.users

The example also accepts environment variables (CDC_RS_HOST, CDC_RS_PORT, CDC_RS_DB, CDC_RS_SNAPSHOT_TABLES, and related settings) and commits every 100 events by default.

MariaDB Example 🐬

Build and run the MariaDB example:

cargo build --example mariadb_to_stdout --features mariadb
./target/debug/examples/mariadb_to_stdout --host localhost --port 3306 --database testdb --snapshot-tables public.users

The MariaDB example uses the same runtime loop as the PostgreSQL example, but it starts from MariaDbSourceConfig and a MariaDB-specific source identity.

Docker Compose Example 🐳

Bring up the local PostgreSQL + pg_to_stdout demo stack:

docker compose up --build

The compose setup initializes public.users and publication rustcdc_example_pub automatically.

Stop and clean up:

docker compose down -v

Documentation Map 📚

Operational Documentation

Developer Documentation

Project Documentation