pg2any_lib/
lib.rs

1//! # PostgreSQL CDC Library
2//!
3//! A comprehensive Change Data Capture (CDC) library for PostgreSQL using logical replication.
4//! This library allows you to stream database changes in real-time from PostgreSQL to other databases
5//! such as SQL Server, MySQL, and more.
6//!
7
8//! ## Features
9//!
10//! - PostgreSQL logical replication support
11//! - Real-time change streaming (INSERT, UPDATE, DELETE, TRUNCATE)
12//! - Multiple destination database support (SQL Server, MySQL)
13//! - Async/await support with Tokio
14//! - Comprehensive error handling
15//! - Thread-safe operations
16//! - Built-in backpressure handling
17//!
18//! ## Quick Start
19//!
20//! ```rust,ignore
21//! use pg2any_lib::{load_config_from_env, run_cdc_app};
22//! use tracing_subscriber::{fmt, layer::SubscriberExt, util::SubscriberInitExt, EnvFilter};
23//!
24//! #[tokio::main]
25//!     async fn main() -> Result<(), Box<dyn std::error::Error>> {
26//!     // Initialize comprehensive logging
27//!     init_logging();
28//!     tracing::info!("Starting PostgreSQL CDC Application");
29//!     // Load configuration from environment variables
30//!     let config = load_config_from_env()?;
31//!     // Run the CDC application with graceful shutdown handling
32//!     run_cdc_app(config, None).await?;
33//!     tracing::info!("CDC application stopped");
34//!     Ok(())
35//! }
36//!
37//! pub fn init_logging() {
38//!     // Create a sophisticated logging setup
39//!     let env_filter = EnvFilter::try_from_default_env()
40//!         .unwrap_or_else(|_| EnvFilter::new("pg2any=debug,tokio_postgres=info,sqlx=info"));
41//!
42//!     let fmt_layer = fmt::layer()
43//!         .with_target(true)
44//!         .with_thread_ids(true)
45//!         .with_level(true)
46//!         .with_ansi(true)
47//!         .compact();
48//!
49//!     tracing_subscriber::registry()
50//!         .with(env_filter)
51//!         .with(fmt_layer)
52//!         .init();
53//!
54//!     tracing::info!("Logging initialized with level filtering");
55//! }
56//! ```
57
58// Core modules
59pub mod app;
60pub mod config;
61pub mod env;
62pub mod error;
63
64// Destination handlers
65pub mod types;
66
67// Low-level PostgreSQL replication using libpq-sys
68pub mod pg_replication;
69
70pub mod lsn_tracker;
71
72// High-level client interface
73pub mod client;
74
75// Transaction file persistence
76mod transaction_manager;
77
78// Monitoring and metrics
79pub mod monitoring;
80
81// Public API exports
82pub use app::{run_cdc_app, CdcApp, CdcAppConfig};
83pub use client::CdcClient;
84pub use config::{Config, ConfigBuilder};
85pub use env::load_config_from_env;
86pub use error::CdcError;
87pub use lsn_tracker::{create_lsn_tracker_with_load_async, LsnTracker};
88pub use pg_replication::{PgReplicationConnection, ReplicationConnectionRetry, RetryConfig};
89pub type CdcResult<T> = Result<T, CdcError>;
90
91pub mod destinations;
92
93pub use pg_walstream::{
94    // Type aliases and utilities
95    format_lsn,
96    format_postgres_timestamp,
97    // Protocol types
98    message_types,
99    parse_lsn,
100    postgres_timestamp_to_chrono,
101    system_time_to_postgres_timestamp,
102    // Buffer types
103    BufferReader,
104    BufferWriter,
105    // Cancellation token
106    CancellationToken,
107    ColumnData,
108    ColumnInfo,
109    KeepaliveMessage,
110    LogicalReplicationMessage,
111    LogicalReplicationParser,
112    LogicalReplicationStream,
113    // PostgreSQL-specific types
114    Lsn,
115    MessageType,
116    Oid,
117    RelationInfo,
118    ReplicaIdentity,
119    ReplicationState,
120    ReplicationStreamConfig,
121    StreamingReplicationMessage,
122    TimestampTz,
123    TupleData,
124    XLogRecPtr,
125    Xid,
126    INVALID_XLOG_REC_PTR,
127    PG_EPOCH_OFFSET_SECS,
128};
129
130// Re-export PgResult from pg_replication (pg2any-lib's version with libpq)
131pub use pg_replication::PgResult;
132
133// Re-export SharedLsnFeedback from lsn_tracker (pg2any-lib's version with log_status method)
134pub use lsn_tracker::SharedLsnFeedback;
135
136// Re-export implementations
137#[cfg(feature = "mysql")]
138pub use crate::destinations::MySQLDestination;
139
140#[cfg(feature = "sqlserver")]
141pub use crate::destinations::SqlServerDestination;
142
143#[cfg(feature = "sqlite")]
144pub use crate::destinations::SQLiteDestination;
145
146pub use crate::destinations::{DestinationFactory, DestinationHandler};
147pub use crate::types::{DestinationType, Transaction};
148
149// Conditionally export metrics server functionality
150#[cfg(feature = "metrics")]
151pub use crate::monitoring::{
152    create_metrics_server, create_metrics_server_with_config, init_real_metrics, MetricsServer,
153    MetricsServerConfig,
154};
155
156// Always export metrics abstraction layer
157pub use crate::monitoring::{
158    gather_metrics, init_metrics, MetricsCollector, MetricsCollectorTrait, ProcessingTimer,
159    ProcessingTimerTrait,
160};