Crate pg_walstream

§PostgreSQL Logical Replication Protocol Library

A platform-agnostic library for parsing and streaming PostgreSQL logical replication protocol messages. The protocol parser is reusable on its own, and the crate also includes a libpq-based connection layer for replication streaming.

§Features

  • Full PostgreSQL logical replication protocol support (versions 1-4)
  • Streaming transaction support (protocol v2+)
  • Two-phase commit support (protocol v3+)
  • Parallel streaming support (protocol v4+)
  • Zero-copy buffer operations using the bytes crate
  • Thread-safe LSN tracking
  • Truly async, non-blocking I/O - Tasks properly yield to the executor
  • futures::Stream trait implementation - Works with all stream combinators
  • Graceful cancellation - All operations support cancellation tokens
  • Protocol parsing is portable; the connection module uses libpq
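
To make the "thread-safe LSN tracking" item above concrete, here is a minimal std-only sketch of one way such tracking can work: an atomic counter that only ever moves forward. `LsnTracker` and its methods are hypothetical names invented for this illustration; they are not the crate's `SharedLsnFeedback` API, whose actual interface should be checked in the `lsn` module.

```rust
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::Arc;
use std::thread;

/// Hypothetical tracker for illustration only (not the crate's `SharedLsnFeedback`).
#[derive(Debug, Default)]
struct LsnTracker {
    applied: AtomicU64,
}

impl LsnTracker {
    /// Record an applied LSN. `fetch_max` keeps the stored value monotonic,
    /// so a late or out-of-order update can never move it backwards.
    fn update_applied(&self, lsn: u64) {
        self.applied.fetch_max(lsn, Ordering::AcqRel);
    }

    /// Read the highest LSN recorded so far.
    fn applied(&self) -> u64 {
        self.applied.load(Ordering::Acquire)
    }
}

fn main() {
    let tracker = Arc::new(LsnTracker::default());

    // Several workers report progress concurrently through a shared handle.
    let handles: Vec<_> = (1..=4u64)
        .map(|i| {
            let t = Arc::clone(&tracker);
            thread::spawn(move || t.update_applied(i * 0x1000))
        })
        .collect();
    for h in handles {
        h.join().unwrap();
    }

    // A stale update is ignored because the value is already higher.
    tracker.update_applied(0x10);
    println!("applied LSN: {:#X}", tracker.applied());
}
```

The design choice worth noting is the lock-free `fetch_max`: readers that build feedback messages never block the writers that report progress.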

§Async Stream API

The EventStream type implements both:

  • A native .next_event().await API for simple usage without trait imports
  • The futures_core::Stream trait for use with stream combinators

use futures::StreamExt;

let mut event_stream = stream.into_stream(cancel_token);

// Use as a futures::Stream with combinators
while let Some(result) = event_stream.next().await {
    let event = result?;
    println!("Event: {:?}", event);
    event_stream.update_applied_lsn(event.lsn.value());
}

§Async I/O Performance

The library implements proper async I/O patterns that allow tokio to efficiently schedule tasks without blocking threads:

  • When waiting for data from PostgreSQL, the task is suspended and the thread is released back to the executor to run other tasks
  • Uses AsyncFd with proper edge-triggered readiness handling
  • Zero-copy Bytes throughout the WAL data path
  • Supports concurrent processing of multiple replication streams on a single thread
  • Enables efficient resource utilization in high-concurrency scenarios

§Protocol Support

This library implements the PostgreSQL logical replication protocol as specified in the official PostgreSQL documentation.
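
As a small illustration of the wire format involved, the sketch below decodes a primary keepalive message as laid out in the PostgreSQL streaming replication protocol: a `k` tag byte, the server's current WAL end (8 bytes, big-endian), the server clock in microseconds since 2000-01-01 (8 bytes), and a reply-requested flag (1 byte). The `Keepalive` struct, `parse_keepalive`, and `unix_secs` are hypothetical names for this example; the crate's own `parse_keepalive_message` and `KeepaliveMessage` may differ in shape and error handling.

```rust
/// PostgreSQL epoch (2000-01-01 UTC) as seconds after the Unix epoch.
/// Named locally for this sketch; the crate re-exports its own constant.
const PG_EPOCH_UNIX_SECS: i64 = 946_684_800;

/// Hypothetical decoded form of a primary keepalive message.
#[derive(Debug, PartialEq)]
struct Keepalive {
    wal_end: u64,
    server_time_micros: i64,
    reply_requested: bool,
}

/// Copy exactly eight bytes out of a slice (callers pass 8-byte slices).
fn read_8(b: &[u8]) -> [u8; 8] {
    let mut a = [0u8; 8];
    a.copy_from_slice(b);
    a
}

/// Decode tag byte + 8-byte WAL end + 8-byte clock + 1-byte flag (18 bytes).
fn parse_keepalive(buf: &[u8]) -> Result<Keepalive, String> {
    if buf.len() < 18 {
        return Err(format!("keepalive too short: {} bytes", buf.len()));
    }
    if buf[0] != b'k' {
        return Err(format!("unexpected tag byte {:#04x}", buf[0]));
    }
    Ok(Keepalive {
        wal_end: u64::from_be_bytes(read_8(&buf[1..9])),
        server_time_micros: i64::from_be_bytes(read_8(&buf[9..17])),
        reply_requested: buf[17] != 0,
    })
}

/// Convert the server clock to whole seconds since the Unix epoch.
fn unix_secs(k: &Keepalive) -> i64 {
    PG_EPOCH_UNIX_SECS + k.server_time_micros.div_euclid(1_000_000)
}

fn main() {
    // Build a sample message by hand, then decode it again.
    let mut msg = vec![b'k'];
    msg.extend_from_slice(&0x0000_0001_6B37_2A50u64.to_be_bytes());
    msg.extend_from_slice(&1_000_000i64.to_be_bytes());
    msg.push(1);

    let k = parse_keepalive(&msg).expect("well-formed message");
    println!("{:?}, unix seconds = {}", k, unix_secs(&k));
}
```

When the reply-requested flag is set, a client is expected to send a standby status update promptly so that the server does not time out the connection.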

§Quick Start

use pg_walstream::{
    CancellationToken, LogicalReplicationStream, ReplicationStreamConfig, RetryConfig,
    StreamingMode,
};
use std::time::Duration;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
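    let config = ReplicationStreamConfig::new(
        "my_slot".to_string(),        // replication slot name
        "my_publication".to_string(), // publication name
        2,                            // protocol version (streaming needs v2+)
        StreamingMode::On,
        // Three timing settings; see ReplicationStreamConfig::new for the meaning of each.
        Duration::from_secs(10),
        Duration::from_secs(30),
        Duration::from_secs(60),
        RetryConfig::default(),       // default retry policy
    );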

    let mut stream = LogicalReplicationStream::new(
        "postgresql://postgres:password@localhost:5432/mydb?replication=database",
        config,
    ).await?;

    // Optional: create the slot first to use the exported snapshot
    // for an initial consistent table read (before start() destroys it).
    // stream.ensure_replication_slot().await?;
    // if let Some(snap) = stream.exported_snapshot_name() { /* read snapshot */ }

    stream.start(None).await?;

    let cancel_token = CancellationToken::new();
    let mut event_stream = stream.into_stream(cancel_token.clone());

    // Option 1: Use as futures::Stream
    // use futures::StreamExt;
    // while let Some(result) = event_stream.next().await { ... }

    // Option 2: Use native API
    loop {
        match event_stream.next_event().await {
            Ok(event) => {
                println!("Received event: {:?}", event);
                event_stream.update_applied_lsn(event.lsn.value());
            }
            Err(pg_walstream::ReplicationError::Cancelled(_)) => {
                println!("Cancelled, shutting down gracefully");
                break;
            }
            Err(e) => {
                eprintln!("Error: {}", e);
                break;
            }
        }
    }

    // Graceful shutdown
    event_stream.shutdown().await?;

    Ok(())
}
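
The quick start passes raw 64-bit LSN values to `update_applied_lsn`. When logging or comparing them against PostgreSQL output, it helps to know the conventional text form: two hexadecimal halves separated by a slash, for example `16/B374D848`. The following std-only sketch converts between the two forms. `format_lsn_text` and `parse_lsn_text` are hypothetical helpers written for this example; the crate re-exports its own `format_lsn` and `parse_lsn`, whose exact signatures are not shown on this page.

```rust
/// Render a 64-bit LSN as "HIGH/LOW" in uppercase hex (hypothetical helper).
fn format_lsn_text(lsn: u64) -> String {
    format!("{:X}/{:X}", lsn >> 32, lsn & 0xFFFF_FFFF)
}

/// Parse "HIGH/LOW" hex text back into a 64-bit LSN (hypothetical helper).
/// Returns `None` if the slash is missing or either half is not valid
/// 32-bit hex.
fn parse_lsn_text(s: &str) -> Option<u64> {
    let (hi, lo) = s.split_once('/')?;
    let hi = u32::from_str_radix(hi, 16).ok()?;
    let lo = u32::from_str_radix(lo, 16).ok()?;
    Some((u64::from(hi) << 32) | u64::from(lo))
}

fn main() {
    let lsn: u64 = 0x16_B374_D848;
    let text = format_lsn_text(lsn);
    println!("{} <-> {:?}", text, parse_lsn_text(&text));
}
```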

Re-exports§

pub use buffer::BufferReader;
pub use buffer::BufferWriter;
pub use error::ReplicationError;
pub use error::Result;
pub use lsn::SharedLsnFeedback;
pub use column_value::ColumnValue;
pub use column_value::RowData;
pub use deserializer::FieldError;
pub use deserializer::RowDataDeserializer;
pub use deserializer::TryDeserializeResult;
pub use types::format_lsn;
pub use types::parse_lsn;
pub use types::postgres_timestamp_to_chrono;
pub use types::system_time_to_postgres_timestamp;
pub use types::BaseBackupOptions;
pub use types::ChangeEvent;
pub use types::EventType;
pub use types::Lsn;
pub use types::Oid;
pub use types::RelationColumn;
pub use types::ReplicaIdentity;
pub use types::ReplicationSlotInfo;
pub use types::ReplicationSlotOptions;
pub use types::SlotType;
pub use types::TimestampTz;
pub use types::XLogRecPtr;
pub use types::Xid;
pub use types::INVALID_XLOG_REC_PTR;
pub use types::PG_EPOCH_OFFSET_SECS;
pub use protocol::message_types;
pub use protocol::parse_keepalive_message;
pub use protocol::ColumnData;
pub use protocol::ColumnInfo;
pub use protocol::KeepaliveMessage;
pub use protocol::LogicalReplicationMessage;
pub use protocol::LogicalReplicationParser;
pub use protocol::MessageType;
pub use protocol::RelationInfo;
pub use protocol::ReplicationState;
pub use protocol::StreamingReplicationMessage;
pub use protocol::TupleData;
pub use stream::EventStream;
pub use stream::EventStreamRef;
pub use stream::LogicalReplicationStream;
pub use stream::OriginFilter;
pub use stream::ReplicationStreamConfig;
pub use stream::StreamingMode;
pub use connection::PgReplicationConnection;
pub use connection::PgResult;
pub use retry::ExponentialBackoff;
pub use retry::ReplicationConnectionRetry;
pub use retry::RetryConfig;
pub use sql_builder::build_create_subscription_sql;
pub use sql_builder::build_detach_slot_sql;
pub use sql_builder::build_disable_subscription_sql;
pub use sql_builder::build_drop_subscription_sql;
pub use sql_builder::quote_ident;
pub use sql_builder::quote_literal;
pub use sql_builder::CreateSubscriptionOptions;

Modules§

buffer
Buffer utilities for reading and writing replication protocol messages
column_value
Column value types for PostgreSQL logical replication
connection
PostgreSQL connection backends.
deserializer
Custom serde Deserializer for converting RowData into user-defined types.
error
Error types for PostgreSQL logical replication operations
lsn
Thread-safe LSN tracking for CDC replication feedback
protocol
Unified PostgreSQL logical replication protocol implementation
retry
Connection retry logic with exponential backoff
sql_builder
SQL builder utilities for PostgreSQL replication management.
stream
PostgreSQL logical replication stream management
types
PostgreSQL type aliases and utility functions
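
For readers unfamiliar with the strategy named in the retry module's description, the sketch below shows exponential backoff in its simplest form: each failed attempt doubles the wait, up to a cap. `Backoff` and `next_delay` are hypothetical names for this illustration and do not describe the crate's `ExponentialBackoff` or `RetryConfig`, which may add jitter, attempt limits, or other settings.

```rust
use std::time::Duration;

/// Hypothetical backoff state for illustration only.
struct Backoff {
    base: Duration,
    max: Duration,
    attempt: u32,
}

impl Backoff {
    fn new(base: Duration, max: Duration) -> Self {
        Backoff { base, max, attempt: 0 }
    }

    /// Return the delay before the next retry: base * 2^attempt, capped at `max`.
    /// Saturating arithmetic avoids overflow after many attempts.
    fn next_delay(&mut self) -> Duration {
        let factor = 2u32.saturating_pow(self.attempt);
        self.attempt = self.attempt.saturating_add(1);
        self.base.saturating_mul(factor).min(self.max)
    }

    /// Call after a successful connection so the next failure starts small again.
    fn reset(&mut self) {
        self.attempt = 0;
    }
}

fn main() {
    let mut backoff = Backoff::new(Duration::from_millis(100), Duration::from_secs(1));
    for attempt in 1..=5 {
        println!("attempt {} would wait {:?}", attempt, backoff.next_delay());
    }
    backoff.reset();
}
```

In production code a random jitter is usually added to each delay so that many clients reconnecting at once do not retry in lockstep.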

Structs§

CancellationToken
A token which can be used to signal a cancellation request to one or more tasks.