PostgreSQL to Any Database Replication (pg2any) v0.1.0
A comprehensive PostgreSQL to Any database replication tool using Change Data Capture (CDC) with logical replication. This tool allows you to stream database changes in real-time from PostgreSQL to other databases such as SQL Server, MySQL, and more.
Project Status
pg2any is a working CDC implementation that replicates changes from PostgreSQL to other databases using logical replication.
Current Status: The tool implements the PostgreSQL logical replication protocol, streams changes in real time, and is covered by unit and integration tests.
What's Implemented ✅
- ✅ Complete project structure with Rust workspace configuration
- ✅ Comprehensive library (pg2any-lib) with modular architecture
- ✅ Rust async architecture with Tokio runtime
- ✅ Configuration management with builder pattern and environment variable support
- ✅ Comprehensive error handling framework with typed errors (thiserror)
- ✅ PostgreSQL logical replication protocol implementation with full message parsing
- ✅ WAL (Write-Ahead Log) record interpretation and processing
- ✅ Binary protocol message handling with efficient buffer operations
- ✅ LSN (Log Sequence Number) tracking and feedback mechanisms with persistence
- ✅ Transaction boundary handling (BEGIN, COMMIT) with consistency guarantees
- ✅ Graceful shutdown with proper resource cleanup and synchronization
- ✅ Complete destination handlers for MySQL and SQL Server
- ✅ Real-time change streaming (INSERT, UPDATE, DELETE, TRUNCATE) with bug fixes
- ✅ WHERE clause generation with replica identity support for accurate updates
- ✅ Docker containerization with multi-database development environment
- ✅ Development tooling (Makefile, formatting, testing, linting)
- ✅ Production-ready logging and structured error handling with enhanced visibility
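The LSN feedback item above refers to the Standby Status Update message that a replication client sends back to PostgreSQL so the server knows how far the client has written, flushed, and applied. The sketch below builds that message using the field layout from the PostgreSQL streaming replication protocol. The function name `standby_status_update` and its parameters are assumptions for illustration, not pg2any-lib's API.

```rust
/// Microseconds between the Unix epoch (1970-01-01) and the
/// PostgreSQL epoch (2000-01-01), both in UTC.
const PG_EPOCH_OFFSET_MICROS: i64 = 946_684_800_000_000;

/// Build the payload of a Standby Status Update message.
/// Hypothetical helper: the name and signature are illustrative only.
fn standby_status_update(
    written: u64,
    flushed: u64,
    applied: u64,
    unix_micros: i64,
    reply_requested: bool,
) -> Vec<u8> {
    let mut buf = Vec::with_capacity(34);
    buf.push(b'r'); // message tag
    buf.extend_from_slice(&written.to_be_bytes()); // last WAL position written
    buf.extend_from_slice(&flushed.to_be_bytes()); // last WAL position flushed
    buf.extend_from_slice(&applied.to_be_bytes()); // last WAL position applied
    // The client clock is expressed in microseconds since 2000-01-01.
    buf.extend_from_slice(&(unix_micros - PG_EPOCH_OFFSET_MICROS).to_be_bytes());
    buf.push(u8::from(reply_requested)); // 1 asks the server to reply immediately
    buf
}
```

All integers are big-endian, and the payload is always 34 bytes. Reporting a flushed position only after the destination has durably committed the corresponding changes is what lets the server discard WAL safely.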
What Needs Enhancement 🚧
- 🚧 Advanced monitoring with metrics collection and observability dashboards
- 🚧 Additional destination databases (Oracle, SQLite, ClickHouse, etc.)
- 🚧 Multi-table replication with table filtering and routing
- 🚧 Schema evolution support for DDL changes and migrations
- 🚧 Performance benchmarking and optimization for high-throughput scenarios
Features
- ✅ Architecture: Complete modular library structure with pg2any-lib core
- ✅ Configuration: Environment-based configuration with builder pattern
- ✅ Async Runtime: Full async/await support with Tokio
- ✅ Error Handling: Comprehensive error types with thiserror
- ✅ Replication Protocol: Complete PostgreSQL logical replication protocol implementation
- ✅ WAL Processing: Full Write-Ahead Log processing and interpretation
- ✅ Real-time Streaming: Live change streaming (INSERT, UPDATE, DELETE, TRUNCATE)
- ✅ Destinations: Working implementations for MySQL and SQL Server
- ✅ Transaction Handling: BEGIN/COMMIT transaction boundary processing
- ✅ Docker Support: Complete containerized development environment
- ✅ Development Tools: Makefile, formatting, testing, and linting setup
Basic Usage
use ;
async
Configuration Options
use ;
use Duration;
let config = builder
// Required configuration
.source_connection_string
.destination_type // or SqlServer
.destination_connection_string
.replication_slot_name
.publication_name
// Optional configuration
.protocol_version // Logical replication protocol version
.binary_format // Use text format for debugging
.streaming // Stream in-progress transactions
.auto_create_tables // Auto-create destination tables
.connection_timeout
.query_timeout
.heartbeat_interval
.build?;
Architecture
Core Components
- CdcClient: Main orchestrator managing the entire CDC pipeline
- Config/ConfigBuilder: Comprehensive configuration management with environment variable support
- LogicalReplicationStream: PostgreSQL logical replication lifecycle and protocol implementation
- LogicalReplicationParser: Complete PostgreSQL replication protocol message parsing
- DestinationHandler: Production-ready database destination handling (MySQL, SQL Server)
- Error Types: Comprehensive error handling with CdcError and proper error propagation
- Buffer Operations: Efficient binary protocol handling with zero-copy optimizations
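To make the parser component more concrete, here is a minimal sketch of decoding one logical replication message, the pgoutput Begin message, from raw bytes. The wire layout (tag `B`, 8-byte final LSN, 8-byte commit timestamp, 4-byte transaction ID, all big-endian) follows the PostgreSQL protocol documentation. The `BeginMessage` struct and `parse_begin` function are hypothetical names and do not reflect how LogicalReplicationParser is actually written.

```rust
/// Decoded pgoutput Begin message (illustrative type, not pg2any-lib's).
#[derive(Debug, PartialEq)]
struct BeginMessage {
    final_lsn: u64,
    /// Microseconds since 2000-01-01 (the PostgreSQL epoch).
    commit_timestamp: i64,
    xid: u32,
}

/// Copy 8 bytes starting at `at` into a fixed-size array.
fn take8(buf: &[u8], at: usize) -> [u8; 8] {
    let mut out = [0u8; 8];
    out.copy_from_slice(&buf[at..at + 8]);
    out
}

/// Parse a Begin message: 1 tag byte + 8 + 8 + 4 payload bytes.
fn parse_begin(buf: &[u8]) -> Result<BeginMessage, String> {
    if buf.len() < 21 {
        return Err(format!("Begin message too short: {} bytes", buf.len()));
    }
    if buf[0] != b'B' {
        return Err(format!("expected tag 'B', found {:#04x}", buf[0]));
    }
    let mut xid = [0u8; 4];
    xid.copy_from_slice(&buf[17..21]);
    Ok(BeginMessage {
        final_lsn: u64::from_be_bytes(take8(buf, 1)),
        commit_timestamp: i64::from_be_bytes(take8(buf, 9)),
        xid: u32::from_be_bytes(xid),
    })
}
```

Checking the length before slicing keeps a truncated message from panicking and turns it into an error the caller can report.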
Data Flow Architecture
PostgreSQL WAL → Logical Replication → Message Parser → Change Events → Destination Handler → Target DB
      ↓                   ↓                  ↓                ↓                  ↓                ↓
Transactions      Protocol Messages    Parsed Events    Typed Changes     SQL Operations   Replicated Data
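The flow above can be pictured as a producer that emits change events and a consumer that applies them at transaction boundaries. The sketch below shows that shape with standard-library threads and channels. This is a deliberate simplification: the project itself is built on Tokio, and the `Event` type and `run_pipeline` function are assumptions made for illustration only.

```rust
use std::sync::mpsc;
use std::thread;

/// Simplified change events (hypothetical, for illustration).
#[derive(Debug)]
enum Event {
    Begin,
    Insert { table: String },
    Commit,
}

/// Run a producer thread that emits `events` and a consumer loop that
/// buffers changes until COMMIT. Returns the statements that were applied.
fn run_pipeline(events: Vec<Event>) -> Vec<String> {
    let (tx, rx) = mpsc::channel::<Event>();

    // Producer: stands in for the replication stream reader.
    let producer = thread::spawn(move || {
        for event in events {
            if tx.send(event).is_err() {
                break; // consumer went away
            }
        }
        // `tx` is dropped here, which closes the channel.
    });

    // Consumer: stands in for the destination handler.
    let mut applied = Vec::new();
    let mut pending = Vec::new();
    for event in rx {
        match event {
            Event::Begin => pending.clear(),
            Event::Insert { table } => pending.push(format!("INSERT INTO {}", table)),
            Event::Commit => applied.append(&mut pending),
        }
    }

    producer.join().expect("producer thread panicked");
    applied
}
```

Because changes are only moved to `applied` on `Commit`, a transaction that is still open when the stream ends leaves no trace at the destination.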
Project Structure
This workspace uses Cargo's workspace feature for optimal organization of a working CDC implementation:
pg2any/
├── Cargo.toml # Workspace configuration
├── src/main.rs # Application entry point with full CDC pipeline
├── pg2any-lib/ # Core CDC library (fully implemented)
│ ├── Cargo.toml # Library dependencies
│ ├── src/
│ │ ├── lib.rs # Library public API
│ │ ├── config.rs # Configuration management
│ │ ├── client.rs # Main CDC client with producer/consumer
│ │ ├── error.rs # Comprehensive error types
│ │ ├── destinations/ # Database destination implementations
│ │ │ ├── mod.rs # Destination trait and factory
│ │ │ ├── mysql.rs # MySQL destination handler
│ │ │ └── sqlserver.rs # SQL Server destination handler
│ │ ├── connection.rs # PostgreSQL connection management
│ │ ├── logical_stream.rs # Logical replication stream handling
│ │ ├── pg_replication.rs # Low-level PostgreSQL replication
│ │ ├── replication_protocol.rs # Message parsing implementation
│ │ ├── buffer.rs # Binary protocol buffer operations
│ │ └── types.rs # Core data types and enums
│ └── tests/ # Comprehensive integration tests (24 tests)
│ ├── integration_tests.rs
│ ├── destination_integration_tests.rs
│ ├── event_type_refactor_tests.rs
│ └── where_clause_fix_tests.rs
├── docker-compose.yml # Multi-database development setup
├── Dockerfile # Application containerization
├── Makefile # Development commands
└── scripts/ # Database initialization scripts
    ├── init_postgres.sql
    └── init_mysql.sql
Supported Destination Databases
- MySQL: Complete implementation with type mapping, table creation, and DML operations
- SQL Server: Full implementation with type mapping, table creation, and DML operations
- Extensible: Architecture designed for easy addition of new destination types
Destination Features
- ✅ Automatic table creation with proper schema mapping
- ✅ INSERT, UPDATE, DELETE, TRUNCATE operation support
- ✅ PostgreSQL to destination type conversion
- ✅ WHERE clause generation for UPDATE/DELETE operations
- ✅ Null value handling and data validation
- ✅ Connection pooling and error recovery
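As an illustration of WHERE clause generation, the sketch below builds a parameterized MySQL-style UPDATE whose WHERE clause is derived from the table's replica identity columns. The function name `build_update` and its signature are hypothetical, and identifier escaping is reduced to plain backtick quoting. The destination handlers in pg2any-lib may work differently.

```rust
/// Build a parameterized UPDATE statement using `?` placeholders.
/// `key_columns` are the replica identity columns that locate the row.
/// Hypothetical helper, not part of pg2any-lib.
fn build_update(
    table: &str,
    set_columns: &[&str],
    key_columns: &[&str],
) -> Result<String, String> {
    if set_columns.is_empty() {
        return Err("no columns to update".to_string());
    }
    if key_columns.is_empty() {
        // Without key columns the statement would touch every row.
        return Err(format!(
            "table {} has no replica identity columns; refusing to build an UPDATE without a WHERE clause",
            table
        ));
    }
    let assignments = set_columns
        .iter()
        .map(|c| format!("`{}` = ?", c))
        .collect::<Vec<_>>()
        .join(", ");
    let filter = key_columns
        .iter()
        .map(|c| format!("`{}` = ?", c))
        .collect::<Vec<_>>()
        .join(" AND ");
    Ok(format!("UPDATE `{}` SET {} WHERE {}", table, assignments, filter))
}
```

Parameters are bound in order: new values first, then key values. A real implementation also has to handle NULL key values, since `col = ?` never matches NULL in SQL and needs `IS NULL` instead.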
Change Event Types
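A minimal sketch of how the operations this README lists (INSERT, UPDATE, DELETE, TRUNCATE, plus BEGIN/COMMIT boundaries) could be modelled as a Rust enum. Every name here (`ChangeEvent`, `Row`, `kind`, `table`, and the field names) is an assumption for illustration and not pg2any-lib's actual definition.

```rust
use std::collections::BTreeMap;

/// Column name to optional text value; `None` represents SQL NULL.
type Row = BTreeMap<String, Option<String>>;

/// Illustrative change event model (hypothetical).
#[derive(Debug, Clone, PartialEq)]
enum ChangeEvent {
    Begin { xid: u32 },
    Insert { table: String, new: Row },
    /// `old` is only present when the replica identity provides it.
    Update { table: String, old: Option<Row>, new: Row },
    Delete { table: String, old: Row },
    Truncate { tables: Vec<String> },
    Commit { end_lsn: u64 },
}

impl ChangeEvent {
    /// Short label, useful for logging.
    fn kind(&self) -> &'static str {
        match self {
            ChangeEvent::Begin { .. } => "BEGIN",
            ChangeEvent::Insert { .. } => "INSERT",
            ChangeEvent::Update { .. } => "UPDATE",
            ChangeEvent::Delete { .. } => "DELETE",
            ChangeEvent::Truncate { .. } => "TRUNCATE",
            ChangeEvent::Commit { .. } => "COMMIT",
        }
    }

    /// Table name for single-table row events, `None` otherwise.
    fn table(&self) -> Option<&str> {
        match self {
            ChangeEvent::Insert { table, .. }
            | ChangeEvent::Update { table, .. }
            | ChangeEvent::Delete { table, .. } => Some(table.as_str()),
            _ => None,
        }
    }
}
```

Modelling events as an enum means a destination handler must match every variant, so adding a new event type becomes a compile-time prompt to update each handler.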
Error Handling
The library provides comprehensive error types using thiserror:
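As a rough sketch of what a typed error enum looks like, the example below writes the `Display`, `Error`, and `From` implementations by hand with the standard library only, which is approximately what thiserror's derive macro generates from attributes. The `SketchError` type, its variants, and `check_tag` are hypothetical and are not the library's `CdcError`.

```rust
use std::error::Error;
use std::fmt;
use std::io;

/// Illustrative error type (hypothetical variants).
#[derive(Debug)]
enum SketchError {
    Config(String),
    Protocol { tag: u8 },
    Io(io::Error),
}

impl fmt::Display for SketchError {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            SketchError::Config(msg) => write!(f, "configuration error: {}", msg),
            SketchError::Protocol { tag } => {
                write!(f, "unsupported replication message tag {:#04x}", tag)
            }
            SketchError::Io(err) => write!(f, "I/O error: {}", err),
        }
    }
}

impl Error for SketchError {
    /// Expose the underlying cause so callers can walk the error chain.
    fn source(&self) -> Option<&(dyn Error + 'static)> {
        match self {
            SketchError::Io(err) => Some(err),
            _ => None,
        }
    }
}

/// Lets `?` convert `io::Error` into `SketchError` automatically.
impl From<io::Error> for SketchError {
    fn from(err: io::Error) -> Self {
        SketchError::Io(err)
    }
}

/// Accept the pgoutput tags for Begin, Commit, Relation, Insert,
/// Update, Delete, and Truncate; reject anything else.
fn check_tag(tag: u8) -> Result<u8, SketchError> {
    match tag {
        b'B' | b'C' | b'R' | b'I' | b'U' | b'D' | b'T' => Ok(tag),
        other => Err(SketchError::Protocol { tag: other }),
    }
}
```

Keeping the cause reachable through `source()` lets logging code print the full chain without the outer error having to repeat the inner message.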
Configuration
All configuration uses environment variables or the ConfigBuilder pattern:
// Environment variables (used in Docker setup)
set_var;
set_var;
set_var;
set_var;
set_var;
set_var;
set_var;
set_var;
set_var;
set_var;
set_var;
// Or using the builder pattern
let config = builder
.source_connection_string
.destination_type
.destination_connection_string
.replication_slot_name
.publication_name
.protocol_version
.binary_format
.streaming
.auto_create_tables
.connection_timeout
.query_timeout
.heartbeat_interval
.build?;
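One way to combine the two approaches is to load settings through a lookup function, so the same code can read real environment variables in production and a fixed map in tests. The sketch below assumes hypothetical variable names (`SKETCH_SOURCE_URL`, `SKETCH_SLOT_NAME`, `SKETCH_CONNECT_TIMEOUT_SECS`) and a hypothetical `SketchConfig` type. None of these are the names pg2any actually uses.

```rust
use std::time::Duration;

/// Illustrative configuration (hypothetical fields).
#[derive(Debug, PartialEq)]
struct SketchConfig {
    source: String,
    slot: String,
    connection_timeout: Duration,
}

/// Load configuration via `lookup`, applying defaults and validation.
/// Pass `|k| std::env::var(k).ok()` to read the process environment.
fn load_config<F>(lookup: F) -> Result<SketchConfig, String>
where
    F: Fn(&str) -> Option<String>,
{
    // Required value: fail early with a message naming the variable.
    let source = lookup("SKETCH_SOURCE_URL")
        .ok_or_else(|| "SKETCH_SOURCE_URL is required".to_string())?;

    // Optional value with a default.
    let slot = lookup("SKETCH_SLOT_NAME").unwrap_or_else(|| "sketch_slot".to_string());

    // Optional numeric value: a present but malformed value is an error.
    let timeout_secs = match lookup("SKETCH_CONNECT_TIMEOUT_SECS") {
        Some(raw) => raw
            .parse::<u64>()
            .map_err(|e| format!("SKETCH_CONNECT_TIMEOUT_SECS: {}", e))?,
        None => 30,
    };

    Ok(SketchConfig {
        source,
        slot,
        connection_timeout: Duration::from_secs(timeout_secs),
    })
}
```

Treating a malformed value as an error, instead of silently falling back to the default, surfaces typos in deployment configuration at startup.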
Development Status
This project provides working PostgreSQL to Any database replication with comprehensive functionality:
✅ Completed Implementation (v0.1.0)
- Core CDC Pipeline: Complete end-to-end replication from PostgreSQL to destination databases
- PostgreSQL Protocol: Full logical replication protocol implementation with message parsing
- WAL Processing: Complete Write-Ahead Log record parsing and interpretation
- Transaction Processing: BEGIN/COMMIT transaction boundary handling with consistency
- Change Event Processing: Real-time INSERT, UPDATE, DELETE, TRUNCATE operations
- Binary Protocol: Efficient binary message format support with buffer operations
- LSN Management: Log Sequence Number tracking and feedback mechanisms with persistence
- Error Handling: Production-ready error handling with proper error propagation
- Destination Adapters: Working MySQL and SQL Server destination implementations
- Configuration: Environment-based configuration with validation and defaults
- Docker Environment: Working multi-database development environment
- Async Architecture: Full async/await support with graceful shutdown via CancellationToken
- Recent Bug Fixes: Resolved UPDATE/DELETE operations and LSN synchronization issues
- Enhanced Reliability: Improved graceful shutdown and resource management
🚧 Enhancement Opportunities
- Advanced Monitoring: Production metrics collection, dashboards, and alerting
- Schema Evolution: DDL change handling and schema migration support
- Multi-Database Support: Additional destination databases (Oracle, SQLite, ClickHouse)
- Advanced Features: Table filtering, data transformations, and custom routing
- Performance Benchmarking: High-throughput testing and optimization analysis
📊 Current Repository Status
- Version: 0.1.0 (Tagged as Simple-implementation)
- Branch: main (up to date with origin)
- Last Updates: Enhanced LSN persistence, graceful shutdown improvements, and WHERE clause fixes
- Test Coverage: Comprehensive test suite covering core functionality and edge cases
- Code Quality: Clean, formatted codebase with proper linting and type safety
Quick Start with Docker
The easiest way to get started is using the provided Docker setup:
# Clone and navigate to the project
# Start the multi-database development environment
# Check service status
# View application logs
# Connect to databases for testing
# Insert test data and watch CDC processing
Local Development
For local development without Docker:
# Build the project
# Run code quality checks
# Run tests
# Format code
# Run the application locally (requires databases)
Example Application Output
When you run the application, you'll see structured logging output like this:
2025-08-15T10:30:00.123Z INFO pg2any: 🚀 Starting PostgreSQL CDC Application
2025-08-15T10:30:00.124Z INFO pg2any: 📋 Loading configuration from environment variables
2025-08-15T10:30:00.125Z INFO pg2any: 🔗 Configuration loaded successfully
2025-08-15T10:30:00.126Z INFO pg2any: ⚙️ Initializing CDC client
2025-08-15T10:30:00.127Z INFO pg2any: 🔧 Performing CDC client initialization
2025-08-15T10:30:00.128Z INFO pg2any: ✅ CDC client initialized successfully
2025-08-15T10:30:00.129Z INFO pg2any: 🔄 Starting CDC replication pipeline
2025-08-15T10:30:00.130Z DEBUG pg2any_lib::logical_stream: Creating logical replication stream
2025-08-15T10:30:00.131Z DEBUG pg2any_lib::pg_replication: Connected to PostgreSQL server version: 150000
2025-08-15T10:30:00.132Z INFO pg2any_lib::client: Processing BEGIN transaction (LSN: 0/1A2B3C4D)
2025-08-15T10:30:00.133Z INFO pg2any_lib::client: Processing INSERT event on table 'users'
2025-08-15T10:30:00.134Z INFO pg2any_lib::client: Processing COMMIT transaction (LSN: 0/1A2B3C5E)
2025-08-15T10:30:00.135Z INFO pg2any: ✨ CDC replication running! Real-time change streaming active
Note: This shows the complete working application with real PostgreSQL logical replication message processing, LSN tracking, and transaction handling.
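The LSN values in the log lines above (for example `0/1A2B3C4D`) use PostgreSQL's textual form: the upper and lower 32 bits of a 64-bit position, each printed in hexadecimal and separated by a slash. A small sketch of converting between the two forms follows. The function names `format_lsn` and `parse_lsn` are illustrative and are not taken from pg2any-lib.

```rust
/// Format a 64-bit LSN as PostgreSQL prints it: high/low halves in hex.
fn format_lsn(lsn: u64) -> String {
    format!("{:X}/{:X}", lsn >> 32, lsn & 0xFFFF_FFFF)
}

/// Parse the textual form back into a 64-bit LSN.
/// Returns `None` if the slash is missing or a half is not valid hex.
fn parse_lsn(text: &str) -> Option<u64> {
    let mut parts = text.splitn(2, '/');
    let high = u32::from_str_radix(parts.next()?, 16).ok()?;
    let low = u32::from_str_radix(parts.next()?, 16).ok()?;
    Some((u64::from(high) << 32) | u64::from(low))
}
```

Storing the LSN as a `u64` makes comparisons trivial (a larger number is a later WAL position), while the text form is what appears in logs and in `pg_replication_slots`.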
Dependencies
- tokio: Async runtime and ecosystem
- tokio-postgres: PostgreSQL async client with logical replication support
- sqlx: Multi-database async client (MySQL)
- tiberius: Native SQL Server async client
- serde & serde_json: Serialization framework
- chrono: Date and time handling with timezone support
- tracing & tracing-subscriber: Structured logging and observability
- thiserror: Ergonomic error handling and propagation
- async-trait: Async trait definitions
- bytes: Byte buffer manipulation
- libpq-sys: Low-level PostgreSQL C library bindings for replication
📝 Changelog
Latest Updates (August 2025)
- LSN Persistence Enhancement: Implemented LSN state persistence before shutdown to prevent data loss and ensure consistent restart points
- Graceful Shutdown Improvements: Enhanced shutdown flow with proper resource cleanup, synchronization, and connection management
- WHERE Clause Bug Fixes: Fixed critical UPDATE and DELETE operations with proper replica identity support for accurate row targeting
- Enhanced Transaction Logging: Added comprehensive logging for BEGIN/COMMIT events with better visibility into transaction boundaries
- Bug Resolution: Resolved last_received_lsn synchronization issues and various edge cases in change event processing
- Test Suite Expansion: Added comprehensive unit tests covering new functionality and edge case scenarios
- Code Quality: Applied consistent formatting and improved error handling patterns
v0.1.0 - Simple Implementation (Tagged Release)
- Complete PostgreSQL logical replication CDC implementation
- Support for MySQL and SQL Server destinations
- Comprehensive test coverage and Docker development environment
- Production-ready async architecture with proper error handling
Test Coverage
Key areas covered by tests:
- PostgreSQL logical replication protocol message parsing
- Buffer operations for binary protocol handling
- LSN (Log Sequence Number) operations and formatting
- Change event creation and processing
- Destination database handlers (MySQL, SQL Server)
- Configuration management and validation
- Error handling and recovery scenarios
- Graceful shutdown and cancellation handling
Contributing
This project provides production-ready PostgreSQL to Any database replication with a solid foundation for contributions. The core CDC functionality is implemented, tested, and continuously improved, making it easy for contributors to focus on specific enhancements:
🎯 High Impact Areas
- Advanced Monitoring: Add production metrics, dashboards, and observability features
- Additional Destinations: Extend support to more databases (Oracle, SQLite, ClickHouse, etc.)
- Schema Evolution: Implement DDL change handling and schema migration capabilities
- Performance Benchmarking: Implement comprehensive performance testing and optimization analysis
- Advanced Features: Add table filtering, data transformations, and routing capabilities
- Documentation: Expand usage examples, troubleshooting guides, and best practices
🏗️ Architecture Benefits for Contributors
- Stable Foundation: Core CDC pipeline is production-ready with recent stability improvements
- Modular Design: Clear separation of concerns makes extending functionality straightforward
- Type Safety: Rust's type system prevents common replication errors and ensures reliability
- Async Architecture: Built for high-performance concurrent processing with Tokio
- Comprehensive Documentation: Well-documented APIs and architecture with recent updates
- Development Environment: Complete Docker setup for immediate local development and testing
- Quality Assurance: Automated testing, formatting, and linting ensure code quality
🔄 Recent Development Activity
- Active Maintenance: Regular bug fixes and improvements (latest commits in August 2025)
- Stability Focus: Recent emphasis on graceful shutdown, LSN persistence, and error handling
- Test-Driven: Expanding test coverage for new features and edge cases
- Production Readiness: Focus on reliability, proper resource management, and operational concerns
🚀 Getting Started Contributing
# Set up development environment
# Start development databases
# Make changes and test
# Database access for testing
📚 Implementation Resources
For extending functionality, refer to:
- PostgreSQL Logical Replication Protocol Documentation
- PostgreSQL WAL Internals
- Logical Decoding Output Plugin
🧪 Testing Your Changes
# Run the comprehensive test suite
# Test specific areas
# Test with real databases
License
MIT OR Apache-2.0