§PostgreSQL CDC Library
A Change Data Capture (CDC) library for PostgreSQL built on logical replication. It streams database changes in real time from PostgreSQL to other databases such as SQL Server, MySQL, and SQLite.
§Features
- PostgreSQL logical replication support
- Real-time change streaming (INSERT, UPDATE, DELETE, TRUNCATE)
- Multiple destination database support (SQL Server, MySQL, SQLite)
- Async/await support with Tokio
- Comprehensive error handling
- Thread-safe operations
- Built-in backpressure handling
§Quick Start
use pg2any_lib::{load_config_from_env, run_cdc_app};
use tracing_subscriber::{fmt, layer::SubscriberExt, util::SubscriberInitExt, EnvFilter};

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Initialize comprehensive logging
    init_logging();
    tracing::info!("Starting PostgreSQL CDC Application");

    // Load configuration from environment variables
    let config = load_config_from_env()?;

    // Run the CDC application with graceful shutdown handling
    run_cdc_app(config, None).await?;

    tracing::info!("CDC application stopped");
    Ok(())
}

pub fn init_logging() {
    // Use RUST_LOG if set, otherwise fall back to the default filter below
    let env_filter = EnvFilter::try_from_default_env()
        .unwrap_or_else(|_| EnvFilter::new("pg2any=debug,tokio_postgres=info,sqlx=info"));

    let fmt_layer = fmt::layer()
        .with_target(true)
        .with_thread_ids(true)
        .with_level(true)
        .with_ansi(true)
        .compact();

    tracing_subscriber::registry()
        .with(env_filter)
        .with(fmt_layer)
        .init();

    tracing::info!("Logging initialized with level filtering");
}

Re-exports§
pub use app::run_cdc_app;
pub use app::CdcApp;
pub use app::CdcAppConfig;
pub use client::CdcClient;
pub use config::Config;
pub use config::ConfigBuilder;
pub use env::load_config_from_env;
pub use error::CdcError;
pub use lsn_tracker::create_lsn_tracker_with_load_async;
pub use lsn_tracker::LsnTracker;
pub use crate::destinations::MySQLDestination;
pub use crate::destinations::SqlServerDestination;
pub use crate::destinations::SQLiteDestination;
pub use crate::destinations::DestinationFactory;
pub use crate::destinations::DestinationHandler;
pub use crate::types::DestinationType;
pub use crate::types::Transaction;
pub use crate::monitoring::gather_metrics;
pub use crate::monitoring::init_metrics;
pub use crate::monitoring::MetricsCollector;
pub use crate::monitoring::MetricsCollectorTrait;
pub use crate::monitoring::ProcessingTimer;
pub use crate::monitoring::ProcessingTimerTrait;
Modules§
- app - CDC Application Runner
- client
- config
- destinations
- env - Environment variable loading and configuration utilities
- error
- lsn_tracker - Thread-safe LSN tracking for CDC replication
- message_types - Message type constants for logical replication protocol
- monitoring - Monitoring and Metrics Module
- pg_replication - High-level PostgreSQL replication wrappers
- types
Structs§
- BufferReader - Buffer reader for parsing binary protocol messages
- BufferWriter - Buffer writer for creating binary protocol messages
- CancellationToken - A token which can be used to signal a cancellation request to one or more tasks.
- ColumnData - Data for a single column
- ColumnInfo - Column information in a relation
- KeepaliveMessage - Keepalive message from the server
- LogicalReplicationParser - Unified logical replication parser
- LogicalReplicationStream - PostgreSQL logical replication stream
- Lsn - LSN (Log Sequence Number) representation
- PgReplicationConnection - Safe wrapper around PostgreSQL connection for replication
- PgResult - Safe wrapper for PostgreSQL result
- RelationInfo - Information about a relation (table)
- ReplicationConnectionRetry - Retry wrapper for PostgreSQL replication connection operations
- ReplicationState - State for managing replication relations and tracking
- ReplicationStreamConfig - Configuration for the replication stream
- RetryConfig - Configuration for retry logic
- SharedLsnFeedback - Thread-safe tracker for LSN positions used in replication feedback
- StreamingReplicationMessage - Replication message with streaming context
- TupleData - Tuple (row) data
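To illustrate what a keepalive message from the server carries, here is a minimal standard-library sketch that decodes the primary keepalive layout defined by the PostgreSQL streaming replication protocol (tag byte `k`, 8-byte WAL end, 8-byte server clock in microseconds since 2000-01-01, 1-byte reply flag). The `Keepalive` struct and `parse_keepalive` function are hypothetical names for illustration and are not the crate's `KeepaliveMessage` or `BufferReader` API, whose fields and signatures are not shown on this page.

```rust
/// Hypothetical decoded keepalive; the crate's `KeepaliveMessage` may differ.
#[derive(Debug, PartialEq, Eq)]
pub struct Keepalive {
    /// Current end of WAL on the server, as a 64-bit LSN.
    pub wal_end: u64,
    /// Server clock in microseconds since 2000-01-01.
    pub server_time_micros: i64,
    /// True when the server asks the client to reply as soon as possible.
    pub reply_requested: bool,
}

/// Copy eight bytes starting at `start` into a fixed-size array.
fn eight_bytes(buf: &[u8], start: usize) -> [u8; 8] {
    let mut out = [0u8; 8];
    out.copy_from_slice(&buf[start..start + 8]);
    out
}

/// Decode a primary keepalive message. Returns `None` when the buffer is
/// shorter than 18 bytes or does not start with the `k` tag.
pub fn parse_keepalive(buf: &[u8]) -> Option<Keepalive> {
    if buf.len() < 18 || buf[0] != b'k' {
        return None;
    }
    Some(Keepalive {
        // All multi-byte integers in the protocol are big-endian.
        wal_end: u64::from_be_bytes(eight_bytes(buf, 1)),
        server_time_micros: i64::from_be_bytes(eight_bytes(buf, 9)),
        reply_requested: buf[17] != 0,
    })
}
```

When `reply_requested` is set, a client is expected to send a standby status update promptly, otherwise the server may close the connection after its timeout.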
Enums§
- LogicalReplicationMessage - Unified logical replication message enum
- MessageType - PostgreSQL logical replication message types enum
- ReplicaIdentity - PostgreSQL replica identity settings
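As a rough illustration of what a message-type enum for logical replication encodes, the sketch below maps the single-byte tags used by PostgreSQL's `pgoutput` plugin to variants. `PgOutputTag` and its methods are hypothetical names for this example. The crate's own `MessageType` and `LogicalReplicationMessage` may use different variants and are not reproduced here, and the two-phase commit tags are left out for brevity.

```rust
/// Hypothetical tag enum for illustration; not the crate's `MessageType`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum PgOutputTag {
    Begin,
    Commit,
    Origin,
    Relation,
    Type,
    Insert,
    Update,
    Delete,
    Truncate,
    Message,
    StreamStart,
    StreamStop,
    StreamCommit,
    StreamAbort,
}

impl PgOutputTag {
    /// Map the first byte of a pgoutput message to its tag.
    /// Returns `None` for bytes this sketch does not cover.
    pub fn from_byte(tag: u8) -> Option<Self> {
        match tag {
            b'B' => Some(Self::Begin),
            b'C' => Some(Self::Commit),
            b'O' => Some(Self::Origin),
            b'R' => Some(Self::Relation),
            b'Y' => Some(Self::Type),
            b'I' => Some(Self::Insert),
            b'U' => Some(Self::Update),
            b'D' => Some(Self::Delete),
            b'T' => Some(Self::Truncate),
            b'M' => Some(Self::Message),
            b'S' => Some(Self::StreamStart),
            b'E' => Some(Self::StreamStop),
            b'c' => Some(Self::StreamCommit),
            b'A' => Some(Self::StreamAbort),
            _ => None,
        }
    }

    /// True for the tags that describe a change to table data.
    pub fn is_row_change(self) -> bool {
        matches!(
            self,
            Self::Insert | Self::Update | Self::Delete | Self::Truncate
        )
    }
}
```

Note that the tags are case-sensitive: `C` is a regular commit while `c` is a commit of a streamed transaction.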
Constants§
- INVALID_XLOG_REC_PTR - Invalid/zero LSN pointer
- PG_EPOCH_OFFSET_SECS - Seconds from Unix epoch (1970-01-01) to PostgreSQL epoch (2000-01-01)
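The epoch offset matters whenever a replication timestamp is turned into wall-clock time. A minimal standard-library sketch follows, assuming the offset is 946,684,800 seconds (30 years of 365 days plus 7 leap days). The constant and function names here are hypothetical, and the crate's actual constant value and helper signatures are not shown on this page.

```rust
use std::time::{Duration, SystemTime, UNIX_EPOCH};

/// Assumed offset: 10,957 days between 1970-01-01 and 2000-01-01, in seconds.
const EPOCH_OFFSET_SECS: u64 = 946_684_800;

/// The PostgreSQL epoch (2000-01-01 00:00:00 UTC) as a `SystemTime`.
fn pg_epoch() -> SystemTime {
    UNIX_EPOCH + Duration::from_secs(EPOCH_OFFSET_SECS)
}

/// Convert microseconds since the PostgreSQL epoch into a `SystemTime`.
/// Negative inputs denote instants before 2000-01-01.
pub fn pg_micros_to_system_time(pg_micros: i64) -> SystemTime {
    if pg_micros >= 0 {
        pg_epoch() + Duration::from_micros(pg_micros as u64)
    } else {
        pg_epoch() - Duration::from_micros(pg_micros.unsigned_abs())
    }
}

/// Convert a `SystemTime` into microseconds since the PostgreSQL epoch.
pub fn system_time_to_pg_micros(t: SystemTime) -> i64 {
    match t.duration_since(pg_epoch()) {
        // `t` is at or after 2000-01-01.
        Ok(elapsed) => elapsed.as_micros() as i64,
        // `t` is before 2000-01-01; the error carries the gap.
        Err(before) => -(before.duration().as_micros() as i64),
    }
}
```

Keeping the value in microseconds as a signed 64-bit integer matches how PostgreSQL transmits timestamps on the replication stream.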
Functions§
- format_lsn - Format LSN as string (e.g., "0/12345678")
- format_postgres_timestamp - Convert PostgreSQL timestamp to formatted string
- parse_lsn - Parse LSN from string format (e.g., "0/12345678")
- postgres_timestamp_to_chrono - Convert PostgreSQL timestamp (microseconds since 2000-01-01) into chrono::DateTime<Utc>.
- system_time_to_postgres_timestamp - Convert SystemTime to PostgreSQL timestamp format (microseconds since 2000-01-01)
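The LSN text format mentioned above splits the 64-bit value into two 32-bit halves printed in hexadecimal and joined by a slash. The sketch below shows that encoding with the standard library only. `lsn_to_string` and `lsn_from_str` are hypothetical stand-ins, since the exact signatures of the crate's `format_lsn` and `parse_lsn` are not shown on this page.

```rust
/// Render a 64-bit LSN as "HIGH/LOW" in uppercase hex, without zero padding.
pub fn lsn_to_string(lsn: u64) -> String {
    format!("{:X}/{:X}", lsn >> 32, lsn & 0xFFFF_FFFF)
}

/// Parse "HIGH/LOW" hex text back into a 64-bit LSN.
/// Returns `None` if the slash is missing or either half is not a
/// hex number that fits in 32 bits.
pub fn lsn_from_str(text: &str) -> Option<u64> {
    let (high, low) = text.split_once('/')?;
    let high = u32::from_str_radix(high, 16).ok()?;
    let low = u32::from_str_radix(low, 16).ok()?;
    Some((u64::from(high) << 32) | u64::from(low))
}
```

For example, `lsn_to_string(0x12345678)` yields `0/12345678`, and parsing that string returns the original value.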
Type Aliases§
- CdcResult
- Oid - Object ID (32-bit)
- TimestampTz - PostgreSQL Timestamp (microseconds since 2000-01-01)
- XLogRecPtr - Write-Ahead Log Record Pointer (64-bit LSN)
- Xid - Transaction ID (32-bit)