rivven-cdc
Native Change Data Capture for PostgreSQL, MySQL, MariaDB, and SQL Server.
Overview
rivven-cdc captures database changes and outputs CdcEvent structs with JSON data. The serialization format (Avro, Protobuf, JSON Schema) is decided by the consumer (typically rivven-connect).
Supported Databases
| Database | Version | Protocol | Status |
|---|---|---|---|
| PostgreSQL | 10+ | WAL streaming (pgoutput) | ✅ Available |
| MySQL | 5.7+ | Binlog streaming (GTID) | ✅ Available |
| MariaDB | 10.2+ | Binlog streaming (GTID) | ✅ Available |
| SQL Server | 2016 SP1+ | CDC table polling | ✅ Available |
| Oracle | 19c+ | LogMiner | 📋 Planned |
Design Philosophy
rivven-cdc is format-agnostic. This separation of concerns allows:
- Using CDC events without any schema registry
- Choosing serialization format at the connector level, not CDC level
- Simpler CDC library with fewer dependencies
     rivven-cdc                         rivven-connect
┌──────────────────┐               ┌──────────────────────┐
│     Database     │               │ CDC Source Connector │
│        ↓         │               │          ↓           │
│    CdcSource     │ ──CdcEvent──► │   Schema Inference   │
│        ↓         │               │          ↓           │
│   CdcConnector   │               │   Schema Registry    │
│        ↓         │               │          ↓           │
│  Topic Routing   │               │    Serialization     │
└──────────────────┘               └──────────────────────┘
Public API Organization
The crate exports types in three tiers for clarity:
Tier 1: Core Types (crate root)
Essential types for basic CDC operations:
- CdcEvent, CdcOp, CdcSource - Core event types
- CdcError, Result, ErrorCategory - Error handling
- CdcFilter, CdcFilterConfig, TableColumnConfig - Filtering
Tier 2: Feature Types (crate root)
Optional features for production use:
- SMT Transforms: SmtChain, MaskField, Filter, Cast, etc.
- Deduplication: Deduplicator, DeduplicatorConfig
- Encryption: FieldEncryptor, EncryptionConfig, MemoryKeyProvider
- Tombstones: TombstoneEmitter, TombstoneConfig
- Schema Changes: SchemaChangeEmitter, SchemaChangeEvent
- Transactions: TransactionTopicEmitter, TransactionEvent
- Signals: SignalProcessor, Signal, SignalResult
Tier 3: Advanced Types (common:: module)
Internal/advanced types for custom implementations:
- Resilience: CircuitBreaker, RateLimiter, RetryConfig
- Routing: EventRouter, RouteRule
- Snapshot: SnapshotCoordinator, SnapshotConfig
- And many more via use rivven_cdc::common::*
Documentation
| Guide | Description |
|---|---|
| PostgreSQL CDC Guide | Complete PostgreSQL setup, TLS, authentication, production deployment |
| MySQL/MariaDB CDC Guide | MySQL and MariaDB binary log replication setup |
| Configuration Reference | All CDC configuration options and environment variables |
| Troubleshooting Guide | Diagnose and resolve common CDC issues |
| CDC Overview | Feature overview and quick start |
Features
- 🚀 Native Implementation - Direct TCP connections, no external dependencies
- 🐘 PostgreSQL - Logical replication via pgoutput plugin (v14+ recommended, v10+ supported)
- 🐬 MySQL/MariaDB - Binlog replication with GTID support (MySQL 8.0+, MariaDB 10.5+)
- 🔒 TLS/mTLS - Secure connections with optional client certificate auth
- 🔑 Full Auth Support - SCRAM-SHA-256 (PostgreSQL), caching_sha2_password + ed25519 (MySQL/MariaDB)
- 📦 Zero-Copy - Efficient binary protocol parsing
- ⚡ Async - Built on Tokio for high-performance streaming
- 📡 Signal Table - Runtime control with ad-hoc snapshots and pause/resume
- 🔄 Incremental Snapshots - Re-snapshot tables while streaming continues
- 🎯 Format-Agnostic - No schema registry coupling; serialization handled by consumers
- 📊 Avro Serialization - Built-in CdcEventSerializer with JSON and Avro binary formats, Confluent wire format support
Supported Versions
PostgreSQL
| Version | Status | EOL | Notes |
|---|---|---|---|
| 14.x | ✅ Tested | Nov 2026 | Streaming large transactions |
| 15.x | ✅ Tested | Nov 2027 | Row filters, column lists |
| 16.x | ✅ Recommended | Nov 2028 | Parallel apply |
| 17.x | ✅ Tested | Nov 2029 | Enhanced logical replication |
MySQL / MariaDB
MySQL
| Version | Status | EOL | Notes |
|---|---|---|---|
| 8.0.x | ✅ Tested | Apr 2026 | GTID, caching_sha2_password |
| 8.4.x | ✅ Recommended | Apr 2032 | LTS, enhanced replication |
| 9.0.x | ✅ Tested | TBD | Innovation release (latest) |
MariaDB
| Version | Status | EOL | Notes |
|---|---|---|---|
| 10.6.x | ✅ Tested | Jul 2026 | LTS, GTID improvements |
| 10.11.x | ✅ Recommended | Feb 2028 | LTS, enhanced JSON, parallel replication |
| 11.4.x | ✅ Tested | May 2029 | LTS, latest features |
SQL Server
| Version | Status | EOL | Notes |
|---|---|---|---|
| 2016 SP1+ | ✅ Tested | Jul 2026 | CDC available in Standard edition from SP1 |
| 2019 | ✅ Recommended | Jan 2030 | Enhanced CDC performance |
| 2022 | ✅ Tested | TBD | Latest features |
| Azure SQL | ✅ Tested | N/A | Managed CDC support |
Optional Features
| Feature | Description |
|---|---|
| postgres | PostgreSQL CDC (default) |
| mysql | MySQL/MariaDB CDC (default) |
| sqlserver | SQL Server CDC |
| postgres-tls | PostgreSQL with TLS |
| mysql-tls | MySQL/MariaDB with TLS |
| sqlserver-tls | SQL Server with TLS |
| mariadb | MariaDB CDC (alias for mysql with MariaDB extensions) |
Feature Matrix
| Feature | Supported | Notes |
|---|---|---|
| Logical Replication | ✅ | pgoutput plugin |
| Binlog Streaming | ✅ | MySQL/MariaDB GTID |
| CDC Table Polling | ✅ | SQL Server CDC tables |
| Binlog Checksum (CRC32) | ✅ | Checksum byte parsed from the Format Description Event (FDE) per MySQL internals, auto-negotiation |
| Schema Metadata | ✅ | Real column names from INFORMATION_SCHEMA |
| Initial Snapshot | ✅ | Parallel, resumable |
| TLS/SSL | ✅ | rustls-based |
| Table/Column Filtering | ✅ | Glob patterns (regex-backed) |
| Column Masking | ✅ | Redacted |
| Heartbeats | ✅ | WAL acknowledgment |
| Heartbeat Action Query | ✅ | Multi-database support |
| Checkpointing | ✅ | LSN/binlog position (SQL Server: deferred to next poll cycle for at-least-once delivery) |
| Schema Inference | ✅ | Via rivven-connect |
| Tombstone Events | ✅ | TombstoneEmitter for log compaction |
| REPLICA IDENTITY | ✅ | ReplicaIdentityEnforcer with warn/skip/fail |
| Schema Change Topic | ✅ | SchemaChangeEmitter for DDL tracking |
| SCRAM-SHA-256 | ✅ | RFC 5802 PostgreSQL auth |
| caching_sha2_password | ✅ | MySQL 8.0+ default auth plugin |
| client_ed25519 | ✅ | MariaDB Ed25519 auth |
| Signal Table | ✅ | Multi-channel (source/topic/file) |
| Transaction Metadata Topic | ✅ | TransactionTopicEmitter BEGIN/END events |
| Read-Only Replicas | ✅ | Heartbeat-based watermarking for standbys |
| Incremental Snapshots | ✅ | DBLog watermarks, chunk-based |
| Event Routing | ✅ | EventRouter with content-based routing, DLQ |
| Partitioning | ✅ | Partitioner with KeyHash, TableHash, RoundRobin |
| Log Compaction | ✅ | Compactor for key-based deduplication |
| Parallel CDC | ✅ | ParallelSource for multi-table concurrency |
| Outbox Pattern | ✅ | OutboxProcessor for transactional messaging |
| Health Monitoring | ✅ | HealthMonitor with auto-recovery, lag monitoring |
| Notifications | ✅ | Notifier for snapshot/streaming progress alerts |
| Field Encryption | ✅ | AES-256-GCM column-level encryption |
| Error Classification | ✅ | Categorized errors with retry detection |
Error Handling
The crate provides comprehensive error classification for intelligent retry and alerting:
use ;
// Error categories for metrics/alerting
match error.category
// Check if error is retriable
if error.is_retriable
// Get metric-safe error code
println!; // e.g., "connection_closed", "timeout"
Error Types:
| Error | Category | Retriable | Description |
|---|---|---|---|
| ConnectionClosed | Network | ✅ | Connection lost |
| ConnectionRefused | Network | ✅ | Host unreachable |
| Timeout | Network | ✅ | Operation timed out |
| DeadlockDetected | Database | ✅ | Transaction deadlock |
| ReplicationSlotInUse | Replication | ✅ | Slot occupied |
| RateLimitExceeded | Network | ✅ | Throttled |
| SnapshotFailed | Database | Conditional | Snapshot error (retriable if timeout/lock) |
| Authentication | Database | ❌ | Auth failure |
| Schema | Schema | ❌ | Invalid schema |
| Config | Configuration | ❌ | Bad configuration |
| Transform | Serialization | ❌ | SMT transform error |
| Encryption | Serialization | ❌ | Encrypt/decrypt error |
| Checkpoint | Database | ❌ | Offset store error |
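The table above can be read as two functions over the error type: one mapping each error to a category, one deciding whether a retry makes sense. The following is a minimal, self-contained sketch of that idea. `SketchError`, `Category`, the `transient` flag, and the `retry` helper are hypothetical names chosen for illustration; they are not the crate's `CdcError` or `ErrorCategory`, whose exact shapes are not shown here.

```rust
/// Hypothetical mirror of the categories in the table above.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum Category {
    Network,
    Database,
    Replication,
    Schema,
    Configuration,
    Serialization,
}

/// Illustrative error type; NOT the crate's `CdcError`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum SketchError {
    ConnectionClosed,
    ConnectionRefused,
    Timeout,
    DeadlockDetected,
    ReplicationSlotInUse,
    RateLimitExceeded,
    /// Assumption: `transient` is true when the cause was a timeout or lock.
    SnapshotFailed { transient: bool },
    Authentication,
    Schema,
    Config,
    Transform,
    Encryption,
    Checkpoint,
}

impl SketchError {
    /// Category column of the table.
    pub fn category(&self) -> Category {
        match self {
            Self::ConnectionClosed
            | Self::ConnectionRefused
            | Self::Timeout
            | Self::RateLimitExceeded => Category::Network,
            Self::DeadlockDetected
            | Self::SnapshotFailed { .. }
            | Self::Authentication
            | Self::Checkpoint => Category::Database,
            Self::ReplicationSlotInUse => Category::Replication,
            Self::Schema => Category::Schema,
            Self::Config => Category::Configuration,
            Self::Transform | Self::Encryption => Category::Serialization,
        }
    }

    /// Retriable column of the table.
    pub fn is_retriable(&self) -> bool {
        match self {
            Self::ConnectionClosed
            | Self::ConnectionRefused
            | Self::Timeout
            | Self::DeadlockDetected
            | Self::ReplicationSlotInUse
            | Self::RateLimitExceeded => true,
            Self::SnapshotFailed { transient } => *transient,
            _ => false,
        }
    }
}

/// Call `op` until it succeeds, fails with a non-retriable error,
/// or `max_attempts` is reached. `op` receives the 1-based attempt number.
pub fn retry<T>(
    max_attempts: u32,
    mut op: impl FnMut(u32) -> Result<T, SketchError>,
) -> Result<T, SketchError> {
    let mut attempt = 1;
    loop {
        match op(attempt) {
            Ok(value) => return Ok(value),
            Err(e) if e.is_retriable() && attempt < max_attempts => attempt += 1,
            Err(e) => return Err(e),
        }
    }
}
```

A caller would wrap a connection or poll step in `retry(3, |attempt| ...)`. A production loop would normally add backoff between attempts, which is omitted here to keep the sketch short.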
Signal Table (Runtime Control)
Control CDC connectors at runtime without restarting - trigger snapshots, pause/resume streaming:
use ;
use ;
// Configure signal table via CDC stream (default)
let signal_config = builder
.enabled_channels
.signal_data_collection
.build;
let config = builder
.connection_string
.slot_name
.publication_name
.signal_config
.build?;
let mut cdc = new;
// Register custom signal handler
cdc.signal_processor
.register_handler
.await;
cdc.start.await?;
Signal Types:
| Signal | Description |
|---|---|
| execute-snapshot | Trigger ad-hoc incremental/blocking snapshot |
| stop-snapshot | Stop in-progress snapshot |
| pause-snapshot | Pause streaming |
| resume-snapshot | Resume streaming |
| log | Log a diagnostic message |
Signal Channels:
| Channel | Description |
|---|---|
| source | Signal table captured via CDC stream (default, recommended) |
| topic | Signals from Rivven topic |
| file | Signals from JSON file |
Incremental Snapshots
Re-snapshot tables while streaming continues using DBLog-style watermark deduplication:
use ;
// Configure incremental snapshots with parallel processing
let config = builder
.chunk_size // Rows per chunk
.max_concurrent_chunks // Process 4 chunks in parallel
.watermark_strategy
.max_buffer_memory // 64MB buffer per chunk
.signal_table
.build;
let coordinator = new;
// Start incremental snapshot for tables
let request = new
.with_condition
.with_surrogate_key;
let snapshot_id = coordinator.start.await?;
// Process chunks with watermark-based deduplication
while let Some = coordinator.next_chunk.await?
// Check statistics
let stats = coordinator.stats;
println!;
Deduplication Logic:
| Scenario | Action |
|---|---|
| Streaming event with ts >= watermark_ts | Drop snapshot row (streaming wins) |
| Streaming event with ts < watermark_ts | Keep snapshot row (snapshot is fresher) |
| DELETE event | Always drop snapshot row (deletes win) |
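The three rules in the table reduce to a single predicate applied to each streaming event that arrives while a chunk's window is open. Below is a minimal sketch of that predicate over an in-memory buffer. `ChunkBuffer`, `Op`, and the method names are hypothetical and use plain strings for keys and rows; the crate's coordinator is not reproduced here.

```rust
use std::collections::HashMap;

/// Operation carried by a streaming event (illustrative, not the crate's `CdcOp`).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum Op {
    Insert,
    Update,
    Delete,
}

/// Snapshot rows buffered for one chunk while its watermark window is open.
/// Hypothetical type: keys and rows are plain strings to keep the sketch small.
pub struct ChunkBuffer {
    watermark_ts: u64,
    rows: HashMap<String, String>,
}

impl ChunkBuffer {
    pub fn new(watermark_ts: u64) -> Self {
        Self { watermark_ts, rows: HashMap::new() }
    }

    /// Buffer a row read by the snapshot query, keyed by primary key.
    pub fn buffer_row(&mut self, key: &str, row: &str) {
        self.rows.insert(key.to_string(), row.to_string());
    }

    /// Apply a streaming event seen during the window.
    /// Returns true if a buffered snapshot row was dropped.
    pub fn on_streaming_event(&mut self, key: &str, op: Op, ts: u64) -> bool {
        // Deletes always win; otherwise streaming wins at or after the watermark.
        let streaming_wins = op == Op::Delete || ts >= self.watermark_ts;
        streaming_wins && self.rows.remove(key).is_some()
    }

    /// Rows that survived deduplication, sorted by key for stable output.
    pub fn into_rows(self) -> Vec<(String, String)> {
        let mut rows: Vec<_> = self.rows.into_iter().collect();
        rows.sort();
        rows
    }
}
```

When the window closes, only the rows returned by `into_rows` would be emitted as snapshot reads, so a key never appears with a stale snapshot value after a newer streaming change.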
Watermark Deduplication:
During the window, streaming events with matching primary keys cause buffered snapshot entries to be dropped:
// While window is open, check streaming events for conflicts
let key = "123"; // Primary key from streaming event
if coordinator.check_streaming_conflict.await
Tombstone Events
Tombstones are emitted after DELETE events for log compaction:
use ;
use CdcEvent;
// Enable tombstones (default)
let config = default;
let emitter = new;
// Process events - DELETE will be followed by a TOMBSTONE
let delete = delete;
let events = emitter.process;
// events = [DELETE, TOMBSTONE]
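To make the DELETE-then-TOMBSTONE expansion concrete, here is a small standalone sketch. It assumes the common log-compaction convention that a tombstone is a record with the deleted row's key and no value; whether Rivven represents tombstones exactly this way is an assumption. `Change`, `Record`, and `expand` are hypothetical names, not the crate's `TombstoneEmitter` API.

```rust
/// A keyed record as a compacted log would see it (hypothetical type).
/// `value == None` marks a tombstone.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct Record {
    pub key: String,
    pub value: Option<String>,
}

/// Simplified change event (illustrative, not the crate's `CdcEvent`).
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum Change {
    Upsert { key: String, after: String },
    Delete { key: String },
}

/// Expand one change into the records to publish.
/// A delete yields the delete record and, if enabled, a tombstone with the same key.
pub fn expand(change: Change, emit_tombstones: bool) -> Vec<Record> {
    match change {
        Change::Upsert { key, after } => vec![Record { key, value: Some(after) }],
        Change::Delete { key } => {
            let mut out = vec![Record {
                key: key.clone(),
                value: Some("delete".to_string()),
            }];
            if emit_tombstones {
                out.push(Record { key, value: None });
            }
            out
        }
    }
}
```

The delete record lets consumers observe the deletion, while the value-less record that follows lets a compacting log eventually discard every earlier record for that key.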
REPLICA IDENTITY Enforcement
Ensure PostgreSQL tables have proper REPLICA IDENTITY for complete before images:
use ;
// Warn when tables don't have REPLICA IDENTITY FULL
let config = builder
.require_full
.enforcement_mode
.exclude_table // Skip audit tables
.build;
let enforcer = new;
// Check table identity (from RELATION message)
let result = enforcer.check_sync;
// Logs: Table "public"."users" has REPLICA IDENTITY DEFAULT (not FULL).
// Fix with: ALTER TABLE "public"."users" REPLICA IDENTITY FULL
Schema Change Topic
Publish DDL events to a dedicated topic for downstream schema synchronization:
use ;
// Configure schema change publishing
let config = builder
.enabled
.topic_prefix // -> mydb.schema_changes
.include_ddl // Include DDL SQL
.include_columns // Include column details
.exclude_tables
.build;
let emitter = new;
// Detect schema changes from PostgreSQL RELATION messages
let columns = vec!;
let event = emitter.detect_postgres_change.await;
// Or detect from MySQL DDL queries
let event = emitter.detect_mysql_change.await;
Transaction Metadata Topic
Publish transaction BEGIN/END markers to a dedicated topic for downstream correlation:
use ;
use CdcEvent;
// Configure transaction metadata publishing
let config = builder
.enabled
.topic_suffix // -> cdc.orders.transaction
.enrich_events // Add tx context to events
.build;
let emitter = new;
// Begin a transaction
emitter.begin_transaction;
// Process events within the transaction
let event = insert;
let tx_meta = new;
// Enrich event with transaction context (total_order, data_collection_order)
if let Some = emitter.enrich_event
// End transaction - returns END event with per-table counts
if let Some = emitter.end_transaction
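The two ordering fields and the per-table counts can be illustrated with a small counter that lives for the duration of one transaction. This sketch assumes both orders are 1-based positions, with total_order counting across the whole transaction and data_collection_order counting within a single table; that numbering is an assumption, and `TxTracker`, `TxContext`, and `TxEnd` are hypothetical names rather than the crate's types.

```rust
use std::collections::BTreeMap;

/// Context attached to each event inside a transaction (hypothetical type).
#[derive(Debug, PartialEq, Eq)]
pub struct TxContext {
    pub tx_id: String,
    /// Position of the event within the whole transaction (assumed 1-based).
    pub total_order: u64,
    /// Position of the event within its own table (assumed 1-based).
    pub data_collection_order: u64,
}

/// Summary produced when the transaction ends (hypothetical type).
#[derive(Debug, PartialEq, Eq)]
pub struct TxEnd {
    pub tx_id: String,
    pub event_count: u64,
    pub per_table: BTreeMap<String, u64>,
}

/// Tracks counters for one open transaction.
pub struct TxTracker {
    tx_id: String,
    total: u64,
    per_table: BTreeMap<String, u64>,
}

impl TxTracker {
    pub fn begin(tx_id: &str) -> Self {
        Self { tx_id: tx_id.to_string(), total: 0, per_table: BTreeMap::new() }
    }

    /// Record one event for `table` and return its ordering context.
    pub fn enrich(&mut self, table: &str) -> TxContext {
        self.total += 1;
        let count = self.per_table.entry(table.to_string()).or_insert(0);
        *count += 1;
        TxContext {
            tx_id: self.tx_id.clone(),
            total_order: self.total,
            data_collection_order: *count,
        }
    }

    /// Close the transaction and report the per-table counts.
    pub fn end(self) -> TxEnd {
        TxEnd { tx_id: self.tx_id, event_count: self.total, per_table: self.per_table }
    }
}
```

A downstream consumer can compare the number of events it received for a transaction with `event_count` from the END marker to decide when the transaction is complete.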
Read-Only Replica Support
CDC from PostgreSQL standbys without write access:
use ;
use SignalChannelType;
// Configure for read-only replica
let config = builder
.read_only
.min_postgres_version // Required for pg_current_xact_id_if_assigned()
.allowed_channel // Source channel disabled
.allowed_channel
.heartbeat_watermarking // Use heartbeat-based watermarks
.build;
// Check what channels are available
let guard = new;
assert!;
assert!; // Blocked
// Verify features available in read-only mode
assert!;
assert!; // Blocked
// Track watermarks for incremental snapshot deduplication
let mut tracker = new;
// Open snapshot window
tracker.snapshot_started;
// Process streaming events - deduplicate against snapshot
let result = tracker.should_keep_event;
match result
// Close snapshot window
tracker.snapshot_completed;
Restricted Features in Read-Only Mode:
| Feature | Status |
|---|---|
| SignalTableSource | ❌ Requires INSERT |
| IncrementalSnapshotWrite | ❌ Requires signal table writes |
| AutoCreateSlot | ❌ Requires pg_create_logical_replication_slot() |
| AutoCreatePublication | ❌ Requires CREATE PUBLICATION |
| Streaming | ✅ Available |
| IncrementalSnapshotRead | ✅ Via heartbeat watermarking |
| TopicSignals | ✅ Available |
| FileSignals | ✅ Available |
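The restriction table amounts to one rule: a feature is blocked on a read-only replica exactly when it needs write access to the database. A minimal sketch of that rule follows. The `Feature` variants reuse the names from the table, but `ModeGuard`, `needs_write_access`, and `check` are hypothetical and do not reproduce the crate's guard type.

```rust
/// Features from the table above (variant names taken from the table).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum Feature {
    SignalTableSource,
    IncrementalSnapshotWrite,
    AutoCreateSlot,
    AutoCreatePublication,
    Streaming,
    IncrementalSnapshotRead,
    TopicSignals,
    FileSignals,
}

impl Feature {
    /// True for features the table marks as requiring writes.
    pub fn needs_write_access(self) -> bool {
        matches!(
            self,
            Feature::SignalTableSource
                | Feature::IncrementalSnapshotWrite
                | Feature::AutoCreateSlot
                | Feature::AutoCreatePublication
        )
    }
}

/// Hypothetical guard that rejects write-dependent features in read-only mode.
pub struct ModeGuard {
    read_only: bool,
}

impl ModeGuard {
    pub fn new(read_only: bool) -> Self {
        Self { read_only }
    }

    /// Ok if the feature may be used, otherwise a message naming the blocked feature.
    pub fn check(&self, feature: Feature) -> Result<(), String> {
        if self.read_only && feature.needs_write_access() {
            Err(format!("{:?} is unavailable on a read-only replica", feature))
        } else {
            Ok(())
        }
    }
}
```

Checking features up front like this lets a connector fail at configuration time with a clear message instead of hitting a permission error from the standby mid-stream.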
Quick Start
PostgreSQL
use ;
use ;
use CdcSource;
// With TLS
let tls = new;
let config = builder
.connection_string
.slot_name
.publication_name
.tls_config
.build?;
let mut cdc = new;
cdc.start.await?;
MySQL / MariaDB
use ;
use ;
// With TLS
let tls = new;
let config = new
.with_password
.with_database
.with_server_id
.with_tls;
let cdc = new;
SQL Server
use ;
use CdcSource;
// SQL Server CDC uses poll-based change capture
let config = builder
.host
.port
.username
.password
.database
.poll_interval_ms
.snapshot_mode // or Never, Always, WhenNeeded
.include_table
.include_table
.build?;
let mut cdc = new;
let rx = cdc.take_event_receiver.expect;
cdc.start.await?;
// Access metrics for observability
let metrics = cdc.metrics;
println!;
while let Some = rx.recv.await
SQL Server Setup:
-- Enable CDC on database
USE mydb;
EXEC sys.sp_cdc_enable_db;
-- Enable CDC on tables
EXEC sys.sp_cdc_enable_table
@source_schema = 'dbo',
@source_name = 'users',
@role_name = NULL;
-- Start SQL Server Agent (required for CDC capture job)
-- In container: /opt/mssql/bin/sqlservr
Documentation
- CDC Overview - Feature overview and concepts
- PostgreSQL CDC Guide - PostgreSQL setup and configuration
- MySQL/MariaDB CDC Guide - MySQL and MariaDB setup
- SQL Server CDC Guide - SQL Server setup and configuration
- Configuration Reference - All configuration options
- Troubleshooting Guide - Diagnose and resolve issues
License
Apache-2.0. See LICENSE.