Crate pg_walstream

§PostgreSQL Logical Replication Protocol Library

A platform-agnostic library for parsing and streaming PostgreSQL logical replication protocol messages. The protocol parser is reusable on its own, and the crate also includes a libpq-based connection layer for replication streaming.

§Features

  • Full PostgreSQL logical replication protocol support (versions 1-4)
  • Streaming transaction support (protocol v2+)
  • Two-phase commit support (protocol v3+)
  • Parallel streaming support (protocol v4+)
  • Zero-copy buffer operations using the bytes crate
  • Thread-safe LSN tracking
  • Truly async, non-blocking I/O - Tasks properly yield to the executor
  • futures::Stream trait implementation - Works with all stream combinators
  • Graceful cancellation - All operations support cancellation tokens
  • Protocol parsing is portable; the connection module uses libpq
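
To make the LSN-related features concrete, here is a minimal std-only sketch. It assumes the standard PostgreSQL convention that an LSN is a 64-bit WAL position printed as two hexadecimal halves separated by a slash. The names lsn_to_text, lsn_from_text and AppliedLsn are hypothetical and are not this crate's API; the crate's own format_lsn, parse_lsn and SharedLsnFeedback may behave differently.

```rust
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::Arc;
use std::thread;

/// Hypothetical helper: render a 64-bit LSN in PostgreSQL's "X/Y" text form.
fn lsn_to_text(lsn: u64) -> String {
    format!("{:X}/{:X}", lsn >> 32, lsn & 0xFFFF_FFFF)
}

/// Hypothetical helper: parse the "X/Y" text form back into a 64-bit value.
fn lsn_from_text(text: &str) -> Option<u64> {
    let (hi, lo) = text.split_once('/')?;
    let hi = u32::from_str_radix(hi, 16).ok()?;
    let lo = u32::from_str_radix(lo, 16).ok()?;
    Some((u64::from(hi) << 32) | u64::from(lo))
}

/// Hypothetical tracker: remembers the highest LSN applied so far.
#[derive(Default)]
struct AppliedLsn(AtomicU64);

impl AppliedLsn {
    /// Advance the stored position; values older than the current one are ignored.
    fn advance(&self, lsn: u64) {
        self.0.fetch_max(lsn, Ordering::AcqRel);
    }

    /// Read the highest position recorded so far.
    fn get(&self) -> u64 {
        self.0.load(Ordering::Acquire)
    }
}

fn main() {
    let tracker = Arc::new(AppliedLsn::default());

    // Several workers report progress concurrently; the maximum wins.
    let workers: Vec<_> = (1..=4u64)
        .map(|i| {
            let tracker = Arc::clone(&tracker);
            thread::spawn(move || tracker.advance(i * 0x1000))
        })
        .collect();
    for worker in workers {
        worker.join().unwrap();
    }

    println!("applied up to {}", lsn_to_text(tracker.get()));
}
```

Using fetch_max keeps the stored position monotonic, so a late report from a slower task cannot move the feedback position backwards.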

§Async Stream API

The EventStream type implements both:

  • A native .next_event().await API for simple usage without trait imports
  • The futures_core::Stream trait for use with stream combinators

use futures::StreamExt;

let mut event_stream = stream.into_stream(cancel_token);

// Use as a futures::Stream with combinators
while let Some(result) = event_stream.next().await {
    let event = result?;
    println!("Event: {:?}", event);
    event_stream.update_applied_lsn(event.lsn.value());
}

§Async I/O Performance

The library implements proper async I/O patterns that allow tokio to efficiently schedule tasks without blocking threads:

  • When waiting for data from PostgreSQL, the task is suspended and the thread is released back to the executor to run other tasks
  • Uses AsyncFd with proper edge-triggered readiness handling
  • Zero-copy Bytes throughout the WAL data path
  • Supports concurrent processing of multiple replication streams on a single thread
  • Enables efficient resource utilization in high-concurrency scenarios

§Protocol Support

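This library implements the PostgreSQL logical replication protocol as documented in the official PostgreSQL manual (the "Logical Streaming Replication Protocol" and "Logical Replication Message Formats" sections).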
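
As an illustration of the wire format involved, the sketch below decodes a primary keepalive message as laid out in the PostgreSQL streaming replication protocol: the byte 'k', a big-endian 64-bit WAL end position, a big-endian 64-bit server clock in microseconds since 2000-01-01, and a one-byte reply-requested flag. The Keepalive struct and the parse_keepalive function are hypothetical, std-only stand-ins; the crate's own parse_keepalive_message and KeepaliveMessage may have different names, fields and signatures.

```rust
/// Hypothetical decoded form; field names are illustrative only.
#[derive(Debug, PartialEq)]
struct Keepalive {
    wal_end: u64,
    server_time_micros: i64,
    reply_requested: bool,
}

/// Decode an 18-byte primary keepalive message, or return None if the
/// buffer is too short or does not start with the 'k' tag.
fn parse_keepalive(buf: &[u8]) -> Option<Keepalive> {
    if buf.len() < 18 || buf[0] != b'k' {
        return None;
    }

    // All multi-byte integers in the protocol use network (big-endian) order.
    let mut wal_end = [0u8; 8];
    wal_end.copy_from_slice(&buf[1..9]);
    let mut server_time = [0u8; 8];
    server_time.copy_from_slice(&buf[9..17]);

    Some(Keepalive {
        wal_end: u64::from_be_bytes(wal_end),
        server_time_micros: i64::from_be_bytes(server_time),
        reply_requested: buf[17] != 0,
    })
}

fn main() {
    // Build a sample message by hand instead of reading it from a server.
    let mut msg = vec![b'k'];
    msg.extend_from_slice(&0x16_B374_D848u64.to_be_bytes());
    msg.extend_from_slice(&1_000_000i64.to_be_bytes());
    msg.push(1);

    match parse_keepalive(&msg) {
        Some(keepalive) => println!("{:?}", keepalive),
        None => println!("not a keepalive message"),
    }
}
```

When the reply-requested flag is set, the client is expected to send a standby status update promptly, which is why a streaming client needs to track its applied LSN.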

§Quick Start

use pg_walstream::{
    LogicalReplicationStream, ReplicationStreamConfig, RetryConfig,
    SharedLsnFeedback, CancellationToken, StreamingMode,
};
use std::sync::Arc;
use std::time::Duration;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let config = ReplicationStreamConfig::new(
        "my_slot".to_string(),
        "my_publication".to_string(),
        2,
        StreamingMode::On,
        Duration::from_secs(10),
        Duration::from_secs(30),
        Duration::from_secs(60),
        RetryConfig::default(),
    );

    let mut stream = LogicalReplicationStream::new(
        "postgresql://postgres:password@localhost:5432/mydb?replication=database",
        config,
    ).await?;

    // Optional: create the slot first to use the exported snapshot
    // for an initial consistent table read (before start() destroys it).
    // stream.ensure_replication_slot().await?;
    // if let Some(snap) = stream.exported_snapshot_name() { /* read snapshot */ }

    stream.start(None).await?;

    let cancel_token = CancellationToken::new();
    let mut event_stream = stream.into_stream(cancel_token.clone());

    // Option 1: Use as futures::Stream
    // use futures::StreamExt;
    // while let Some(result) = event_stream.next().await { ... }

    // Option 2: Use native API
    loop {
        match event_stream.next_event().await {
            Ok(event) => {
                println!("Received event: {:?}", event);
                event_stream.update_applied_lsn(event.lsn.value());
            }
            Err(e) if matches!(e, pg_walstream::ReplicationError::Cancelled(_)) => {
                println!("Cancelled, shutting down gracefully");
                break;
            }
            Err(e) => {
                eprintln!("Error: {}", e);
                break;
            }
        }
    }

    // Graceful shutdown
    event_stream.shutdown().await?;

    Ok(())
}

Re-exports§

pub use buffer::BufferReader;
pub use buffer::BufferWriter;
pub use error::ReplicationError;
pub use error::Result;
pub use lsn::SharedLsnFeedback;
pub use column_value::ColumnValue;
pub use column_value::RowData;
pub use types::format_lsn;
pub use types::parse_lsn;
pub use types::postgres_timestamp_to_chrono;
pub use types::system_time_to_postgres_timestamp;
pub use types::BaseBackupOptions;
pub use types::ChangeEvent;
pub use types::EventType;
pub use types::Lsn;
pub use types::Oid;
pub use types::RelationColumn;
pub use types::ReplicaIdentity;
pub use types::ReplicationSlotInfo;
pub use types::ReplicationSlotOptions;
pub use types::SlotType;
pub use types::TimestampTz;
pub use types::XLogRecPtr;
pub use types::Xid;
pub use types::INVALID_XLOG_REC_PTR;
pub use types::PG_EPOCH_OFFSET_SECS;
pub use protocol::message_types;
pub use protocol::parse_keepalive_message;
pub use protocol::ColumnData;
pub use protocol::ColumnInfo;
pub use protocol::KeepaliveMessage;
pub use protocol::LogicalReplicationMessage;
pub use protocol::LogicalReplicationParser;
pub use protocol::MessageType;
pub use protocol::RelationInfo;
pub use protocol::ReplicationState;
pub use protocol::StreamingReplicationMessage;
pub use protocol::TupleData;
pub use stream::EventStream;
pub use stream::EventStreamRef;
pub use stream::LogicalReplicationStream;
pub use stream::OriginFilter;
pub use stream::ReplicationStreamConfig;
pub use stream::StreamingMode;
pub use connection::PgReplicationConnection;
pub use connection::PgResult;
pub use retry::ExponentialBackoff;
pub use retry::ReplicationConnectionRetry;
pub use retry::RetryConfig;

Modules§

buffer
Buffer utilities for reading and writing replication protocol messages
column_value
Column value types for PostgreSQL logical replication
connection
PostgreSQL connection backends.
error
Error types for PostgreSQL logical replication operations
lsn
Thread-safe LSN tracking for CDC replication feedback
protocol
Unified PostgreSQL logical replication protocol implementation
retry
Connection retry logic with exponential backoff
stream
PostgreSQL logical replication stream management
types
PostgreSQL type aliases and utility functions
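
The timestamp utilities in types build on one fact about PostgreSQL: replication timestamps count microseconds from 2000-01-01 00:00:00 UTC, not from the Unix epoch. The sketch below shows that conversion with std only. The constant EPOCH_OFFSET_SECS and both function names are hypothetical and chosen for illustration; they are not the crate's postgres_timestamp_to_chrono or system_time_to_postgres_timestamp, whose signatures are not shown here.

```rust
use std::time::{Duration, SystemTime, UNIX_EPOCH};

/// Seconds between 1970-01-01 and 2000-01-01 (10,957 days), both in UTC.
const EPOCH_OFFSET_SECS: u64 = 946_684_800;

/// Hypothetical helper: PostgreSQL microseconds to a SystemTime.
fn pg_micros_to_system_time(pg_micros: i64) -> SystemTime {
    let pg_epoch = UNIX_EPOCH + Duration::from_secs(EPOCH_OFFSET_SECS);
    if pg_micros >= 0 {
        pg_epoch + Duration::from_micros(pg_micros as u64)
    } else {
        // Negative values are instants before 2000-01-01.
        pg_epoch - Duration::from_micros(pg_micros.unsigned_abs())
    }
}

/// Hypothetical helper: a SystemTime to PostgreSQL microseconds.
fn system_time_to_pg_micros(time: SystemTime) -> i64 {
    let pg_epoch = UNIX_EPOCH + Duration::from_secs(EPOCH_OFFSET_SECS);
    match time.duration_since(pg_epoch) {
        Ok(after) => after.as_micros() as i64,
        // duration_since fails when `time` is earlier than the PostgreSQL epoch.
        Err(before) => -(before.duration().as_micros() as i64),
    }
}

fn main() {
    let now = SystemTime::now();
    let pg_micros = system_time_to_pg_micros(now);
    println!("now as PostgreSQL microseconds: {}", pg_micros);
    println!("converted back: {:?}", pg_micros_to_system_time(pg_micros));
}
```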

Structs§

CancellationToken
A token which can be used to signal a cancellation request to one or more tasks.