pg2any_lib/lib.rs
1//! # PostgreSQL CDC Library
2//!
3//! A comprehensive Change Data Capture (CDC) library for PostgreSQL using logical replication.
4//! This library allows you to stream database changes in real-time from PostgreSQL to other databases
5//! such as SQL Server, MySQL, and more.
6//!
7
8//! ## Features
9//!
10//! - PostgreSQL logical replication support
11//! - Real-time change streaming (INSERT, UPDATE, DELETE, TRUNCATE)
12//! - Multiple destination database support (SQL Server, MySQL)
13//! - Async/await support with Tokio
14//! - Comprehensive error handling
15//! - Thread-safe operations
16//! - Built-in backpressure handling
17//!
18//! ## Quick Start
19//!
20//! ```rust,ignore
21//! use pg2any_lib::{load_config_from_env, run_cdc_app};
22//! use tracing_subscriber::{fmt, layer::SubscriberExt, util::SubscriberInitExt, EnvFilter};
23//!
24//! #[tokio::main]
25//! async fn main() -> Result<(), Box<dyn std::error::Error>> {
26//! // Initialize comprehensive logging
27//! init_logging();
28//! tracing::info!("Starting PostgreSQL CDC Application");
29//! // Load configuration from environment variables
30//! let config = load_config_from_env()?;
31//! // Run the CDC application with graceful shutdown handling
32//! run_cdc_app(config, None).await?;
33//! tracing::info!("CDC application stopped");
34//! Ok(())
35//! }
36//!
37//! pub fn init_logging() {
38//! // Create a sophisticated logging setup
39//! let env_filter = EnvFilter::try_from_default_env()
40//! .unwrap_or_else(|_| EnvFilter::new("pg2any=debug,tokio_postgres=info,sqlx=info"));
41//!
42//! let fmt_layer = fmt::layer()
43//! .with_target(true)
44//! .with_thread_ids(true)
45//! .with_level(true)
46//! .with_ansi(true)
47//! .compact();
48//!
49//! tracing_subscriber::registry()
50//! .with(env_filter)
51//! .with(fmt_layer)
52//! .init();
53//!
54//! tracing::info!("Logging initialized with level filtering");
55//! }
56//! ```
57
58// Core modules
59pub mod app;
60pub mod config;
61pub mod env;
62pub mod error;
63
64// Destination handlers
65pub mod types;
66
67pub mod lsn_tracker;
68
69// High-level client interface
70pub mod client;
71mod consumer;
72mod producer;
73
74// Transaction file persistence
75pub mod transaction_manager;
76
77// Storage abstraction for transaction files
78pub mod storage;
79
80// Monitoring and metrics
81pub mod monitoring;
82
83// Public API exports
84pub use app::{run_cdc_app, CdcApp, CdcAppConfig};
85pub use client::CdcClient;
86pub use config::{Config, ConfigBuilder};
87pub use consumer::drain_and_shutdown;
88pub use env::load_config_from_env;
89pub use error::CdcError;
90pub use lsn_tracker::{create_lsn_tracker_with_load, LsnTracker};
91pub type CdcResult<T> = Result<T, CdcError>;
92
93pub mod destinations;
94
95pub use pg_walstream::{
96 // Type aliases and utilities
97 format_lsn,
98 // Protocol types
99 message_types,
100 parse_lsn,
101 postgres_timestamp_to_chrono,
102 system_time_to_postgres_timestamp,
103 // Buffer types
104 BufferReader,
105 BufferWriter,
106 // Cancellation token
107 CancellationToken,
108 ColumnData,
109 ColumnInfo,
110 KeepaliveMessage,
111 LogicalReplicationMessage,
112 LogicalReplicationParser,
113 LogicalReplicationStream,
114 // PostgreSQL-specific types
115 Lsn,
116 MessageType,
117 Oid,
118 RelationInfo,
119 ReplicaIdentity,
120 ReplicationState,
121 ReplicationStreamConfig,
122 StreamingReplicationMessage,
123 TimestampTz,
124 TupleData,
125 XLogRecPtr,
126 Xid,
127 INVALID_XLOG_REC_PTR,
128 PG_EPOCH_OFFSET_SECS,
129};
130
131// Re-export SharedLsnFeedback from lsn_tracker (pg2any-lib's version with log_status method)
132pub use lsn_tracker::SharedLsnFeedback;
133
134// Re-export implementations
135#[cfg(feature = "mysql")]
136pub use crate::destinations::MySQLDestination;
137
138#[cfg(feature = "sqlserver")]
139pub use crate::destinations::SqlServerDestination;
140
141#[cfg(feature = "sqlite")]
142pub use crate::destinations::SQLiteDestination;
143
144#[cfg(feature = "kafka")]
145pub use crate::destinations::KafkaDestination;
146
147pub use crate::destinations::{DestinationFactory, DestinationHandler};
148pub use crate::types::{DestinationType, Transaction};
149
150// Conditionally export metrics server functionality
151#[cfg(feature = "metrics")]
152pub use crate::monitoring::{
153 create_metrics_server, create_metrics_server_with_config, init_real_metrics, MetricsServer,
154 MetricsServerConfig,
155};
156
157// Always export metrics abstraction layer
158pub use crate::monitoring::{
159 gather_metrics, init_metrics, MetricsCollector, MetricsCollectorTrait, ProcessingTimer,
160 ProcessingTimerTrait,
161};