pg_walstream 0.0.1

PostgreSQL logical replication protocol library - parse and handle PostgreSQL WAL streaming messages

pg-walstream

A high-performance Rust library for PostgreSQL logical replication protocol parsing and streaming. This library provides a robust, type-safe interface for consuming PostgreSQL Write-Ahead Log (WAL) streams.

Features

  • Full Protocol Support: Implements PostgreSQL logical replication protocol versions 1-4
  • Streaming Transactions: Support for streaming large transactions (protocol v2+)
  • Two-Phase Commit: Prepared transaction support (protocol v3+)
  • Parallel Streaming: Multi-stream parallel replication (protocol v4+)
  • Zero-Copy Operations: Efficient buffer management using the bytes crate
  • Thread-Safe LSN Tracking: Atomic LSN feedback for producer-consumer patterns
  • Connection Management: Built-in connection handling with exponential backoff retry logic
  • Type-Safe API: Strongly typed message parsing with comprehensive error handling
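
The retry behaviour listed above can be pictured with a small standalone sketch of exponential backoff. The `BackoffPolicy` type and its field names are hypothetical and are not taken from the crate's `RetryConfig`. The sketch only shows the general technique: the delay grows geometrically per attempt and is capped at a maximum.

```rust
use std::time::Duration;

/// Hypothetical retry settings for illustration only; these names are
/// assumptions and do not mirror pg_walstream's `RetryConfig`.
#[derive(Debug, Clone, Copy)]
struct BackoffPolicy {
    initial_delay: Duration,
    max_delay: Duration,
    multiplier: u32,
    max_attempts: u32,
}

impl BackoffPolicy {
    /// Delay to wait before retry number `attempt` (0-based), or `None`
    /// once the attempt budget is used up.
    fn delay_for(&self, attempt: u32) -> Option<Duration> {
        if attempt >= self.max_attempts {
            return None;
        }
        // initial_delay * multiplier^attempt, saturating instead of overflowing.
        let factor = self.multiplier.saturating_pow(attempt);
        let delay = self.initial_delay.saturating_mul(factor);
        // Never wait longer than the configured ceiling.
        Some(delay.min(self.max_delay))
    }
}

fn main() {
    let policy = BackoffPolicy {
        initial_delay: Duration::from_millis(500),
        max_delay: Duration::from_secs(30),
        multiplier: 2,
        max_attempts: 8,
    };
    let mut attempt = 0;
    while let Some(delay) = policy.delay_for(attempt) {
        println!("attempt {attempt}: wait {delay:?} before reconnecting");
        attempt += 1;
    }
}
```

A real reconnect loop would usually add random jitter to each delay so that many clients do not retry in lockstep. It is left out here to keep the sketch deterministic.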

Installation

Add this to your Cargo.toml:

[dependencies]
pg_walstream = "0.0.1"

System Dependencies

Make sure the libpq development libraries are installed:

Ubuntu/Debian:

sudo apt-get install libpq-dev \
    clang \
    libclang-dev 

CentOS/RHEL/Fedora:

sudo yum install postgresql-devel
# or
sudo dnf install postgresql-devel

Quick Start

Complete Replication Stream

use pg_walstream::{
    LogicalReplicationStream, ReplicationStreamConfig, RetryConfig,
    SharedLsnFeedback, CancellationToken,
};
use std::time::Duration;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Configure the replication stream
    let config = ReplicationStreamConfig::new(
        "my_slot".to_string(),           // Replication slot name
        "my_publication".to_string(),     // Publication name
        2,                                // Protocol version
        true,                             // Enable streaming
        Duration::from_secs(10),          // Feedback interval
        Duration::from_secs(30),          // Connection timeout
        Duration::from_secs(60),          // Health check interval
        RetryConfig::default(),           // Retry configuration
    );

    // Create connection string
    let connection_string = "postgresql://postgres:test.123@postgres:5432/postgres?replication=database";

    // Create and initialize the stream
    let mut stream = LogicalReplicationStream::new(connection_string, config).await?;
    
    // Set up LSN feedback for tracking progress
    let lsn_feedback = SharedLsnFeedback::new_shared();
    stream.set_shared_lsn_feedback(lsn_feedback.clone());
    
    // Initialize the stream (creates the slot if needed) and start replication
    // from a specific LSN, or pass None to start from the latest position
    stream.start(None).await?;

    // Create cancellation token for graceful shutdown
    let cancel_token = CancellationToken::new();

    // Process events
    loop {
        match stream.next_event(&cancel_token).await? {
            Some(event) => {
                println!("Received event: {:?}", event);
                
                // Update LSN feedback after processing
                if let Some(lsn) = event.lsn {
                    lsn_feedback.update_applied_lsn(lsn.value());
                }
            }
            None => {
                // No event available, continue
            }
        }
    }
}
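
The `start` call above takes an optional starting LSN. PostgreSQL prints an LSN as two hexadecimal numbers separated by a slash (for example `16/B374D848`), which together form one 64-bit position in the WAL. The helpers below are a hypothetical sketch of converting between that text form and a `u64`. They are not part of pg_walstream, and whether the crate exposes an equivalent is not stated here.

```rust
/// Parse PostgreSQL's textual LSN form ("XXXXXXXX/YYYYYYYY", hexadecimal)
/// into a single 64-bit value. Returns `None` for malformed input.
fn parse_lsn(text: &str) -> Option<u64> {
    let (high, low) = text.split_once('/')?;
    let high = u32::from_str_radix(high, 16).ok()?;
    let low = u32::from_str_radix(low, 16).ok()?;
    // The part before the slash is the upper 32 bits, the part after it the lower 32.
    Some((u64::from(high) << 32) | u64::from(low))
}

/// Format a 64-bit LSN in the same "high/low" hexadecimal form.
fn format_lsn(lsn: u64) -> String {
    format!("{:X}/{:X}", lsn >> 32, lsn & 0xFFFF_FFFF)
}

fn main() {
    let text = "16/B374D848";
    match parse_lsn(text) {
        Some(lsn) => println!("{text} -> {lsn} -> {}", format_lsn(lsn)),
        None => println!("{text} is not a valid LSN"),
    }
}
```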

LSN Tracking

Thread-safe LSN tracking for feedback to PostgreSQL:

use pg_walstream::SharedLsnFeedback;

// Shared handle; clone it to give one copy to each side
let feedback = SharedLsnFeedback::new_shared();

// Producer (replication stream): read the LSNs to report back to PostgreSQL
let (flushed_lsn, applied_lsn) = feedback.get_feedback_lsn();

// Consumer: record progress after a transaction has been processed.
// `commit_lsn` stands for the LSN of that transaction.
feedback.update_applied_lsn(commit_lsn);
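
To show why atomics suit this pattern, here is a standalone sketch built only on the standard library. `LsnTracker` is a hypothetical stand-in and says nothing about how `SharedLsnFeedback` is implemented internally. It assumes LSNs are plain `u64` values and that a reported position should never move backwards.

```rust
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::Arc;
use std::thread;

/// Hypothetical lock-free LSN holder; not pg_walstream's type.
#[derive(Default)]
struct LsnTracker {
    flushed: AtomicU64,
    applied: AtomicU64,
}

impl LsnTracker {
    /// Advance the flushed LSN; `fetch_max` ignores values that would go backwards.
    fn update_flushed(&self, lsn: u64) {
        self.flushed.fetch_max(lsn, Ordering::AcqRel);
    }

    /// Advance the applied LSN with the same monotonic rule.
    fn update_applied(&self, lsn: u64) {
        self.applied.fetch_max(lsn, Ordering::AcqRel);
    }

    /// Read both positions as (flushed, applied).
    fn snapshot(&self) -> (u64, u64) {
        (
            self.flushed.load(Ordering::Acquire),
            self.applied.load(Ordering::Acquire),
        )
    }
}

/// Spawn `workers` threads that each report `per_worker` distinct LSNs,
/// then return the final snapshot.
fn run_workers(workers: u64, per_worker: u64) -> (u64, u64) {
    let tracker = Arc::new(LsnTracker::default());
    let handles: Vec<_> = (0..workers)
        .map(|w| {
            let tracker = Arc::clone(&tracker);
            thread::spawn(move || {
                for i in 1..=per_worker {
                    let lsn = w * per_worker + i;
                    tracker.update_flushed(lsn);
                    tracker.update_applied(lsn);
                }
            })
        })
        .collect();
    for handle in handles {
        handle.join().expect("worker thread panicked");
    }
    tracker.snapshot()
}

fn main() {
    let (flushed, applied) = run_workers(4, 1000);
    println!("flushed={flushed} applied={applied}");
}
```

Because each update is a single atomic maximum, writers never block each other and a late, smaller LSN cannot overwrite a newer one. The two fields are read independently, so a snapshot is not guaranteed to be a consistent pair.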

PostgreSQL Setup

Before using this library, you need to configure PostgreSQL for logical replication:

1. Configure PostgreSQL

Edit postgresql.conf:

wal_level = logical
max_replication_slots = 4
max_wal_senders = 4

Restart PostgreSQL after making these changes.

2. Create a Publication

-- Create a publication for specific tables
CREATE PUBLICATION my_publication FOR TABLE users, orders;

-- Or publish all tables
CREATE PUBLICATION my_publication FOR ALL TABLES;

3. Create Replication User

-- Create a user with replication privileges
CREATE USER replication_user WITH REPLICATION PASSWORD 'secure_password';

-- Grant necessary permissions
GRANT SELECT ON ALL TABLES IN SCHEMA public TO replication_user;
GRANT USAGE ON SCHEMA public TO replication_user;

Message Types

The library supports all PostgreSQL logical replication message types:

Protocol Version 1 Messages

  • BEGIN: Transaction start
  • COMMIT: Transaction commit
  • ORIGIN: Replication origin
  • RELATION: Table schema definition
  • TYPE: Data type definition
  • INSERT: Row insertion
  • UPDATE: Row update
  • DELETE: Row deletion
  • TRUNCATE: Table truncation
  • MESSAGE: Generic message

Protocol Version 2+ Messages (Streaming)

  • STREAM_START: Streaming transaction start
  • STREAM_STOP: Streaming transaction segment end
  • STREAM_COMMIT: Streaming transaction commit
  • STREAM_ABORT: Streaming transaction abort

Protocol Version 3+ Messages (Two-Phase Commit)

  • BEGIN_PREPARE: Prepared transaction start
  • PREPARE: Transaction prepare
  • COMMIT_PREPARED: Commit prepared transaction
  • ROLLBACK_PREPARED: Rollback prepared transaction
  • STREAM_PREPARE: Stream prepare message
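
On the wire, each of these messages starts with a one-byte tag. The sketch below maps the tag bytes documented by PostgreSQL to the message names above, together with the protocol version under which this document lists them. The `MessageKind` enum and `classify` function are hypothetical names for illustration and are not the crate's own types.

```rust
/// Hypothetical message classification; names are illustrative only.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum MessageKind {
    Begin, Commit, Origin, Relation, Type, Insert, Update, Delete, Truncate, Message,
    StreamStart, StreamStop, StreamCommit, StreamAbort,
    BeginPrepare, Prepare, CommitPrepared, RollbackPrepared, StreamPrepare,
}

/// Map the first byte of a logical replication message to its kind and the
/// protocol version group it is listed under. Unknown tags give `None`.
fn classify(tag: u8) -> Option<(MessageKind, u8)> {
    use MessageKind::*;
    let entry = match tag {
        // Protocol version 1
        b'B' => (Begin, 1),
        b'C' => (Commit, 1),
        b'O' => (Origin, 1),
        b'R' => (Relation, 1),
        b'Y' => (Type, 1),
        b'I' => (Insert, 1),
        b'U' => (Update, 1),
        b'D' => (Delete, 1),
        b'T' => (Truncate, 1),
        b'M' => (Message, 1),
        // Protocol version 2+ (streaming)
        b'S' => (StreamStart, 2),
        b'E' => (StreamStop, 2),
        b'c' => (StreamCommit, 2),
        b'A' => (StreamAbort, 2),
        // Protocol version 3+ (two-phase commit)
        b'b' => (BeginPrepare, 3),
        b'P' => (Prepare, 3),
        b'K' => (CommitPrepared, 3),
        b'r' => (RollbackPrepared, 3),
        b'p' => (StreamPrepare, 3),
        _ => return None,
    };
    Some(entry)
}

fn main() {
    for &tag in b"BISKp?" {
        match classify(tag) {
            Some((kind, version)) => {
                println!("'{}' -> {:?} (protocol v{}+)", tag as char, kind, version)
            }
            None => println!("'{}' -> unknown tag", tag as char),
        }
    }
}
```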

Testing

The project includes 95 unit tests covering:

  • Protocol message parsing
  • Buffer operations
  • LSN tracking and thread safety
  • Error handling
  • Retry logic
  • Type conversions

Architecture

┌─────────────────────────────────────────┐
│         Application Layer               │
│  (Your CDC / Replication Logic)        │
└──────────────┬──────────────────────────┘
               │
┌──────────────▼──────────────────────────┐
│    LogicalReplicationStream             │
│  - Connection management                │
│  - Event processing                     │
│  - LSN feedback                         │
└──────────────┬──────────────────────────┘
               │
┌──────────────▼──────────────────────────┐
│  LogicalReplicationParser               │
│  - Protocol parsing                     │
│  - Message deserialization              │
└──────────────┬──────────────────────────┘
               │
┌──────────────▼──────────────────────────┐
│     BufferReader / BufferWriter         │
│  - Zero-copy operations                 │
│  - Binary protocol handling             │
└─────────────────────────────────────────┘
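
As a rough picture of what the two lower layers do, the sketch below decodes a BEGIN message by hand. The layout follows the PostgreSQL protocol documentation: a `'B'` tag byte, then the final LSN as a 64-bit integer, the commit timestamp as a 64-bit count of microseconds since 2000-01-01, and the transaction ID as a 32-bit integer, all big-endian. The `Cursor` and `Begin` types are hypothetical and std-only. The crate's own `BufferReader` and parser are assumed to work differently, for example on top of `bytes::Bytes`.

```rust
/// Minimal big-endian reader over a byte slice (hypothetical helper).
struct Cursor<'a> {
    buf: &'a [u8],
    pos: usize,
}

impl<'a> Cursor<'a> {
    fn new(buf: &'a [u8]) -> Self {
        Self { buf, pos: 0 }
    }

    /// Take the next N bytes, or `None` if the buffer is too short.
    fn take<const N: usize>(&mut self) -> Option<[u8; N]> {
        let end = self.pos.checked_add(N)?;
        let bytes = self.buf.get(self.pos..end)?;
        let mut out = [0u8; N];
        out.copy_from_slice(bytes);
        self.pos = end;
        Some(out)
    }

    fn read_u8(&mut self) -> Option<u8> {
        self.take::<1>().map(|b| b[0])
    }
    fn read_u32(&mut self) -> Option<u32> {
        self.take::<4>().map(u32::from_be_bytes)
    }
    fn read_u64(&mut self) -> Option<u64> {
        self.take::<8>().map(u64::from_be_bytes)
    }
    fn read_i64(&mut self) -> Option<i64> {
        self.take::<8>().map(i64::from_be_bytes)
    }
}

/// Decoded BEGIN message (field names are illustrative).
#[derive(Debug, PartialEq, Eq)]
struct Begin {
    final_lsn: u64,
    commit_ts_micros: i64, // microseconds since 2000-01-01
    xid: u32,
}

/// Parse a BEGIN message; returns `None` on a wrong tag or truncated input.
fn parse_begin(payload: &[u8]) -> Option<Begin> {
    let mut cursor = Cursor::new(payload);
    if cursor.read_u8()? != b'B' {
        return None;
    }
    // Fields are read in wire order: LSN, timestamp, transaction ID.
    let final_lsn = cursor.read_u64()?;
    let commit_ts_micros = cursor.read_i64()?;
    let xid = cursor.read_u32()?;
    Some(Begin { final_lsn, commit_ts_micros, xid })
}

fn main() {
    let mut payload = vec![b'B'];
    payload.extend_from_slice(&0x16_B374_D848u64.to_be_bytes());
    payload.extend_from_slice(&1_000_000i64.to_be_bytes());
    payload.extend_from_slice(&742u32.to_be_bytes());
    println!("{:?}", parse_begin(&payload));
}
```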

Performance Considerations

  • Zero-Copy: Uses bytes::Bytes for efficient buffer management
  • Atomic Operations: Thread-safe LSN tracking with minimal overhead
  • Connection Reuse: A single reusable connection with automatic retry
  • Streaming Support: Large in-progress transactions are delivered in chunks (protocol v2+) instead of all at once at commit

Limitations

  • Requires PostgreSQL 14 or later for streaming (protocol v2); protocol v3 needs PostgreSQL 15 and protocol v4 needs PostgreSQL 16
  • Logical replication slot must be created before streaming
  • Binary protocol only (no text-based protocol support)
  • Requires replication permission for the database user
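
PostgreSQL ties each logical replication protocol version to a minimum server release: version 1 to PostgreSQL 10, version 2 to 14, version 3 to 15, and version 4 to 16. A client could use a table like the one below to choose the protocol version to request. Both function names are hypothetical, and the sketch assumes the server's major version is already known.

```rust
/// Minimum PostgreSQL major version that accepts the given logical
/// replication protocol version, or `None` for an unknown version.
fn min_server_major(protocol_version: u8) -> Option<u32> {
    match protocol_version {
        1 => Some(10),
        2 => Some(14),
        3 => Some(15),
        4 => Some(16),
        _ => None,
    }
}

/// Highest protocol version a server of the given major version supports,
/// or `None` if it predates logical replication.
fn highest_protocol_for(server_major: u32) -> Option<u8> {
    (1..=4u8)
        .rev()
        .find(|&v| min_server_major(v).map_or(false, |min| server_major >= min))
}

fn main() {
    for server in [9, 13, 14, 15, 16, 17] {
        match highest_protocol_for(server) {
            Some(v) => println!("PostgreSQL {server}: protocol v{v}"),
            None => println!("PostgreSQL {server}: no logical replication"),
        }
    }
}
```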

Contributing

Contributions are welcome! Please feel free to submit a Pull Request.

Author

Daniel Shih (dog830228@gmail.com)