Skip to main content

pg2any_lib/
lib.rs

1//! # PostgreSQL CDC Library
2//!
3//! A comprehensive Change Data Capture (CDC) library for PostgreSQL using logical replication.
4//! This library allows you to stream database changes in real-time from PostgreSQL to other databases
5//! such as SQL Server, MySQL, and more.
6//!
7
8//! ## Features
9//!
10//! - PostgreSQL logical replication support
11//! - Real-time change streaming (INSERT, UPDATE, DELETE, TRUNCATE)
12//! - Multiple destination support (SQL Server, MySQL, SQLite, Kafka), each enabled via its Cargo feature
13//! - Async/await support with Tokio
14//! - Comprehensive error handling
15//! - Thread-safe operations
16//! - Built-in backpressure handling
17//!
18//! ## Quick Start
19//!
20//! ```rust,ignore
21//! use pg2any_lib::{load_config_from_env, run_cdc_app};
22//! use tracing_subscriber::{fmt, layer::SubscriberExt, util::SubscriberInitExt, EnvFilter};
23//!
24//! #[tokio::main]
25//! async fn main() -> Result<(), Box<dyn std::error::Error>> {
26//!     // Initialize comprehensive logging
27//!     init_logging();
28//!     tracing::info!("Starting PostgreSQL CDC Application");
29//!     // Load configuration from environment variables
30//!     let config = load_config_from_env()?;
31//!     // Run the CDC application with graceful shutdown handling
32//!     run_cdc_app(config, None).await?;
33//!     tracing::info!("CDC application stopped");
34//!     Ok(())
35//! }
36//!
37//! pub fn init_logging() {
38//!     // Create a sophisticated logging setup
39//!     let env_filter = EnvFilter::try_from_default_env()
40//!         .unwrap_or_else(|_| EnvFilter::new("pg2any=debug,tokio_postgres=info,sqlx=info"));
41//!
42//!     let fmt_layer = fmt::layer()
43//!         .with_target(true)
44//!         .with_thread_ids(true)
45//!         .with_level(true)
46//!         .with_ansi(true)
47//!         .compact();
48//!
49//!     tracing_subscriber::registry()
50//!         .with(env_filter)
51//!         .with(fmt_layer)
52//!         .init();
53//!
54//!     tracing::info!("Logging initialized with level filtering");
55//! }
56//! ```
57
58// Core modules
59pub mod app;
60pub mod config;
61pub mod env;
62pub mod error;
63
64// Shared data types (e.g. `DestinationType`, `Transaction`)
65pub mod types;
66
67pub mod lsn_tracker;
68
69// High-level client interface
70pub mod client;
71mod consumer;
72mod producer;
73
74// Transaction file persistence
75pub mod transaction_manager;
76
77// Storage abstraction for transaction files
78pub mod storage;
79
80// Monitoring and metrics
81pub mod monitoring;
82
83// Public API exports
84pub use app::{run_cdc_app, CdcApp, CdcAppConfig};
85pub use client::CdcClient;
86pub use config::{Config, ConfigBuilder};
87pub use consumer::drain_and_shutdown;
88pub use env::load_config_from_env;
89pub use error::CdcError;
90pub use lsn_tracker::{create_lsn_tracker_with_load, LsnTracker};
/// Convenience alias for a `Result` whose error type is [`CdcError`].
91pub type CdcResult<T> = Result<T, CdcError>;
92
93pub mod destinations;
94
95pub use pg_walstream::{
96    // Type aliases and utilities
97    format_lsn,
98    // Protocol types
99    message_types,
100    parse_lsn,
101    postgres_timestamp_to_chrono,
102    system_time_to_postgres_timestamp,
103    // Buffer types
104    BufferReader,
105    BufferWriter,
106    // Cancellation token
107    CancellationToken,
108    ColumnData,
109    ColumnInfo,
110    KeepaliveMessage,
111    LogicalReplicationMessage,
112    LogicalReplicationParser,
113    LogicalReplicationStream,
114    // PostgreSQL-specific types
115    Lsn,
116    MessageType,
117    Oid,
118    RelationInfo,
119    ReplicaIdentity,
120    ReplicationState,
121    ReplicationStreamConfig,
122    StreamingReplicationMessage,
123    TimestampTz,
124    TupleData,
125    XLogRecPtr,
126    Xid,
127    INVALID_XLOG_REC_PTR,
128    PG_EPOCH_OFFSET_SECS,
129};
130
131// Re-export SharedLsnFeedback from lsn_tracker (pg2any-lib's version with log_status method)
132pub use lsn_tracker::SharedLsnFeedback;
133
134// Re-export implementations
135#[cfg(feature = "mysql")]
136pub use crate::destinations::MySQLDestination;
137
138#[cfg(feature = "sqlserver")]
139pub use crate::destinations::SqlServerDestination;
140
141#[cfg(feature = "sqlite")]
142pub use crate::destinations::SQLiteDestination;
143
144#[cfg(feature = "kafka")]
145pub use crate::destinations::KafkaDestination;
146
147pub use crate::destinations::{DestinationFactory, DestinationHandler};
148pub use crate::types::{DestinationType, Transaction};
149
150// Conditionally export metrics server functionality
151#[cfg(feature = "metrics")]
152pub use crate::monitoring::{
153    create_metrics_server, create_metrics_server_with_config, init_real_metrics, MetricsServer,
154    MetricsServerConfig,
155};
156
157// Always export metrics abstraction layer
158pub use crate::monitoring::{
159    gather_metrics, init_metrics, MetricsCollector, MetricsCollectorTrait, ProcessingTimer,
160    ProcessingTimerTrait,
161};