pg2any_lib/lib.rs
1//! # PostgreSQL CDC Library
2//!
3//! A comprehensive Change Data Capture (CDC) library for PostgreSQL using logical replication.
4//! This library allows you to stream database changes in real-time from PostgreSQL to other databases
5//! such as SQL Server, MySQL, and more.
6//!
7
8//! ## Features
9//!
10//! - PostgreSQL logical replication support
11//! - Real-time change streaming (INSERT, UPDATE, DELETE, TRUNCATE)
12//! - Multiple destination database support (SQL Server, MySQL)
13//! - Async/await support with Tokio
14//! - Comprehensive error handling
15//! - Thread-safe operations
16//! - Built-in backpressure handling
17//!
18//! ## Quick Start
19//!
20//! ```rust,ignore
21//! use pg2any_lib::{load_config_from_env, run_cdc_app};
22//! use tracing_subscriber::{fmt, layer::SubscriberExt, util::SubscriberInitExt, EnvFilter};
23//!
24//! #[tokio::main]
25//! async fn main() -> Result<(), Box<dyn std::error::Error>> {
26//! // Initialize comprehensive logging
27//! init_logging();
28//! tracing::info!("Starting PostgreSQL CDC Application");
29//! // Load configuration from environment variables
30//! let config = load_config_from_env()?;
31//! // Run the CDC application with graceful shutdown handling
32//! run_cdc_app(config, None).await?;
33//! tracing::info!("CDC application stopped");
34//! Ok(())
35//! }
36//!
37//! pub fn init_logging() {
38//! // Create a sophisticated logging setup
39//! let env_filter = EnvFilter::try_from_default_env()
40//! .unwrap_or_else(|_| EnvFilter::new("pg2any=debug,tokio_postgres=info,sqlx=info"));
41//!
42//! let fmt_layer = fmt::layer()
43//! .with_target(true)
44//! .with_thread_ids(true)
45//! .with_level(true)
46//! .with_ansi(true)
47//! .compact();
48//!
49//! tracing_subscriber::registry()
50//! .with(env_filter)
51//! .with(fmt_layer)
52//! .init();
53//!
54//! tracing::info!("Logging initialized with level filtering");
55//! }
56//! ```
57
58// Core modules
59pub mod app;
60pub mod config;
61pub mod env;
62pub mod error;
63
64// Destination handlers
65pub mod types;
66
67// Low-level PostgreSQL replication using libpq-sys
68pub mod pg_replication;
69
70pub mod lsn_tracker;
71
72// High-level client interface
73pub mod client;
74
75// Transaction file persistence
76mod transaction_manager;
77
78// Monitoring and metrics
79pub mod monitoring;
80
81// Public API exports
82pub use app::{run_cdc_app, CdcApp, CdcAppConfig};
83pub use client::CdcClient;
84pub use config::{Config, ConfigBuilder};
85pub use env::load_config_from_env;
86pub use error::CdcError;
87pub use lsn_tracker::{create_lsn_tracker_with_load_async, LsnTracker};
88pub use pg_replication::{PgReplicationConnection, ReplicationConnectionRetry, RetryConfig};
89pub type CdcResult<T> = Result<T, CdcError>;
90
91pub mod destinations;
92
93pub use pg_walstream::{
94 // Type aliases and utilities
95 format_lsn,
96 format_postgres_timestamp,
97 // Protocol types
98 message_types,
99 parse_lsn,
100 postgres_timestamp_to_chrono,
101 system_time_to_postgres_timestamp,
102 // Buffer types
103 BufferReader,
104 BufferWriter,
105 // Cancellation token
106 CancellationToken,
107 ColumnData,
108 ColumnInfo,
109 KeepaliveMessage,
110 LogicalReplicationMessage,
111 LogicalReplicationParser,
112 LogicalReplicationStream,
113 // PostgreSQL-specific types
114 Lsn,
115 MessageType,
116 Oid,
117 RelationInfo,
118 ReplicaIdentity,
119 ReplicationState,
120 ReplicationStreamConfig,
121 StreamingReplicationMessage,
122 TimestampTz,
123 TupleData,
124 XLogRecPtr,
125 Xid,
126 INVALID_XLOG_REC_PTR,
127 PG_EPOCH_OFFSET_SECS,
128};
129
130// Re-export PgResult from pg_replication (pg2any-lib's version with libpq)
131pub use pg_replication::PgResult;
132
133// Re-export SharedLsnFeedback from lsn_tracker (pg2any-lib's version with log_status method)
134pub use lsn_tracker::SharedLsnFeedback;
135
136// Re-export implementations
137#[cfg(feature = "mysql")]
138pub use crate::destinations::MySQLDestination;
139
140#[cfg(feature = "sqlserver")]
141pub use crate::destinations::SqlServerDestination;
142
143#[cfg(feature = "sqlite")]
144pub use crate::destinations::SQLiteDestination;
145
146pub use crate::destinations::{DestinationFactory, DestinationHandler};
147pub use crate::types::{DestinationType, Transaction};
148
149// Conditionally export metrics server functionality
150#[cfg(feature = "metrics")]
151pub use crate::monitoring::{
152 create_metrics_server, create_metrics_server_with_config, init_real_metrics, MetricsServer,
153 MetricsServerConfig,
154};
155
156// Always export metrics abstraction layer
157pub use crate::monitoring::{
158 gather_metrics, init_metrics, MetricsCollector, MetricsCollectorTrait, ProcessingTimer,
159 ProcessingTimerTrait,
160};