pg2any
A high-performance PostgreSQL Change Data Capture (CDC) tool that streams database changes in real time to multiple destinations, including other databases and Kafka, via PostgreSQL logical replication.
Supported Destinations
| Destination | Driver | Feature Flag |
|---|---|---|
| MySQL | SQLx | mysql (default) |
| SQL Server | Tiberius (TDS) | sqlserver (default) |
| SQLite | SQLx | sqlite (default) |
| Kafka | rdkafka (librdkafka) | kafka |
Key Features
- Crash-safe persistence - File-based producer-consumer pipeline with automatic crash recovery
- Resumable processing - Restart from the exact position within large transaction files
- Streaming transactions - Handles PostgreSQL protocol v2+ in-progress transaction streaming
- SQL compression - Optional gzip compression for transaction files with streaming decompression
- Schema mapping - Configurable PostgreSQL schema to destination database/schema translation
- Prometheus metrics - Built-in HTTP metrics endpoint (feature: metrics)
- Graceful shutdown - Coordinated producer-consumer shutdown with LSN persistence
Quick Start
Prerequisites
PostgreSQL with logical replication enabled:
ALTER SYSTEM SET wal_level = logical;
-- Restart PostgreSQL after this change
CREATE PUBLICATION my_publication FOR ALL TABLES;
GRANT SELECT ON ALL TABLES IN SCHEMA public TO replicator;
Docker
&& RUST_LOG=info
As a Library
use ;
async
Programmatic Configuration
use ;
let config = builder
.source_connection_string
.destination_type
.destination_connection_string
.replication_slot_name
.publication_name
.streaming
.build?;
Architecture
pg2any uses a file-based producer-consumer pattern for reliable, crash-safe transaction processing:
PostgreSQL WAL Stream
      |
      v
+-----------+       +------------------+       +-----------+
| Producer  | ----> |   File System    | ----> | Consumer  |
| (WAL      |       |                  |       | (SQL      |
|  Reader)  |       | sql_data_tx/     |       |  Executor)|
|           |       | sql_received_tx/ |       |           |
+-----------+       | sql_pending_tx/  |       +-----------+
                    +------------------+             |
                                                     v
                                              Destination DB
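The producer and consumer exchange SQL through the file system and only pass small "transaction ready" notifications between them. The sketch below models that hand-off with std threads and a bounded std::sync::mpsc::sync_channel instead of Tokio tasks; the Notice type and run_pipeline function are hypothetical, and the channel bound stands in for the transaction channel capacity that CDC_BUFFER_SIZE configures. Dropping the sender is one simple way to coordinate shutdown (not necessarily how pg2any does it): the consumer drains every queued notice before exiting, so the last LSN it reports covers all committed work it received.

```rust
use std::sync::mpsc::sync_channel;
use std::thread;

/// Hypothetical "transaction ready" notice: its .meta file is now in sql_pending_tx/.
#[derive(Debug)]
struct Notice {
    tx_id: u32,
    commit_lsn: u64,
}

/// Runs one producer and one consumer over a bounded channel. Returns the
/// transaction ids the consumer applied and the last commit LSN it flushed.
fn run_pipeline(commits: Vec<Notice>, capacity: usize) -> (Vec<u32>, Option<u64>) {
    // Bounded queue: once `capacity` notices are waiting, `send` blocks,
    // so a slow destination throttles the producer instead of growing memory.
    let (tx, rx) = sync_channel::<Notice>(capacity);

    let producer = thread::spawn(move || {
        for notice in commits {
            // In the real pipeline this would follow moving the .meta file
            // into sql_pending_tx/.
            tx.send(notice).expect("consumer stopped early");
        }
        // `tx` is dropped when this closure returns, closing the channel:
        // that is the shutdown signal for the consumer.
    });

    let consumer = thread::spawn(move || {
        let mut applied = Vec::new();
        let mut flushed = None;
        // `recv` keeps returning queued notices after the sender is gone and
        // only fails once the queue is empty, so nothing queued is skipped.
        while let Ok(notice) = rx.recv() {
            applied.push(notice.tx_id); // apply the transaction's SQL here
            flushed = Some(notice.commit_lsn); // then advance the flush position
        }
        (applied, flushed)
    });

    producer.join().expect("producer panicked");
    consumer.join().expect("consumer panicked")
}
```

With a capacity of 1 the producer can run at most one notice ahead of the consumer, which makes the back-pressure easy to observe.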
Transaction Lifecycle
- BEGIN - Create a .sql file in sql_data_tx/ and a .meta file in sql_received_tx/
- Events - Append SQL commands to the .sql file via buffered writes (8MB buffer)
- COMMIT - Move the .meta file from sql_received_tx/ to sql_pending_tx/; notify the consumer
- Execute - The consumer reads the pending .meta file and executes SQL from the .sql file in batches
- Cleanup - Delete both the .meta and .sql files on success; update the flush LSN
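A minimal sketch of these file moves using only std::fs, assuming one <tx_id>.sql / <tx_id>.meta pair per transaction as in the detail flow diagram and ignoring segment size limits. TxFiles, begin, append, commit, and cleanup are hypothetical names, and the .meta file is left empty because its exact contents are not documented here. The ordering is the point: the SQL file is flushed and synced before the .meta rename publishes the transaction, and a rename within one file system is atomic on POSIX, so the consumer never sees a half-written commit.

```rust
use std::fs::{self, File};
use std::io::{self, BufWriter, Write};
use std::path::{Path, PathBuf};

/// 8 MB write buffer, matching the buffer size quoted in the lifecycle.
const WRITE_BUFFER_BYTES: usize = 8 * 1024 * 1024;

/// Hypothetical handle for one in-flight transaction.
struct TxFiles {
    base: PathBuf,
    tx_id: u32,
    sql: BufWriter<File>,
}

impl TxFiles {
    /// BEGIN: create <id>.sql in sql_data_tx/ and <id>.meta in sql_received_tx/.
    fn begin(base: &Path, tx_id: u32) -> io::Result<Self> {
        for dir in ["sql_data_tx", "sql_received_tx", "sql_pending_tx"] {
            fs::create_dir_all(base.join(dir))?;
        }
        let sql_file = File::create(base.join("sql_data_tx").join(format!("{tx_id}.sql")))?;
        let sql = BufWriter::with_capacity(WRITE_BUFFER_BYTES, sql_file);
        // Placeholder metadata: the real .meta contents are not described here.
        fs::write(base.join("sql_received_tx").join(format!("{tx_id}.meta")), b"")?;
        Ok(Self { base: base.to_path_buf(), tx_id, sql })
    }

    /// Events: append one SQL command; BufWriter only hits the disk when its buffer fills.
    fn append(&mut self, statement: &str) -> io::Result<()> {
        writeln!(self.sql, "{statement}")
    }

    /// COMMIT: flush and sync the SQL, then move the .meta file to sql_pending_tx/.
    fn commit(mut self) -> io::Result<PathBuf> {
        self.sql.flush()?;
        self.sql.get_ref().sync_all()?; // SQL must be durable before it is published
        let from = self.base.join("sql_received_tx").join(format!("{}.meta", self.tx_id));
        let to = self.base.join("sql_pending_tx").join(format!("{}.meta", self.tx_id));
        fs::rename(&from, &to)?;
        Ok(to)
    }
}

/// Cleanup: once every command has been applied, delete both files.
fn cleanup(base: &Path, tx_id: u32) -> io::Result<()> {
    fs::remove_file(base.join("sql_pending_tx").join(format!("{tx_id}.meta")))?;
    fs::remove_file(base.join("sql_data_tx").join(format!("{tx_id}.sql")))
}
```

Because BufWriter writes only when its 8 MB buffer fills or on flush, a small transaction costs a single write at COMMIT.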
Crash Recovery
On restart, pg2any:
- Cleans up incomplete (uncommitted) transactions from sql_received_tx/
- Replays committed transactions from sql_pending_tx/ (committed but not yet applied)
- Resumes from last_executed_command_index within partially executed transaction files
- Starts replication from the persisted flush_lsn
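The recovery steps can be sketched with std::fs as follows. PendingTx, clean_received, replay_order, and resume_from are hypothetical; parsing a real .meta file is omitted, and deleting the uncommitted .sql file together with its .meta is an assumption about what "cleans up" covers. Sorting by commit_timestamp and resuming at last_executed_command_index + 1 follow the recovery workflow diagram.

```rust
use std::fs;
use std::io;
use std::path::Path;

/// Hypothetical parsed form of a pending .meta file.
#[derive(Debug)]
struct PendingTx {
    tx_id: u32,
    commit_timestamp: u64,
    /// Index of the last command already applied, if any.
    last_executed_command_index: Option<usize>,
}

/// First command to run: right after the last applied one, or 0 if none ran.
fn resume_from(tx: &PendingTx) -> usize {
    tx.last_executed_command_index.map_or(0, |i| i + 1)
}

/// Discards uncommitted transactions: every .meta in sql_received_tx/ plus,
/// by assumption, the matching .sql in sql_data_tx/. Returns how many were removed.
fn clean_received(base: &Path) -> io::Result<usize> {
    let received = base.join("sql_received_tx");
    if !received.exists() {
        return Ok(0);
    }
    let mut removed = 0;
    for entry in fs::read_dir(&received)? {
        let meta_path = entry?.path();
        if meta_path.extension().map_or(false, |ext| ext == "meta") {
            if let Some(stem) = meta_path.file_stem() {
                let sql_name = format!("{}.sql", stem.to_string_lossy());
                let sql_path = base.join("sql_data_tx").join(sql_name);
                if sql_path.exists() {
                    fs::remove_file(&sql_path)?;
                }
            }
            fs::remove_file(&meta_path)?;
            removed += 1;
        }
    }
    Ok(removed)
}

/// Replays committed-but-unapplied transactions in commit order.
fn replay_order(mut pending: Vec<PendingTx>) -> Vec<PendingTx> {
    pending.sort_by_key(|tx| tx.commit_timestamp);
    pending
}
```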
Workflow Diagrams
Transaction Processing Detail Flow
sequenceDiagram
participant PG as PostgreSQL WAL
participant Producer as Producer Task
participant FS as File System
participant Channel as Notification Channel
participant Consumer as Consumer Task
participant Dest as Destination DB
participant LSN as LSN Tracker
Note over PG,LSN: Transaction Processing Flow
PG->>Producer: BEGIN (tx_id: 12345)
Producer->>FS: Create 12345.sql in sql_data_tx/
Producer->>FS: Create 12345.meta in sql_received_tx/
PG->>Producer: INSERT event
Producer->>Producer: Generate SQL
Producer->>FS: Append to buffer (8MB)
PG->>Producer: UPDATE event
Producer->>Producer: Generate SQL
Producer->>FS: Append to buffer
PG->>Producer: DELETE event
Producer->>Producer: Generate SQL
Producer->>FS: Append to buffer
Note over Producer,FS: Buffer reaches 8MB
Producer->>FS: Flush buffer to 12345.sql
PG->>Producer: COMMIT (LSN: 0/1A2B3C4D)
Producer->>FS: Flush remaining buffer
Producer->>FS: Move 12345.meta to sql_pending_tx/
Producer->>Channel: Send notification
Channel->>Consumer: Transaction ready
Consumer->>FS: Read 12345.meta from sql_pending_tx/
Consumer->>FS: Read SQL commands from 12345.sql
loop For each SQL batch
Consumer->>Dest: BEGIN TRANSACTION
Consumer->>Dest: Execute SQL batch
Consumer->>Dest: COMMIT TRANSACTION
Consumer->>FS: Update pending .meta progress
end
Consumer->>FS: Delete 12345.meta and 12345.sql
Consumer->>LSN: Update and persist flush_lsn
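The consumer's batch loop in the diagram above can be sketched as a plain function. execute_in_batches is a hypothetical name; the execute_batch closure stands in for one destination transaction (BEGIN, execute, COMMIT) and record_progress for the pending .meta progress update. Progress is recorded only after a batch commits, so every recorded index refers to work already on the destination, at the cost of possibly re-running the one batch that committed just before a crash.

```rust
/// Executes `commands[start..]` in batches of `batch_size`. Returns the index
/// of the last executed command, or `None` if there was nothing to run.
fn execute_in_batches<E>(
    commands: &[String],
    start: usize,
    batch_size: usize,
    mut execute_batch: impl FnMut(&[String]) -> Result<(), E>,
    mut record_progress: impl FnMut(usize),
) -> Result<Option<usize>, E> {
    assert!(batch_size > 0, "batch_size must be positive");
    let mut last = None;
    let mut i = start;
    while i < commands.len() {
        let end = (i + batch_size).min(commands.len());
        // On error we return before recording progress, so a restart
        // re-runs this whole batch (its destination transaction rolled back).
        execute_batch(&commands[i..end])?;
        last = Some(end - 1);
        record_progress(end - 1); // persist last_executed_command_index
        i = end;
    }
    Ok(last)
}
```

On restart, passing `start = last_executed_command_index + 1` skips everything already applied.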
Crash Recovery Workflow
graph TB
Crash([System Crash]) --> Restart[Restart pg2any]
Restart --> LoadMeta[Load LSN Metadata]
LoadMeta --> CheckMeta{Metadata Exists?}
CheckMeta -->|No| StartFresh[Start from Latest]
CheckMeta -->|Yes| GetLSN[Extract flush_lsn]
GetLSN --> CleanReceived[Cleanup sql_received_tx/]
CleanReceived --> ProcessPending[Process sql_pending_tx/]
ProcessPending --> SortByTimestamp[Sort by commit_timestamp]
SortByTimestamp --> ForEachPending{For Each .meta}
ForEachPending -->|More files| CheckResume{Has resume index?}
ForEachPending -->|Done| SetStartLSN[Set start_lsn = flush_lsn]
CheckResume -->|Yes| ResumeFromIndex[Resume from last_executed_command_index + 1]
CheckResume -->|No| ReadAll[Read All SQL Commands]
ResumeFromIndex --> Execute[Execute Remaining Commands]
ReadAll --> Execute
Execute --> UpdateLSN[Update LSN Metadata]
UpdateLSN --> DeleteFiles[Delete .meta and .sql]
DeleteFiles --> ForEachPending
SetStartLSN --> StartReplication[Start Replication]
StartFresh --> StartReplication
StartReplication --> NormalOperation([Normal Operation])
style Crash fill:#ff6b6b
style NormalOperation fill:#51cf66
style ResumeFromIndex fill:#ffd43b
Configuration
All configuration is via environment variables (ideal for containers) or the ConfigBuilder API.
Required
| Variable | Description | Example |
|---|---|---|
| CDC_SOURCE_CONNECTION_STRING | PostgreSQL connection string | postgresql://user:pass@host:5432/db?replication=database |
| CDC_DEST_TYPE | Target database type | MySQL, SqlServer, SQLite, Kafka |
| CDC_DEST_URI | Destination connection string | See Destination URI Formats below |
Destination URI Formats
| Database | Format | Example |
|---|---|---|
| MySQL | mysql://user:pass@host:port/db | mysql://root:pass@localhost:3306/mydb |
| SQL Server | sqlserver://user:pass@host:port/db | sqlserver://sa:pass@localhost:1433/master |
| SQLite | File path | ./replica.db or /data/replica.db |
| Kafka | Broker list | broker1:9092,broker2:9092 |
Optional
| Variable | Default | Description |
|---|---|---|
| CDC_REPLICATION_SLOT | cdc_slot | PostgreSQL replication slot name |
| CDC_PUBLICATION | cdc_pub | PostgreSQL publication name |
| CDC_PROTOCOL_VERSION | 1 | Replication protocol version (1-4) |
| CDC_STREAMING | true | Stream in-progress transactions (requires protocol v2+) |
| CDC_SCHEMA_MAPPING | | Schema translation, e.g. public:cdc_db,sales:sales_db |
| CDC_BUFFER_SIZE | 500 | Transaction channel capacity |
| CDC_TRANSACTION_SEGMENT_SIZE_MB | 64 | Maximum transaction segment file size (MB) |
| CDC_CONNECTION_TIMEOUT | 30 | Connection timeout (seconds) |
| CDC_QUERY_TIMEOUT | 10 | Query timeout (seconds) |
| CDC_LAST_LSN_FILE | ./pg2any_last_lsn | Base path for the LSN metadata file |
| CDC_TRANSACTION_FILE_BASE_PATH | ./ | Base directory for transaction files |
| PG2ANY_ENABLE_COMPRESSION | false | Enable gzip compression for SQL files |
| RUST_LOG | pg2any=debug | Log level |
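A sketch of applying some of the defaults above and parsing CDC_SCHEMA_MAPPING, assuming comma-separated source:target pairs as in the example. OptionalSettings, parse_schema_mapping, and load_optional are hypothetical and cover only a few variables; reading through a lookup closure (for example `|k| std::env::var(k).ok()`) keeps the function testable without touching the process environment. pg2any's own parsing may be more lenient, for instance about boolean spellings.

```rust
use std::collections::HashMap;

/// Hypothetical subset of the optional settings.
#[derive(Debug)]
struct OptionalSettings {
    replication_slot: String,
    publication: String,
    protocol_version: u32,
    streaming: bool,
    /// PostgreSQL schema -> destination database/schema.
    schema_mapping: HashMap<String, String>,
}

/// Parses "public:cdc_db,sales:sales_db"; an empty string yields an empty map.
fn parse_schema_mapping(raw: &str) -> Result<HashMap<String, String>, String> {
    let mut map = HashMap::new();
    for pair in raw.split(',').map(str::trim).filter(|p| !p.is_empty()) {
        let (source, target) = pair
            .split_once(':')
            .ok_or_else(|| format!("CDC_SCHEMA_MAPPING: expected source:target, got {pair:?}"))?;
        map.insert(source.trim().to_string(), target.trim().to_string());
    }
    Ok(map)
}

/// Reads settings through `lookup`, falling back to the documented defaults.
fn load_optional(lookup: impl Fn(&str) -> Option<String>) -> Result<OptionalSettings, String> {
    let get = |key: &str, default: &str| lookup(key).unwrap_or_else(|| default.to_string());

    let protocol_version = get("CDC_PROTOCOL_VERSION", "1")
        .parse::<u32>()
        .map_err(|e| format!("CDC_PROTOCOL_VERSION: {e}"))?;
    if !(1..=4).contains(&protocol_version) {
        return Err(format!("CDC_PROTOCOL_VERSION must be 1-4, got {protocol_version}"));
    }
    // Rust's bool parser accepts exactly "true" or "false".
    let streaming = get("CDC_STREAMING", "true")
        .parse::<bool>()
        .map_err(|e| format!("CDC_STREAMING: {e}"))?;

    Ok(OptionalSettings {
        replication_slot: get("CDC_REPLICATION_SLOT", "cdc_slot"),
        publication: get("CDC_PUBLICATION", "cdc_pub"),
        protocol_version,
        streaming,
        schema_mapping: parse_schema_mapping(&get("CDC_SCHEMA_MAPPING", ""))?,
    })
}
```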
Monitoring
Enable with the metrics feature flag. Exposes Prometheus-compatible metrics over HTTP on port 8080.
# Key metrics
The Docker Compose setup includes a full observability stack: Prometheus (:9090), Node Exporter, PostgreSQL Exporter, and MySQL Exporter with predefined alert rules.
Development
# Docker environment
# Chaos & integration tests
Feature Flags
[features]
default = ["mysql", "sqlserver", "sqlite"]
mysql = ["sqlx/mysql"]
sqlserver = ["tiberius"]
sqlite = ["sqlx/sqlite"]
kafka = ["rdkafka", "futures-util", "base64"]
metrics = ["hyper", "hyper-util", "http-body-util", "prometheus"]
Dependencies
| Crate | Purpose |
|---|---|
| pg_walstream | PostgreSQL logical replication protocol |
| tokio | Async runtime |
| sqlx | MySQL + SQLite async driver |
| tiberius | SQL Server TDS protocol |
| rdkafka | Kafka producer (librdkafka wrapper) |
| serde / serde_json | Serialization |
| prometheus | Metrics collection |
| thiserror | Error handling |
| flate2 / async-compression | SQL file compression |
License
Apache-2.0
References
- PostgreSQL Logical Replication Protocol
- PostgreSQL WAL Internals
- pg_walstream - Underlying replication library