Crate pg2any_lib

Crate pg2any_lib 

Source
Expand description

§PostgreSQL CDC Library

A comprehensive Change Data Capture (CDC) library for PostgreSQL using logical replication. This library allows you to stream database changes in real-time from PostgreSQL to other databases such as SQL Server, MySQL, and more.

§Features

  • PostgreSQL logical replication support
  • Real-time change streaming (INSERT, UPDATE, DELETE, TRUNCATE)
  • Multiple destination database support (SQL Server, MySQL)
  • Async/await support with Tokio
  • Comprehensive error handling
  • Thread-safe operations
  • Built-in backpressure handling

§Quick Start

use pg2any_lib::{load_config_from_env, run_cdc_app};
use tracing_subscriber::{fmt, layer::SubscriberExt, util::SubscriberInitExt, EnvFilter};

#[tokio::main]
    async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Initialize comprehensive logging
    init_logging();
    tracing::info!("Starting PostgreSQL CDC Application");
    // Load configuration from environment variables
    let config = load_config_from_env()?;
    // Run the CDC application with graceful shutdown handling
    run_cdc_app(config, None).await?;
    tracing::info!("CDC application stopped");
    Ok(())
}

pub fn init_logging() {
    // Create a sophisticated logging setup
    let env_filter = EnvFilter::try_from_default_env()
        .unwrap_or_else(|_| EnvFilter::new("pg2any=debug,tokio_postgres=info,sqlx=info"));

    let fmt_layer = fmt::layer()
        .with_target(true)
        .with_thread_ids(true)
        .with_level(true)
        .with_ansi(true)
        .compact();

    tracing_subscriber::registry()
        .with(env_filter)
        .with(fmt_layer)
        .init();

    tracing::info!("Logging initialized with level filtering");
}

Re-exports§

pub use app::run_cdc_app;
pub use app::CdcApp;
pub use app::CdcAppConfig;
pub use client::CdcClient;
pub use config::Config;
pub use config::ConfigBuilder;
pub use env::load_config_from_env;
pub use error::CdcError;
pub use lsn_tracker::create_lsn_tracker_with_load_async;
pub use lsn_tracker::LsnTracker;
pub use crate::destinations::MySQLDestination;
pub use crate::destinations::SqlServerDestination;
pub use crate::destinations::SQLiteDestination;
pub use crate::destinations::DestinationFactory;
pub use crate::destinations::DestinationHandler;
pub use crate::types::DestinationType;
pub use crate::types::Transaction;
pub use crate::monitoring::gather_metrics;
pub use crate::monitoring::init_metrics;
pub use crate::monitoring::MetricsCollector;
pub use crate::monitoring::MetricsCollectorTrait;
pub use crate::monitoring::ProcessingTimer;
pub use crate::monitoring::ProcessingTimerTrait;

Modules§

app
CDC Application Runner
client
config
destinations
env
Environment variable loading and configuration utilities
error
lsn_tracker
Thread-safe LSN tracking for CDC replication
message_types
Message type constants for logical replication protocol
monitoring
Monitoring and Metrics Module
pg_replication
High-level PostgreSQL replication wrappers
types

Structs§

BufferReader
Buffer reader for parsing binary protocol messages
BufferWriter
Buffer writer for creating binary protocol messages
CancellationToken
A token which can be used to signal a cancellation request to one or more tasks.
ColumnData
Data for a single column
ColumnInfo
Column information in a relation
KeepaliveMessage
Keepalive message from the server
LogicalReplicationParser
Unified logical replication parser
LogicalReplicationStream
PostgreSQL logical replication stream
Lsn
LSN (Log Sequence Number) representation
PgReplicationConnection
Safe wrapper around PostgreSQL connection for replication
PgResult
Safe wrapper for PostgreSQL result
RelationInfo
Information about a relation (table)
ReplicationConnectionRetry
Retry wrapper for PostgreSQL replication connection operations
ReplicationState
State for managing replication relations and tracking
ReplicationStreamConfig
Configuration for the replication stream
RetryConfig
Configuration for retry logic
SharedLsnFeedback
Thread-safe tracker for LSN positions used in replication feedback
StreamingReplicationMessage
Replication message with streaming context
TupleData
Tuple (row) data

Enums§

LogicalReplicationMessage
Unified logical replication message enum
MessageType
PostgreSQL logical replication message types enum
ReplicaIdentity
PostgreSQL replica identity settings

Constants§

INVALID_XLOG_REC_PTR
Invalid/zero LSN pointer
PG_EPOCH_OFFSET_SECS
Seconds from Unix epoch (1970-01-01) to PostgreSQL epoch (2000-01-01)

Functions§

format_lsn
Format LSN as string (e.g., “0/12345678”)
format_postgres_timestamp
Convert PostgreSQL timestamp to formatted string
parse_lsn
Parse LSN from string format (e.g., “0/12345678”)
postgres_timestamp_to_chrono
Convert PostgreSQL timestamp (microseconds since 2000-01-01) into chrono::DateTime<Utc>.
system_time_to_postgres_timestamp
Convert SystemTime to PostgreSQL timestamp format (microseconds since 2000-01-01)

Type Aliases§

CdcResult
Oid
Object ID (32-bit)
TimestampTz
PostgreSQL Timestamp (microseconds since 2000-01-01)
XLogRecPtr
Write-Ahead Log Record Pointer (64-bit LSN)
Xid
Transaction ID (32-bit)