Crate pg_walstream


§PostgreSQL Logical Replication Protocol Library

A platform-agnostic library for parsing and building PostgreSQL logical replication protocol messages. This library does not include connection management - it focuses purely on protocol parsing, allowing users to bring their own connection layer.
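To make "building protocol messages" concrete, here is a minimal sketch that serializes a Standby Status Update, the feedback message a replication client sends to the server. The layout (tag byte `r`, three 64-bit LSNs, a 64-bit client clock in microseconds since 2000-01-01, and a reply-requested byte, all big-endian) follows the PostgreSQL streaming replication protocol. The function name `build_standby_status_update` is hypothetical and is not part of this crate's API.

```rust
/// Hypothetical helper (not this crate's API): serialize a Standby Status
/// Update message body as defined by the PostgreSQL replication protocol.
pub fn build_standby_status_update(
    written: u64,
    flushed: u64,
    applied: u64,
    clock_micros: i64,
    reply_requested: bool,
) -> Vec<u8> {
    // 1 tag byte + 4 * 8 bytes of integers + 1 flag byte = 34 bytes.
    let mut buf = Vec::with_capacity(34);
    buf.push(b'r');
    buf.extend_from_slice(&written.to_be_bytes());
    buf.extend_from_slice(&flushed.to_be_bytes());
    buf.extend_from_slice(&applied.to_be_bytes());
    buf.extend_from_slice(&clock_micros.to_be_bytes());
    buf.push(u8::from(reply_requested));
    buf
}
```

A connection layer would wrap these bytes in a CopyData frame before sending them; that framing is left out of the sketch.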

§Features

  • Full PostgreSQL logical replication protocol support (versions 1-4)
  • Streaming transaction support (protocol v2+)
  • Two-phase commit support (protocol v3+)
  • Parallel streaming support (protocol v4+)
  • Zero-copy buffer operations using the bytes crate
  • Thread-safe LSN tracking
  • Async, non-blocking I/O: tasks yield to the executor while waiting for data
  • Graceful cancellation: all operations support cancellation tokens
  • No platform-specific dependencies (no libpq required)
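As an illustration of what thread-safe LSN tracking can look like, the sketch below stores positions in atomics and only ever moves them forward. It is an assumption-laden stand-in: the type `LsnTracker` and its methods are hypothetical and do not describe how `SharedLsnFeedback` is actually implemented.

```rust
use std::sync::atomic::{AtomicU64, Ordering};

/// Hypothetical tracker (not the crate's `SharedLsnFeedback`): keeps the
/// highest flushed and applied LSNs seen so far, safe to share across threads.
#[derive(Debug, Default)]
pub struct LsnTracker {
    flushed: AtomicU64,
    applied: AtomicU64,
}

impl LsnTracker {
    pub fn new() -> Self {
        Self::default()
    }

    /// Record a flushed LSN; `fetch_max` ensures the value never moves backwards.
    pub fn update_flushed(&self, lsn: u64) {
        self.flushed.fetch_max(lsn, Ordering::AcqRel);
    }

    /// Record an applied LSN; `fetch_max` ensures the value never moves backwards.
    pub fn update_applied(&self, lsn: u64) {
        self.applied.fetch_max(lsn, Ordering::AcqRel);
    }

    pub fn flushed(&self) -> u64 {
        self.flushed.load(Ordering::Acquire)
    }

    pub fn applied(&self) -> u64 {
        self.applied.load(Ordering::Acquire)
    }
}
```

Wrapping the tracker in an `Arc` lets a consumer task update positions while a feedback task reads them, without a mutex.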

§Async I/O Performance

The library uses async I/O throughout, so tokio can schedule tasks without blocking threads:

  • When waiting for data from PostgreSQL, the task is suspended and the thread is released back to the executor to run other tasks
  • Uses AsyncFd with proper edge-triggered readiness handling
  • Supports concurrent processing of multiple replication streams on a single thread
  • Enables efficient resource utilization in high-concurrency scenarios

§Protocol Support

This library implements the PostgreSQL logical replication protocol as described in the official PostgreSQL documentation.
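As a rough illustration of the wire format involved, the sketch below decodes a pgoutput `Begin` message: a tag byte `B`, the 64-bit final LSN of the transaction, a 64-bit commit timestamp in microseconds since 2000-01-01, and the 32-bit transaction ID, all big-endian. The names `BeginMessage` and `parse_begin` are hypothetical; the crate's own parser (`LogicalReplicationParser`) may expose this differently.

```rust
/// Hypothetical representation of a pgoutput `Begin` message.
#[derive(Debug, PartialEq, Eq)]
pub struct BeginMessage {
    pub final_lsn: u64,
    pub commit_timestamp_micros: i64,
    pub xid: u32,
}

/// Copy exactly eight bytes out of a slice into an array.
fn eight(bytes: &[u8]) -> [u8; 8] {
    let mut out = [0u8; 8];
    out.copy_from_slice(bytes);
    out
}

/// Decode a `Begin` message, returning `None` if the tag byte is wrong or
/// the buffer is shorter than the 21 bytes the message requires.
pub fn parse_begin(buf: &[u8]) -> Option<BeginMessage> {
    if buf.len() < 21 || buf[0] != b'B' {
        return None;
    }
    let mut xid = [0u8; 4];
    xid.copy_from_slice(&buf[17..21]);
    Some(BeginMessage {
        final_lsn: u64::from_be_bytes(eight(&buf[1..9])),
        commit_timestamp_micros: i64::from_be_bytes(eight(&buf[9..17])),
        xid: u32::from_be_bytes(xid),
    })
}
```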

§Quick Start

use pg_walstream::{
    CancellationToken, LogicalReplicationStream, ReplicationStreamConfig, RetryConfig,
};
use std::time::Duration;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
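    let config = ReplicationStreamConfig::new(
        "my_slot".to_string(),        // replication slot name
        "my_publication".to_string(), // publication name
        2, true,                      // protocol version, then the streaming flag (v2+ supports streaming)
        Duration::from_secs(10),
        Duration::from_secs(30),
        Duration::from_secs(60),
        RetryConfig::default(),       // default retry policy
    );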

    let mut stream = LogicalReplicationStream::new(
        "postgresql://postgres:password@localhost:5432/mydb?replication=database",
        config,
    ).await?;
     
    stream.start(None).await?;

    let cancel_token = CancellationToken::new();

    // Polling loop with automatic retry
    loop {
        match stream.next_event_with_retry(&cancel_token).await {
            Ok(event) => {
                println!("Received event: {:?}", event);
                // Record this event's LSN as applied for replication feedback
                stream.shared_lsn_feedback.update_applied_lsn(event.lsn.value());
            }
            Err(pg_walstream::ReplicationError::Cancelled(_)) => {
                println!("Cancelled, shutting down gracefully");
                break;
            }
            Err(e) => {
                eprintln!("Error: {}", e);
                break;
            }
        }
    }
     
    Ok(())
}

Re-exports§

pub use buffer::BufferReader;
pub use buffer::BufferWriter;
pub use error::ReplicationError;
pub use error::Result;
pub use lsn::SharedLsnFeedback;
pub use types::format_lsn;
pub use types::format_postgres_timestamp;
pub use types::parse_lsn;
pub use types::postgres_timestamp_to_chrono;
pub use types::system_time_to_postgres_timestamp;
pub use types::ChangeEvent;
pub use types::EventType;
pub use types::Lsn;
pub use types::Oid;
pub use types::ReplicaIdentity;
pub use types::TimestampTz;
pub use types::XLogRecPtr;
pub use types::Xid;
pub use types::INVALID_XLOG_REC_PTR;
pub use types::PG_EPOCH_OFFSET_SECS;
pub use protocol::message_types;
pub use protocol::parse_keepalive_message;
pub use protocol::ColumnData;
pub use protocol::ColumnInfo;
pub use protocol::KeepaliveMessage;
pub use protocol::LogicalReplicationMessage;
pub use protocol::LogicalReplicationParser;
pub use protocol::MessageType;
pub use protocol::RelationInfo;
pub use protocol::ReplicationState;
pub use protocol::StreamingReplicationMessage;
pub use protocol::TupleData;
pub use stream::EventStream;
pub use stream::EventStreamRef;
pub use stream::LogicalReplicationStream;
pub use stream::ReplicationStreamConfig;
pub use connection::PgReplicationConnection;
pub use connection::PgResult;
pub use retry::ExponentialBackoff;
pub use retry::ReplicationConnectionRetry;
pub use retry::RetryConfig;
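
Several of the re-exported helpers deal with LSN text formatting and PostgreSQL timestamps. The sketch below shows the underlying conventions under stated assumptions: PostgreSQL prints an LSN as two hexadecimal halves separated by a slash, and its timestamps count microseconds from 2000-01-01, which is 946,684,800 seconds after the Unix epoch. The function and constant names here are hypothetical and are not the signatures of `format_lsn`, `parse_lsn`, or `PG_EPOCH_OFFSET_SECS`.

```rust
/// Seconds from the Unix epoch (1970-01-01) to the PostgreSQL epoch (2000-01-01).
/// Hypothetical constant for this sketch.
const EPOCH_OFFSET_SECS: i64 = 946_684_800;

/// Render a 64-bit LSN in PostgreSQL's `XXXXXXXX/XXXXXXXX` text form
/// (upper 32 bits, slash, lower 32 bits, uppercase hex, no padding).
pub fn lsn_to_string(lsn: u64) -> String {
    format!("{:X}/{:X}", lsn >> 32, lsn & 0xFFFF_FFFF)
}

/// Parse the text form back into a 64-bit LSN; `None` on malformed input.
pub fn lsn_from_str(s: &str) -> Option<u64> {
    let (hi, lo) = s.split_once('/')?;
    let hi = u32::from_str_radix(hi, 16).ok()?;
    let lo = u32::from_str_radix(lo, 16).ok()?;
    Some((u64::from(hi) << 32) | u64::from(lo))
}

/// Convert microseconds since the PostgreSQL epoch into microseconds since
/// the Unix epoch; `None` if the addition would overflow.
pub fn pg_micros_to_unix_micros(pg_micros: i64) -> Option<i64> {
    pg_micros.checked_add(EPOCH_OFFSET_SECS * 1_000_000)
}
```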

Modules§

buffer
Buffer utilities for reading and writing replication protocol messages
connection
Low-level PostgreSQL connection using libpq-sys
error
Error types for PostgreSQL logical replication operations
lsn
Thread-safe LSN tracking for CDC replication feedback
protocol
Unified PostgreSQL logical replication protocol implementation
retry
Connection retry logic with exponential backoff
stream
PostgreSQL logical replication stream management
types
PostgreSQL type aliases and utility functions
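
For readers unfamiliar with the retry module's approach, here is a minimal sketch of exponential backoff: each failed attempt waits longer than the last, up to a cap. The `Backoff` type, its fields, and its methods are hypothetical; the crate's `ExponentialBackoff` and `RetryConfig` may use different parameters and may add jitter.

```rust
use std::time::Duration;

/// Hypothetical backoff schedule (not the crate's `ExponentialBackoff`).
pub struct Backoff {
    initial: Duration,
    max: Duration,
    multiplier: u32,
    attempt: u32,
}

impl Backoff {
    pub fn new(initial: Duration, max: Duration, multiplier: u32) -> Self {
        Self { initial, max, multiplier, attempt: 0 }
    }

    /// Delay before the next retry: initial * multiplier^attempt, capped at `max`.
    pub fn next_delay(&mut self) -> Duration {
        // Saturate instead of overflowing once the exponent gets large.
        let factor = self.multiplier.checked_pow(self.attempt).unwrap_or(u32::MAX);
        let delay = self
            .initial
            .checked_mul(factor)
            .unwrap_or(self.max)
            .min(self.max);
        self.attempt = self.attempt.saturating_add(1);
        delay
    }

    /// Call after a successful attempt so the next failure starts from `initial`.
    pub fn reset(&mut self) {
        self.attempt = 0;
    }
}
```

Capping the delay keeps reconnect attempts regular during a long outage, and resetting after success avoids penalizing the next unrelated failure.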

Structs§

CancellationToken
A token which can be used to signal a cancellation request to one or more tasks.