# Rivven CDC

Native Change Data Capture for PostgreSQL, MySQL, and MariaDB.
## Documentation
| Guide | Description |
|---|---|
| PostgreSQL CDC Guide | Complete PostgreSQL setup, TLS, authentication, production deployment |
| MySQL/MariaDB CDC Guide | MySQL and MariaDB binary log replication setup |
| Configuration Reference | All CDC configuration options and environment variables |
| Troubleshooting Guide | Diagnose and resolve common CDC issues |
| CDC Overview | Feature overview and quick start |
## Features
- 🚀 Native Implementation - Direct TCP connections, no external dependencies
- 🐘 PostgreSQL - Logical replication via pgoutput plugin (v10+)
- 🐬 MySQL/MariaDB - Binlog replication with GTID support (MySQL 5.7+, MariaDB 10.2+)
- 🔒 TLS/mTLS - Secure connections with optional client certificate auth
- 📦 Zero-Copy - Efficient binary protocol parsing
- ⚡ Async - Built on Tokio for high-performance streaming
- 📡 Signal Table - Runtime control with Debezium-compatible signaling
- 🔄 Incremental Snapshots - Re-snapshot tables while streaming continues
## Debezium Comparison

Rivven-cdc covers Debezium's core feature set, plus a few extras:
| Feature | Rivven-cdc | Debezium | Notes |
|---|---|---|---|
| Logical Replication | ✅ | ✅ | pgoutput plugin |
| Binlog Streaming | ✅ | ✅ | MySQL/MariaDB GTID |
| Initial Snapshot | ✅ | ✅ | Parallel, resumable |
| TLS/SSL | ✅ | ✅ | rustls-based |
| Table/Column Filtering | ✅ | ✅ | Glob patterns |
| Column Masking | ✅ | ✅ | Redacted |
| Heartbeats | ✅ | ✅ | WAL acknowledgment |
| Heartbeat Action Query | ✅ | ✅ | Multi-database support |
| Checkpointing | ✅ | ✅ | LSN/binlog position |
| Schema Inference | ✅ | ✅ | Avro schema generation |
| Tombstone Events | ✅ | ✅ | TombstoneEmitter for log compaction |
| REPLICA IDENTITY | ✅ | ✅ | ReplicaIdentityEnforcer with warn/skip/fail |
| Schema Change Topic | ✅ | ✅ | SchemaChangeEmitter for DDL tracking |
| SCRAM-SHA-256 | ✅ | ✅ | RFC 5802 PostgreSQL auth |
| Signal Table | ✅ | ✅ | Multi-channel (source/topic/file) |
| Transaction Metadata Topic | ✅ | ✅ | TransactionTopicEmitter BEGIN/END events |
| Read-Only Replicas | ✅ | ✅ | Heartbeat-based watermarking for standbys |
| Incremental Snapshots | ✅ | ✅ | DBLog watermarks, chunk-based |
| Circuit Breaker | ✅ | - | Rivven advantage |
| Rate Limiting | ✅ | - | Token bucket algorithm |
## Signal Table (Runtime Control)

