# pg-walstream
A high-performance Rust library for PostgreSQL logical and physical replication protocol parsing and streaming. Provides a robust, type-safe interface for consuming PostgreSQL Write-Ahead Log (WAL) streams.
## Features
- Full Logical Replication Support: Implements PostgreSQL logical replication protocol versions 1-4
- Physical Replication Support: Stream raw WAL data for standby servers and PITR
- Base Backup Support: Full `BASE_BACKUP` command with progress, compression, and manifest options
- Pure-Rust Backend: Optional `rustls-tls` feature eliminates all C dependencies (no libpq, no libclang), using `aws-lc-rs` for hardware-accelerated TLS (AES-NI, AVX2, SHA-NI)
- TLS/SSL Support: All PostgreSQL SSL modes (`disable`, `allow`, `prefer`, `require`, `verify-ca`, `verify-full`)
- Authentication: Cleartext, MD5, and SCRAM-SHA-256 authentication methods
- Streaming Transactions: Support for streaming large transactions (protocol v2+)
- Two-Phase Commit: Prepared transaction support (protocol v3+)
- Parallel Streaming: Multi-stream parallel replication (protocol v4+)
- Zero-Copy Operations: Efficient buffer management using the `bytes` crate with drain-loop batch queue optimization
- Thread-Safe LSN Tracking: Atomic LSN feedback for producer-consumer patterns
- Connection Management: Built-in connection handling with exponential backoff retry logic
- Type-Safe API: Strongly typed message parsing with comprehensive error handling
- Replication Slot Management: Create, alter, read, and drop slots with full option support
- Hot Standby Feedback: Send hot standby feedback messages for physical replication
- Send-Safe Streams: `LogicalReplicationStream` is `Send`, compatible with `tokio::spawn`
## Installation
Add this to your `Cargo.toml`:

```toml
[dependencies]
pg-walstream = "0.6.0"
```

By default, this uses the libpq backend (C FFI). For a pure-Rust build with no system dependencies:

```toml
[dependencies]
pg-walstream = { version = "0.6.0", default-features = false, features = ["rustls-tls"] }
```
## Feature Flags
pg-walstream provides two mutually exclusive connection backends, selected at compile time:
| Feature | Default | C Dependencies | Description |
|---|---|---|---|
| `libpq` | Yes | `libpq-dev`, `libclang-dev` | Uses PostgreSQL's C client library via FFI. Battle-tested, supports all auth methods natively. |
| `rustls-tls` | No | `cmake`, `gcc` (build-time only) | Pure-Rust implementation using rustls with the aws-lc-rs crypto backend for hardware-accelerated TLS. No runtime C dependencies. |
Note: Enabling both features simultaneously will cause a compile error.
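The backend choice maps onto standard Cargo invocations; a quick sketch:

```shell
# Default build: libpq backend (needs libpq-dev and libclang-dev installed)
cargo build

# Pure-Rust backend: disable default features and enable rustls-tls
cargo build --no-default-features --features rustls-tls
```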
## System Dependencies
System dependencies are only required when using the default libpq feature. The rustls-tls feature requires only cmake and a C compiler at build time (for the aws-lc-rs crypto library), with no runtime dependencies.
### For libpq feature (default)
Ubuntu/Debian:

```bash
sudo apt-get install libpq-dev libclang-dev
```

CentOS/RHEL/Fedora:

```bash
sudo dnf install libpq-devel clang-devel
# or
sudo yum install libpq-devel clang-devel
```
### For rustls-tls feature
Requires `cmake` and a C compiler at build time for `aws-lc-rs` (hardware-accelerated cryptography).

Ubuntu/Debian:

```bash
sudo apt-get install cmake gcc
```

Then add to `Cargo.toml`:

```toml
[dependencies]
pg-walstream = { version = "0.6.0", default-features = false, features = ["rustls-tls"] }
```
## Quick Start
### Logical Replication - Stream API
The Stream API provides an ergonomic, iterator-like interface. The snippet below is an illustrative sketch (construction details and exact method signatures may differ; see `examples/basic-streaming` for a complete, compilable version):

```rust
use pg_walstream::LogicalReplicationStream;
use futures::StreamExt;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Build the stream from your connection settings (construction elided).
    let mut stream: LogicalReplicationStream = todo!("configure connection");

    // Create the replication slot (exports a snapshot when requested).
    stream.ensure_replication_slot().await?;

    // Begin streaming; the exported snapshot is destroyed at this point.
    stream.start().await?;

    while let Some(event) = stream.next().await {
        println!("{:?}", event?.event_type);
    }
    Ok(())
}
```
Note: The exported snapshot is only valid between `ensure_replication_slot()` and `start()`. Once `START_REPLICATION` is issued, PostgreSQL destroys the snapshot. You must read the snapshot on a separate connection before calling `start()`.
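For example, the exported snapshot can be consumed on a second, regular SQL connection to take a consistent initial copy. The snapshot ID below is a placeholder; the real one is returned when the slot is created:

```sql
-- On a separate (non-replication) connection, before calling start():
BEGIN TRANSACTION ISOLATION LEVEL REPEATABLE READ;
SET TRANSACTION SNAPSHOT '00000003-00000002-1';  -- placeholder snapshot ID
SELECT * FROM users;  -- sees exactly the state at slot creation
COMMIT;
```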
### Working with Event Data
Events carry row data as [RowData], an ordered list of `(Arc<str>, ColumnValue)` pairs. [ColumnValue] is a lightweight enum (`Null | Text(Bytes) | Binary(Bytes)`) that preserves the raw PostgreSQL wire representation with zero-copy semantics. Schema, table, and column names are `Arc<str>` (reference-counted, zero-cost cloning):
```rust
// Illustrative sketch: the EventType variant and field names here are
// assumptions based on the message types listed below; consult the crate
// docs for the exact definitions.
use pg_walstream::ColumnValue;

// Pattern match on event types
match &event.event_type {
    EventType::Insert { row, .. } => {
        for (column, value) in row.iter() {
            match value {
                ColumnValue::Null => println!("{column} = NULL"),
                ColumnValue::Text(bytes) => println!("{column} = {:?}", bytes),
                ColumnValue::Binary(bytes) => println!("{column} = <{} bytes>", bytes.len()),
            }
        }
    }
    _ => {}
}
```
### Using the Polling API
For more control, you can use the traditional polling approach. Another illustrative sketch, built around the `next_event()` method demonstrated in `examples/polling`:

```rust
use pg_walstream::LogicalReplicationStream;

async fn consume(mut stream: LogicalReplicationStream) -> Result<(), Box<dyn std::error::Error>> {
    loop {
        // Poll for the next replication event (None on keepalive/no data).
        if let Some(event) = stream.next_event().await? {
            println!("{:?}", event.event_type);
        }
    }
}
```
### LSN Tracking
Thread-safe LSN tracking for feedback to PostgreSQL (method names are from the crate's API; exact signatures may differ):

```rust
use pg_walstream::SharedLsnFeedback;

let feedback = SharedLsnFeedback::new_shared();

// Producer thread: read the LSN to report back to PostgreSQL
let lsn = feedback.get_feedback_lsn();

// Consumer thread: update the LSN after processing an event
feedback.update_applied_lsn(lsn);
```
## PostgreSQL Setup
Before using this library, you need to configure PostgreSQL for replication:
### 1. Configure PostgreSQL
Edit `postgresql.conf`:

```ini
wal_level = logical
max_replication_slots = 4
max_wal_senders = 4
```
Restart PostgreSQL after making these changes.
### 2. Create a Publication
```sql
-- Create a publication for specific tables
CREATE PUBLICATION my_publication FOR TABLE users, orders;

-- Or publish all tables
CREATE PUBLICATION my_publication FOR ALL TABLES;
```
### 3. Create Replication User
```sql
-- Create a user with replication privileges
CREATE USER replication_user WITH REPLICATION PASSWORD 'your_password';

-- Grant necessary permissions
GRANT SELECT ON ALL TABLES IN SCHEMA public TO replication_user;
GRANT USAGE ON SCHEMA public TO replication_user;
```
### 4. Replication Slot Options
The library provides full control over replication slot creation. The correct SQL syntax is automatically selected based on the connected PostgreSQL version:
- PG14: Legacy positional keyword syntax (`EXPORT_SNAPSHOT`, `NOEXPORT_SNAPSHOT`, `USE_SNAPSHOT`, `TWO_PHASE`, `RESERVE_WAL`)
- PG15+: Modern parenthesized options syntax (`(SNAPSHOT 'export', TWO_PHASE true, ...)`)
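For illustration, the replication-protocol commands the two syntaxes produce look roughly like this (slot and plugin names are placeholders):

```sql
-- PG14: legacy positional keywords
CREATE_REPLICATION_SLOT my_slot LOGICAL pgoutput EXPORT_SNAPSHOT;

-- PG15+: parenthesized options
CREATE_REPLICATION_SLOT my_slot LOGICAL pgoutput (SNAPSHOT 'export', TWO_PHASE true);
```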
| Option | Description | PG Version |
|---|---|---|
| `temporary` | Temporary slot (not persisted to disk, dropped on disconnect) | 14+ |
| `two_phase` | Enable two-phase commit for logical slots | 14+ |
| `reserve_wal` | Reserve WAL immediately for physical slots | 14+ |
| `snapshot` | Snapshot behavior: `"export"`, `"use"`, or `"nothing"` | 14+ |
| `failover` | Enable slot synchronization to standbys for HA | 16+ |
Note: If both `two_phase` and `snapshot` are set, `two_phase` takes priority. The `failover` option is not available on PG14 and will return an error.
## Examples
The `examples/` directory contains runnable examples demonstrating various usage patterns:

| Example | Description |
|---|---|
| `basic-streaming` | High-level `futures::Stream` API with stream combinators (`filter`, `take_while`) |
| `polling` | Manual polling loop using `next_event()` for custom integration scenarios |
| `safe-transaction-consumer` | Production-grade transaction-aware CDC consumer with ordered commits and safe LSN feedback |
| `rate-limited-streaming` | Rate-limited consumption using `tokio_stream::StreamExt::throttle` |
| `tokio-spawn-streaming` | Producer/consumer pattern via `tokio::spawn` with an mpsc channel (demonstrates `Send` safety) |
| `pg-basebackup` | Full physical backup tool using `BASE_BACKUP` with tar extraction and progress reporting |
| `arbitrary-fuzzing` | Property-based fuzzing of all protocol types using the `arbitrary` crate |
## Message Types
The library supports all PostgreSQL logical replication message types:
### Protocol Version 1 Messages
- BEGIN: Transaction start
- COMMIT: Transaction commit
- ORIGIN: Replication origin
- RELATION: Table schema definition
- TYPE: Data type definition
- INSERT: Row insertion
- UPDATE: Row update
- DELETE: Row deletion
- TRUNCATE: Table truncation
- MESSAGE: Generic message
### Protocol Version 2+ Messages (Streaming)
- STREAM_START: Streaming transaction start
- STREAM_STOP: Streaming transaction segment end
- STREAM_COMMIT: Streaming transaction commit
- STREAM_ABORT: Streaming transaction abort
### Protocol Version 3+ Messages (Two-Phase Commit)
- BEGIN_PREPARE: Prepared transaction start
- PREPARE: Transaction prepare
- COMMIT_PREPARED: Commit prepared transaction
- ROLLBACK_PREPARED: Rollback prepared transaction
- STREAM_PREPARE: Stream prepare message
## Architecture
```text
┌──────────────────────────────────────────┐
│           Application Layer              │
│    (Your CDC / Replication Logic)        │
└──────────────┬───────────────────────────┘
               │
┌──────────────▼───────────────────────────┐
│       LogicalReplicationStream           │
│  - Connection management & retry         │
│  - Event processing & LSN feedback       │
│  - Snapshot export support               │
└──────────────┬───────────────────────────┘
               │
┌──────────────▼───────────────────────────┐
│       LogicalReplicationParser           │
│  - Protocol v1-v4 parsing                │
│  - Zero-copy message deserialization     │
│  - Streaming transaction support         │
└──────────────┬───────────────────────────┘
               │
┌──────────────▼───────────────────────────┐
│       PgReplicationConnection            │
│  ┌─────────────────┬──────────────────┐  │
│  │  libpq backend  │   rustls-tls     │  │
│  │  (C FFI)        │   (pure Rust)    │  │
│  │                 │                  │  │
│  │  libpq-sys      │  rustls +        │  │
│  │  + libclang     │  aws-lc-rs +     │  │
│  │                 │  postgres-protocol│ │
│  └─────────────────┴──────────────────┘  │
│   Compile-time feature flag selection    │
└──────────────┬───────────────────────────┘
               │
┌──────────────▼───────────────────────────┐
│       BufferReader / BufferWriter        │
│  - Zero-copy operations (bytes crate)    │
│  - Binary protocol handling              │
│  - Drain-loop batch queue optimization   │
└──────────────────────────────────────────┘
```
## Stress Test & System Threshold Analysis
A progressive writer-concurrency ramp (16-192 writers) to find the library's CPU saturation point and throughput ceiling.
### Test Environment
```text
CPU:       AMD EPYC 7763 64-Core Processor
CPU cores: 8
Memory:    32812624 kB
OS:        Ubuntu 22.04.5 LTS
TLS/SSL:   enabled (feature: libpq)
```
### Stress Ramp: Throughput vs CPU at Increasing Writer Concurrency
```text
Writers | DML ev/s    | Proc CPU% | Sys CPU%  | RSS (MB)
--------|-------------|-----------|-----------|--------
     16 |      48,805 |     31.2% |      5.5% |   18.6
     32 |      88,645 |     54.3% |      8.6% |   18.6
     48 |     135,669 |     84.3% |     12.8% |   18.7
     64 |     135,681 |     82.0% |     11.8% |   18.6
     96 |     135,524 |     83.2% |     12.6% |   18.7
    128 |     135,367 |     84.3% |     12.8% |   18.7
    192 |     132,571 |     82.8% |     13.0% |   18.6
```
Throughput scaling with writer concurrency:
```text
 16w |  48,805 ev/s |██████████████
 32w |  88,645 ev/s |██████████████████████████
 48w | 135,669 ev/s |████████████████████████████████████████
 64w | 135,681 ev/s |████████████████████████████████████████
 96w | 135,524 ev/s |████████████████████████████████████████
128w | 135,367 ev/s |████████████████████████████████████████
192w | 132,571 ev/s |███████████████████████████████████████
```
CPU usage scaling with writer concurrency:
```text
 16w | proc 31.2% sys  5.5% |████████████
 32w | proc 54.3% sys  8.6% |██████████████████████
 48w | proc 84.3% sys 12.8% |██████████████████████████████████
 64w | proc 82.0% sys 11.8% |█████████████████████████████████
 96w | proc 83.2% sys 12.6% |█████████████████████████████████
128w | proc 84.3% sys 12.8% |██████████████████████████████████
192w | proc 82.8% sys 13.0% |█████████████████████████████████
```
Legend: █ = process CPU (pg-walstream), ░ = additional system CPU
Memory (RSS) scaling with writer concurrency:
```text
 16w | 18.6 MB avg / 18.6 MB peak |████████████████████████████████████████
 32w | 18.6 MB avg / 18.6 MB peak |████████████████████████████████████████
 48w | 18.7 MB avg / 18.7 MB peak |████████████████████████████████████████
 64w | 18.6 MB avg / 18.6 MB peak |████████████████████████████████████████
 96w | 18.7 MB avg / 18.7 MB peak |████████████████████████████████████████
128w | 18.7 MB avg / 18.7 MB peak |████████████████████████████████████████
192w | 18.6 MB avg / 18.6 MB peak |████████████████████████████████████████
```
Threshold Analysis:
- Peak throughput: 135,681 DML events/sec at 64w concurrency
- Throughput saturation detected at 64 writers (throughput gain < 5% or regression)
- Library CPU moderate: Process CPU peaked at 88.8% -- approaching but not yet at saturation
- Memory: Peak process RSS 18.7 MB / 32044 MB total system (0.06% utilization)
- CPU efficiency: ~1,528 DML events/sec per 1% CPU at peak
## Linux VM TCP Tuning for Production
When streaming WAL over high-latency links (e.g., cross-region Azure PostgreSQL), the default Linux TCP buffer sizes can become the throughput bottleneck. The kernel's default `rmem_max` of 208 KB limits the TCP receive window, which, combined with round-trip latency, caps throughput via the Bandwidth-Delay Product (BDP).
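The BDP ceiling is easy to sketch numerically. As a worked example, assuming an illustrative 30 ms cross-region round-trip time:

```rust
// A TCP sender can have at most one receive window in flight per round
// trip, so achievable throughput <= window / RTT.
fn bdp_ceiling_bytes_per_sec(window_bytes: f64, rtt_secs: f64) -> f64 {
    window_bytes / rtt_secs
}

fn main() {
    let rtt = 0.030; // assumed 30 ms round-trip time
    let default_cap = bdp_ceiling_bytes_per_sec(208.0 * 1024.0, rtt);
    let tuned_cap = bdp_ceiling_bytes_per_sec(64.0 * 1024.0 * 1024.0, rtt);
    println!("default 208 KB window: {:.1} MB/s", default_cap / 1e6); // ~7.1 MB/s
    println!("tuned    64 MB window: {:.2} GB/s", tuned_cap / 1e9);   // ~2.24 GB/s
}
```

With the default 208 KB ceiling, a 30 ms link cannot exceed roughly 7 MB/s regardless of available bandwidth; raising `rmem_max` removes that cap.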
### Recommended sysctl Settings
```ini
# --- TCP buffer sizes ---
# Allow up to 64 MB per-socket receive/send buffers (kernel will auto-tune within this ceiling)
net.core.rmem_max = 67108864
net.core.wmem_max = 67108864

# TCP auto-tuning ranges: min / default / max (bytes)
# The kernel dynamically adjusts each socket's buffer within these bounds
net.ipv4.tcp_rmem = 4096 262144 67108864
net.ipv4.tcp_wmem = 4096 262144 67108864

# --- Congestion control ---
# BBR provides significantly better throughput than cubic on high-latency links
net.ipv4.tcp_congestion_control = bbr

# --- Packet backlog ---
# Increase the NIC receive queue (helps at high packet rates)
net.core.netdev_max_backlog = 5000
```
Apply immediately:

```bash
# Assuming the settings above were saved under /etc/sysctl.d/ (path is illustrative)
sudo sysctl --system
```
### Why Each Parameter Matters
| Parameter | Default | Recommended | Why |
|---|---|---|---|
| `rmem_max` | 208 KB | 64 MB | Caps the TCP receive window; directly limits throughput on high-RTT links |
| `wmem_max` | 208 KB | 64 MB | Caps the TCP send window; limits outbound throughput for feedback messages |
| `tcp_rmem` (max) | 6 MB | 64 MB | Per-socket auto-tuned receive buffer ceiling |
| `tcp_wmem` (max) | 4 MB | 64 MB | Per-socket auto-tuned send buffer ceiling |
| `tcp_congestion_control` | cubic | bbr | BBR reacts to actual bandwidth, not packet loss; better on cloud networks |
| `netdev_max_backlog` | 1000 | 5000 | Prevents packet drops under burst traffic at the NIC level |
Note: These settings affect all TCP connections on the VM, not just pg-walstream. The kernel auto-tunes actual buffer usage within the configured ceiling, so idle connections do not consume 64 MB each.
## Limitations

- Requires PostgreSQL 14 or later for full protocol support
- Logical replication slot must be created before streaming
- Binary protocol only (no text-based protocol support)
- Requires `replication` permission for the database user
## Resources
- PostgreSQL Logical Replication Documentation
- Logical Replication Message Formats
- Replication Protocol
## Contributing

Contributions are welcome! Please feel free to submit a Pull Request.

## License

This project is licensed under the BSD 3-Clause License.

## Author

Daniel Shih (dog830228@gmail.com)