Expand description
§PostgreSQL Logical Replication Protocol Library
A platform-agnostic library for parsing and building PostgreSQL logical replication protocol messages. This library does not include connection management - it focuses purely on protocol parsing, allowing users to bring their own connection layer.
§Features
- Full PostgreSQL logical replication protocol support (versions 1-4)
- Streaming transaction support (protocol v2+)
- Two-phase commit support (protocol v3+)
- Parallel streaming support (protocol v4+)
- Zero-copy buffer operations using
bytescrate - Thread-safe LSN tracking
- Truly async, non-blocking I/O - Tasks properly yield to the executor
- Graceful cancellation - All operations support cancellation tokens
- No platform-specific dependencies (no libpq required)
§Async I/O Performance
The library implements proper async I/O patterns that allow tokio to efficiently schedule tasks without blocking threads:
- When waiting for data from PostgreSQL, the task is suspended and the thread is released back to the executor to run other tasks
- Uses
AsyncFdwith proper edge-triggered readiness handling - Supports concurrent processing of multiple replication streams on a single thread
- Enables efficient resource utilization in high-concurrency scenarios
§Protocol Support
This library implements the PostgreSQL logical replication protocol as documented at:
- https://www.postgresql.org/docs/current/protocol-logical-replication.html
- https://www.postgresql.org/docs/current/protocol-logicalrep-message-formats.html
- https://www.postgresql.org/docs/current/protocol-replication.html
§Quick Start
ⓘ
use pg_walstream::{
LogicalReplicationStream, ReplicationStreamConfig, RetryConfig,
SharedLsnFeedback, CancellationToken,
};
use std::sync::Arc;
use std::time::Duration;
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
let config = ReplicationStreamConfig::new(
"my_slot".to_string(),
"my_publication".to_string(),
2, true,
Duration::from_secs(10),
Duration::from_secs(30),
Duration::from_secs(60),
RetryConfig::default(),
);
let mut stream = LogicalReplicationStream::new(
"postgresql://postgres:password@localhost:5432/mydb?replication=database",
config,
).await?;
stream.start(None).await?;
let cancel_token = CancellationToken::new();
// Traditional polling loop with automatic retry
loop {
match stream.next_event_with_retry(&cancel_token).await {
Ok(event) => {
println!("Received event: {:?}", event);
stream.shared_lsn_feedback.update_applied_lsn(event.lsn.value());
}
Err(e) if matches!(e, pg_walstream::ReplicationError::Cancelled(_)) => {
println!("Cancelled, shutting down gracefully");
break;
}
Err(e) => {
eprintln!("Error: {}", e);
break;
}
}
}
Ok(())
}Re-exports§
pub use buffer::BufferReader;pub use buffer::BufferWriter;pub use error::ReplicationError;pub use error::Result;pub use types::format_lsn;pub use types::format_postgres_timestamp;pub use types::parse_lsn;pub use types::postgres_timestamp_to_chrono;pub use types::system_time_to_postgres_timestamp;pub use types::ChangeEvent;pub use types::EventType;pub use types::Lsn;pub use types::Oid;pub use types::ReplicaIdentity;pub use types::TimestampTz;pub use types::XLogRecPtr;pub use types::Xid;pub use types::INVALID_XLOG_REC_PTR;pub use types::PG_EPOCH_OFFSET_SECS;pub use protocol::message_types;pub use protocol::parse_keepalive_message;pub use protocol::ColumnData;pub use protocol::ColumnInfo;pub use protocol::KeepaliveMessage;pub use protocol::LogicalReplicationMessage;pub use protocol::LogicalReplicationParser;pub use protocol::MessageType;pub use protocol::RelationInfo;pub use protocol::ReplicationState;pub use protocol::StreamingReplicationMessage;pub use protocol::TupleData;pub use stream::EventStream;pub use stream::EventStreamRef;pub use stream::LogicalReplicationStream;pub use stream::ReplicationStreamConfig;pub use connection::PgReplicationConnection;pub use connection::PgResult;pub use retry::ExponentialBackoff;pub use retry::ReplicationConnectionRetry;pub use retry::RetryConfig;
Modules§
- buffer
- Buffer utilities for reading and writing replication protocol messages
- connection
- Low-level PostgreSQL connection using libpq-sys
- error
- Error types for PostgreSQL logical replication operations
- lsn
- Thread-safe LSN tracking for CDC replication feedback
- protocol
- Unified PostgreSQL logical replication protocol implementation
- retry
- Connection retry logic with exponential backoff
- stream
- PostgreSQL logical replication stream management
- types
- PostgreSQL type aliases and utility functions
Structs§
- Cancellation
Token - A token which can be used to signal a cancellation request to one or more tasks.