Control CDC connectors at runtime without restarting: trigger snapshots, pause or resume streaming.
```rust
// NOTE: import paths and argument values were elided in this README's source;
// the names and values below are illustrative. See the PostgreSQL CDC Guide
// for the exact APIs.
use rivven_cdc::postgres::{PostgresCdc, PostgresCdcConfig};
use rivven_cdc::signal::{SignalChannelType, SignalConfig};

// Configure signal table via CDC stream (default)
let signal_config = SignalConfig::builder()
    .enabled_channels(vec![SignalChannelType::Source])
    .signal_data_collection("public.rivven_signal")
    .build();

let config = PostgresCdcConfig::builder()
    .connection_string("postgres://cdc_user@localhost:5432/mydb")
    .slot_name("rivven_slot")
    .publication_name("rivven_pub")
    .signal_config(signal_config)
    .build()?;

let mut cdc = PostgresCdc::new(config);

// Register custom signal handler
cdc.signal_processor()
    .register_handler("custom-signal", my_handler)
    .await;

cdc.start().await?;
```
Signal Types:
| Signal | Description |
|---|---|
| `execute-snapshot` | Trigger an ad-hoc incremental/blocking snapshot |
| `stop-snapshot` | Stop an in-progress snapshot |
| `pause-snapshot` | Pause an in-progress incremental snapshot |
| `resume-snapshot` | Resume a paused incremental snapshot |
| `log` | Log a diagnostic message |
Signal Channels:
| Channel | Description |
|---|---|
| `source` | Signal table captured via the CDC stream (default, recommended) |
| `topic` | Signals from a Rivven/Kafka topic |
| `file` | Signals from a JSON file |
## Incremental Snapshots
Re-snapshot tables while streaming continues using DBLog-style watermark deduplication:
```rust
// NOTE: import paths and argument values were elided in this README's source;
// the names and values below are illustrative.
use rivven_cdc::snapshot::{
    IncrementalSnapshotConfig, SnapshotCoordinator, SnapshotRequest, WatermarkStrategy,
};

// Configure incremental snapshots
let config = IncrementalSnapshotConfig::builder()
    .chunk_size(1024)                    // Rows per chunk
    .watermark_strategy(WatermarkStrategy::SignalTable)
    .max_buffer_memory(64 * 1024 * 1024) // 64MB buffer
    .signal_table("public.rivven_signal")
    .build();

let coordinator = SnapshotCoordinator::new(config);

// Start an incremental snapshot for selected tables
let request = SnapshotRequest::new(vec!["public.orders"])
    .with_condition("created_at > '2024-01-01'")
    .with_surrogate_key("order_id");

let snapshot_id = coordinator.start(request).await?;

// Process chunks with watermark-based deduplication
while let Some(chunk) = coordinator.next_chunk().await? {
    // emit chunk rows downstream
}

// Check statistics
let stats = coordinator.stats();
println!("chunks={} rows={}", stats.chunks_completed, stats.rows_emitted);
```
Watermark Deduplication:
During the window, streaming events with matching primary keys cause buffered snapshot entries to be dropped:
```rust
// While the window is open, check streaming events for conflicts.
// Method arguments are illustrative; they were elided in the source.
let key = "123"; // Primary key from the streaming event
if coordinator.check_streaming_conflict(key).await {
    // The buffered snapshot row for this key is dropped
}
```
## Tombstone Events
Tombstones are emitted after DELETE events for Kafka log compaction:
```rust
// NOTE: import paths and argument values were elided in this README's source;
// the names below are illustrative.
use rivven_cdc::tombstone::{TombstoneConfig, TombstoneEmitter};
use rivven_cdc::CdcEvent;

// Enable tombstones (default)
let config = TombstoneConfig::default();
let emitter = TombstoneEmitter::new(config);

// Process events - a DELETE will be followed by a TOMBSTONE
let delete = CdcEvent::delete("public.users", key);
let events = emitter.process(delete);
// events = [DELETE, TOMBSTONE]
```
## REPLICA IDENTITY Enforcement
Ensure PostgreSQL tables have proper REPLICA IDENTITY for complete before images:
```rust
// NOTE: import paths and argument values were elided in this README's source;
// the names and values below are illustrative.
use rivven_cdc::replica_identity::{
    EnforcementMode, ReplicaIdentityConfig, ReplicaIdentityEnforcer,
};

// Warn when tables don't have REPLICA IDENTITY FULL
let config = ReplicaIdentityConfig::builder()
    .require_full(true)
    .enforcement_mode(EnforcementMode::Warn)
    .exclude_table("public.audit_*") // Skip audit tables
    .build();

let enforcer = ReplicaIdentityEnforcer::new(config);

// Check table identity (from a RELATION message)
let result = enforcer.check_sync("public", "users", replica_identity);
// Logs: Table "public"."users" has REPLICA IDENTITY DEFAULT (not FULL).
//       Fix with: ALTER TABLE "public"."users" REPLICA IDENTITY FULL
```
## Schema Change Topic
Publish DDL events to a dedicated topic for downstream schema synchronization:
```rust
// NOTE: import paths and argument values were elided in this README's source;
// the names and values below are illustrative.
use rivven_cdc::schema_change::{SchemaChangeConfig, SchemaChangeEmitter};

// Configure schema change publishing
let config = SchemaChangeConfig::builder()
    .enabled(true)
    .topic_prefix("mydb")   // -> mydb.schema_changes
    .include_ddl(true)      // Include DDL SQL
    .include_columns(true)  // Include column details
    .exclude_tables(vec!["public.tmp_*"])
    .build();

let emitter = SchemaChangeEmitter::new(config);

// Detect schema changes from PostgreSQL RELATION messages
let columns = vec![/* column descriptors from the RELATION message */];
let event = emitter.detect_postgres_change("public", "users", &columns).await;

// Or detect from MySQL DDL queries
let event = emitter
    .detect_mysql_change("ALTER TABLE users ADD COLUMN email TEXT")
    .await;
```
## Transaction Metadata Topic
Publish transaction BEGIN/END markers to a dedicated topic for downstream correlation:
```rust
// NOTE: import paths and argument values were elided in this README's source;
// the names and values below are illustrative.
use rivven_cdc::transaction::{
    TransactionMetadata, TransactionTopicConfig, TransactionTopicEmitter,
};
use rivven_cdc::CdcEvent;

// Configure transaction metadata publishing
let config = TransactionTopicConfig::builder()
    .enabled(true)
    .topic_suffix("transaction") // -> cdc.orders.transaction
    .enrich_events(true)         // Add tx context to events
    .build();

let mut emitter = TransactionTopicEmitter::new(config);

// Begin a transaction
emitter.begin_transaction("tx-1001");

// Process events within the transaction
let event = CdcEvent::insert("public.orders", row);
let tx_meta = TransactionMetadata::new("tx-1001");

// Enrich the event with transaction context (total_order, data_collection_order)
if let Some(enriched) = emitter.enrich_event(event, &tx_meta) {
    // forward the enriched event
}

// End transaction - returns an END event with per-table counts
if let Some(end_event) = emitter.end_transaction("tx-1001") {
    // publish the END marker to the transaction topic
}
```
## Read-Only Replica Support
CDC from PostgreSQL standbys without write access:
```rust
// NOTE: import paths and argument values were elided in this README's source;
// the names and values below are illustrative.
use rivven_cdc::read_only::{ReadOnlyConfig, ReadOnlyFeature, ReadOnlyGuard, WatermarkTracker};
use rivven_cdc::signal::SignalChannelType;

// Configure for a read-only replica
let config = ReadOnlyConfig::builder()
    .read_only(true)
    .min_postgres_version(13)                   // Required for pg_current_xact_id_if_assigned()
    .allowed_channel(SignalChannelType::Topic)  // Source channel disabled
    .allowed_channel(SignalChannelType::File)
    .heartbeat_watermarking(true)               // Use heartbeat-based watermarks
    .build();

// Check which channels are available
let guard = ReadOnlyGuard::new(&config);
assert!(guard.channel_allowed(SignalChannelType::Topic));
assert!(!guard.channel_allowed(SignalChannelType::Source)); // Blocked

// Verify features available in read-only mode
assert!(guard.feature_allowed(ReadOnlyFeature::Streaming));
assert!(!guard.feature_allowed(ReadOnlyFeature::SignalTableSource)); // Blocked

// Track watermarks for incremental snapshot deduplication
let mut tracker = WatermarkTracker::new();

// Open snapshot window
tracker.snapshot_started(low_watermark);

// Process streaming events - deduplicate against the snapshot
let result = tracker.should_keep_event(&event);
match result {
    // handle keep/drop variants
    _ => {}
}

// Close snapshot window
tracker.snapshot_completed(high_watermark);
```
Feature Availability in Read-Only Mode:
| Feature | Status |
|---|---|
| `SignalTableSource` | ❌ Requires INSERT |
| `IncrementalSnapshotWrite` | ❌ Requires signal table writes |
| `AutoCreateSlot` | ❌ Requires `pg_create_logical_replication_slot()` |
| `AutoCreatePublication` | ❌ Requires `CREATE PUBLICATION` |
| `Streaming` | ✅ Available |
| `IncrementalSnapshotRead` | ✅ Via heartbeat watermarking |
| `TopicSignals` | ✅ Available |
| `FileSignals` | ✅ Available |
## Quick Start

### PostgreSQL
```rust
// NOTE: import paths and argument values were elided in this README's source;
// the names and values below are illustrative.
use rivven_cdc::postgres::{PostgresCdc, PostgresCdcConfig};
use rivven_cdc::tls::TlsConfig;
use rivven_cdc::CdcSource;

// With TLS
let tls = TlsConfig::new("ca.pem");
let config = PostgresCdcConfig::builder()
    .connection_string("postgres://cdc_user@db.example.com:5432/mydb")
    .slot_name("rivven_slot")
    .publication_name("rivven_pub")
    .tls_config(tls)
    .build()?;

let mut cdc = PostgresCdc::new(config);
cdc.start().await?;
```
### MySQL / MariaDB
```rust
// NOTE: import paths and argument values were elided in this README's source;
// the names and values below are illustrative.
use rivven_cdc::mysql::{MySqlCdc, MySqlCdcConfig};
use rivven_cdc::tls::TlsConfig;

// With TLS
let tls = TlsConfig::new("ca.pem");
let config = MySqlCdcConfig::new("db.example.com", 3306, "cdc_user")
    .with_password("secret")
    .with_database("mydb")
    .with_server_id(4321)
    .with_tls(tls);

let cdc = MySqlCdc::new(config);
```
## Benchmarks
Run CDC performance benchmarks:
```bash
# The exact bench invocations were elided in this README's source;
# the commands below are illustrative Cargo bench targets.

# Throughput benchmarks (schema inference, event parsing, filtering)
cargo bench --bench throughput

# Latency benchmarks (metrics overhead, pipeline latency, e2e processing)
cargo bench --bench latency
```
Benchmark Categories:
| Benchmark | Description |
|---|---|
| `schema_inference` | Avro schema generation throughput |
| `type_mapping` | PostgreSQL type → Avro conversion |
| `event_serialization` | JSON encode/decode performance |
| `filter_evaluation` | Table include/exclude matching |
| `batch_processing` | End-to-end batch filter performance |
| `extended_metrics` | Metrics collection overhead |
| `pipeline_latency` | Transform chain latency |
| `e2e_processing` | Full event pipeline with metrics |
| `memory_efficiency` | Allocation per operation |
Sample Results (M3 MacBook Pro):
| Operation | Throughput | Notes |
|---|---|---|
| Event serialization (small) | ~2.5M/sec | 80 bytes/event |
| Event serialization (large) | ~150K/sec | 5KB/event |
| Filter evaluation (simple) | ~15M/sec | Single glob pattern |
| Filter evaluation (complex) | ~5M/sec | Multi-pattern + masking |
| Metrics record_event | ~50M/sec | Atomic counter |
| Metrics snapshot export | ~200K/sec | Full metrics dump |
## License

See the root LICENSE file.