pg-walstream
A high-performance Rust library for PostgreSQL logical and physical replication protocol parsing and streaming. This library provides a robust, type-safe interface for consuming PostgreSQL Write-Ahead Log (WAL) streams.
Features
- Full Logical Replication Support: Implements PostgreSQL logical replication protocol versions 1-4
- Physical Replication Support: Stream raw WAL data for standby servers and PITR
- Streaming Transactions: Support for streaming large transactions (protocol v2+)
- Two-Phase Commit: Prepared transaction support (protocol v3+)
- Parallel Streaming: Multi-stream parallel replication (protocol v4+)
- Zero-Copy Operations: Efficient buffer management using the `bytes` crate
- Thread-Safe LSN Tracking: Atomic LSN feedback for producer-consumer patterns
- Connection Management: Built-in connection handling with exponential backoff retry logic
- Type-Safe API: Strongly typed message parsing with comprehensive error handling
- Configurable Slot Options: Temporary slots, snapshot export, two-phase, and failover support
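
The exponential backoff mentioned under Connection Management can be pictured with a small, std-only sketch. This is not the library's implementation: `backoff_delay`, the base delay, and the cap are hypothetical names and values chosen for illustration.

```rust
use std::time::Duration;

/// Hypothetical helper: delay before retry `attempt` (0-based),
/// doubling from `base` and never exceeding `max`.
fn backoff_delay(attempt: u32, base: Duration, max: Duration) -> Duration {
    // 2^attempt, saturating so large attempt counts cannot overflow.
    let factor = 2u32.checked_pow(attempt).unwrap_or(u32::MAX);
    // If the multiplication itself overflows, fall back to the cap.
    base.checked_mul(factor).map_or(max, |delay| delay.min(max))
}
```

With a 100 ms base and a 5 s cap, attempts 0 through 3 wait 100, 200, 400, and 800 ms, and later attempts plateau at the cap. Real retry loops often add random jitter on top of this.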
Installation
Add this to your Cargo.toml:
[dependencies]
pg-walstream = "0.1.0"
System Dependencies
Make sure you have libpq development libraries installed:
Ubuntu/Debian:
CentOS/RHEL/Fedora:
# or
Quick Start
Logical Replication - Stream API
The Stream API provides an ergonomic, iterator-like interface:
use ;
use std::sync::Arc;
use std::time::Duration;
async
Note: The exported snapshot is only valid while the transaction that created the replication slot is still open. You must read the snapshot before consuming WAL events or closing the replication connection. Temporary slots are automatically recreated on connection recovery.
Working with Event Data
Events carry row data as RowData, an ordered list of (Arc<str>, Value) pairs.
Schema, table, and column names are Arc<str>, so cloning one bumps a reference count instead of copying the string:
use ;
// Pattern match on event types
match &event.event_type
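
To make the row shape described above concrete, here is a self-contained sketch using stand-in types. The `Value` enum, the `get` helper, and `sample_row` are assumptions for illustration; the library's own `Value` and any accessor methods on `RowData` may look different.

```rust
use std::sync::Arc;

/// Stand-in for the library's `Value` type; the real enum may differ.
#[derive(Debug, Clone, PartialEq)]
enum Value {
    Null,
    Text(String),
    Int(i64),
}

/// Ordered column/value pairs, mirroring the shape described above.
type RowData = Vec<(Arc<str>, Value)>;

/// Hypothetical lookup: linear scan by column name, no hashing involved.
fn get<'a>(row: &'a RowData, column: &str) -> Option<&'a Value> {
    row.iter()
        .find(|(name, _)| &**name == column)
        .map(|(_, value)| value)
}

/// Build a sample row; in practice the names would be shared across events.
fn sample_row() -> RowData {
    let id: Arc<str> = Arc::from("id");
    let email: Arc<str> = Arc::from("email");
    vec![
        (id, Value::Int(42)),
        (email, Value::Text("a@example.com".to_string())),
    ]
}
```

Because the pairs keep their insertion order, iterating a row yields columns in the order they were stored, which a `HashMap` would not guarantee.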
Using the Polling API
For more control, you can use the traditional polling approach:
use ;
use std::sync::Arc;
use std::time::Duration;
async
LSN Tracking
Thread-safe LSN tracking for feedback to PostgreSQL:
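use pg_walstream::SharedLsnFeedback;
use std::sync::Arc;
let feedback = SharedLsnFeedback::new_shared();
// Producer thread: read LSN from feedback
let (flushed_lsn, applied_lsn) = feedback.get_feedback_lsn();
// Consumer thread: update LSN after processing
// (`commit_lsn` is a placeholder for the LSN of the last transaction the consumer applied)
feedback.update_applied_lsn(commit_lsn);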
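
The general idea behind atomic LSN feedback can be sketched with the standard library alone. `LsnTracker`, `format_lsn`, and `run_demo` below are hypothetical and are not the library's `SharedLsnFeedback`. The sketch assumes LSNs are carried as `u64` and uses `fetch_max` so that a stale update cannot move the position backwards.

```rust
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::Arc;

/// Hypothetical tracker; a stand-in, not the library's `SharedLsnFeedback`.
#[derive(Default)]
struct LsnTracker {
    applied: AtomicU64,
}

impl LsnTracker {
    /// Record progress; `fetch_max` keeps the stored LSN monotonic.
    fn update_applied(&self, lsn: u64) {
        self.applied.fetch_max(lsn, Ordering::AcqRel);
    }

    /// Read the highest LSN reported so far.
    fn applied(&self) -> u64 {
        self.applied.load(Ordering::Acquire)
    }
}

/// Render an LSN in PostgreSQL's `X/Y` text form (high/low 32 bits, hex).
fn format_lsn(lsn: u64) -> String {
    format!("{:X}/{:X}", lsn >> 32, lsn & 0xFFFF_FFFF)
}

/// A consumer thread reports progress; the caller then reads it back.
fn run_demo() -> u64 {
    let tracker = Arc::new(LsnTracker::default());
    let consumer = {
        let tracker = Arc::clone(&tracker);
        std::thread::spawn(move || {
            tracker.update_applied(0x16_B374_D848);
            tracker.update_applied(0x10); // stale value, ignored by fetch_max
        })
    };
    consumer.join().expect("consumer thread panicked");
    tracker.applied()
}
```

A single atomic per position avoids a mutex on the hot path; the trade-off is that related positions (for example flushed and applied) are not updated together as one unit.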
PostgreSQL Setup
Before using this library, you need to configure PostgreSQL for replication:
1. Configure PostgreSQL
Edit postgresql.conf:
wal_level = logical
max_replication_slots = 4
max_wal_senders = 4
Restart PostgreSQL after making these changes.
2. Create a Publication
-- Create a publication for specific tables
CREATE PUBLICATION my_publication FOR TABLE users, orders;
-- Or publish all tables
CREATE PUBLICATION my_publication FOR ALL TABLES;
3. Create Replication User
-- Create a user with replication privileges
CREATE USER replication_user WITH REPLICATION PASSWORD '<password>';
-- Grant necessary permissions
GRANT SELECT ON ALL TABLES IN SCHEMA public TO replication_user;
GRANT USAGE ON SCHEMA public TO replication_user;
4. Create Replication Slot with Advanced Options
The library provides two methods for creating replication slots:
Replication Slot Options
The library automatically selects the correct CREATE_REPLICATION_SLOT SQL syntax based on the connected PostgreSQL server version:
- PG14: Legacy positional keyword syntax (`EXPORT_SNAPSHOT`, `NOEXPORT_SNAPSHOT`, `USE_SNAPSHOT`, `TWO_PHASE`, `RESERVE_WAL`)
- PG15+: Modern parenthesized options syntax (`(SNAPSHOT 'export', TWO_PHASE true, ...)`)
| Option | Description | PG Version |
|---|---|---|
| `temporary` | Temporary slot (not persisted to disk, dropped on disconnect) | 14+ |
| `two_phase` | Enable two-phase commit for logical slots | 14+ |
| `reserve_wal` | Reserve WAL immediately for physical slots | 14+ |
| `snapshot` | Snapshot behavior: "export", "use", or "nothing" | 14+ |
| `failover` | Enable slot synchronization to standbys for HA | 16+ |
Note: If both `two_phase` and `snapshot` are set, `two_phase` takes priority. The `failover` option is not available on PG14 and will return an error.
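
The version-dependent syntax selection described above can be sketched as follows for logical slots. `SlotOptions` and `create_slot_sql` are hypothetical names, not the library's API. The sketch assumes the slot and plugin names are plain identifiers, and it only rejects `failover` on the legacy branch rather than checking the minimum server version from the table.

```rust
/// Hypothetical options; field names follow the table above.
#[derive(Default)]
struct SlotOptions {
    temporary: bool,
    two_phase: bool,
    /// "export", "use", or "nothing"
    snapshot: Option<&'static str>,
    failover: bool,
}

/// Build a logical-slot command for the given server major version.
/// `slot` and `plugin` are assumed to be plain identifiers (no quoting is done).
fn create_slot_sql(
    slot: &str,
    plugin: &str,
    pg_major: u32,
    opts: &SlotOptions,
) -> Result<String, String> {
    let temporary = if opts.temporary { " TEMPORARY" } else { "" };
    let mut sql = format!("CREATE_REPLICATION_SLOT {}{} LOGICAL {}", slot, temporary, plugin);

    if pg_major < 15 {
        if opts.failover {
            return Err("failover is not available with the legacy syntax".to_string());
        }
        // Emit a single legacy keyword; two_phase wins over snapshot.
        let keyword = if opts.two_phase {
            Some("TWO_PHASE")
        } else {
            match opts.snapshot {
                Some("export") => Some("EXPORT_SNAPSHOT"),
                Some("use") => Some("USE_SNAPSHOT"),
                Some("nothing") => Some("NOEXPORT_SNAPSHOT"),
                _ => None,
            }
        };
        if let Some(keyword) = keyword {
            sql.push(' ');
            sql.push_str(keyword);
        }
    } else {
        // Parenthesized option list; several options can be combined.
        let mut parts: Vec<String> = Vec::new();
        if let Some(mode) = opts.snapshot {
            parts.push(format!("SNAPSHOT '{}'", mode));
        }
        if opts.two_phase {
            parts.push("TWO_PHASE true".to_string());
        }
        if opts.failover {
            parts.push("FAILOVER true".to_string());
        }
        if !parts.is_empty() {
            sql.push_str(&format!(" ({})", parts.join(", ")));
        }
    }
    Ok(sql)
}
```

Given `two_phase` plus `snapshot: Some("export")`, the legacy branch emits only `TWO_PHASE`, while the parenthesized branch can carry both options in one list.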
Message Types
The library supports all PostgreSQL logical replication message types:
Protocol Version 1 Messages
- BEGIN: Transaction start
- COMMIT: Transaction commit
- ORIGIN: Replication origin
- RELATION: Table schema definition
- TYPE: Data type definition
- INSERT: Row insertion
- UPDATE: Row update
- DELETE: Row deletion
- TRUNCATE: Table truncation
- MESSAGE: Generic message
Protocol Version 2+ Messages (Streaming)
- STREAM_START: Streaming transaction start
- STREAM_STOP: Streaming transaction segment end
- STREAM_COMMIT: Streaming transaction commit
- STREAM_ABORT: Streaming transaction abort
Protocol Version 3+ Messages (Two-Phase Commit)
- BEGIN_PREPARE: Prepared transaction start
- PREPARE: Transaction prepare
- COMMIT_PREPARED: Commit prepared transaction
- ROLLBACK_PREPARED: Rollback prepared transaction
- STREAM_PREPARE: Stream prepare message
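
Each message type listed above is identified on the wire by a single tag byte. The sketch below maps the tag bytes documented in the PostgreSQL logical replication message formats to the names and protocol-version groupings used in this section. `message_kind` and `is_allowed` are hypothetical helpers, not part of the library's API.

```rust
/// Map a pgoutput tag byte to (message name, minimum protocol version).
fn message_kind(tag: u8) -> Option<(&'static str, u8)> {
    Some(match tag {
        b'B' => ("BEGIN", 1),
        b'C' => ("COMMIT", 1),
        b'O' => ("ORIGIN", 1),
        b'R' => ("RELATION", 1),
        b'Y' => ("TYPE", 1),
        b'I' => ("INSERT", 1),
        b'U' => ("UPDATE", 1),
        b'D' => ("DELETE", 1),
        b'T' => ("TRUNCATE", 1),
        b'M' => ("MESSAGE", 1),
        b'S' => ("STREAM_START", 2),
        b'E' => ("STREAM_STOP", 2),
        b'c' => ("STREAM_COMMIT", 2),
        b'A' => ("STREAM_ABORT", 2),
        b'b' => ("BEGIN_PREPARE", 3),
        b'P' => ("PREPARE", 3),
        b'K' => ("COMMIT_PREPARED", 3),
        b'r' => ("ROLLBACK_PREPARED", 3),
        b'p' => ("STREAM_PREPARE", 3),
        _ => return None,
    })
}

/// True if `tag` is a known message that may appear at the negotiated version.
fn is_allowed(tag: u8, negotiated_version: u8) -> bool {
    message_kind(tag).map_or(false, |(_, min)| negotiated_version >= min)
}
```

Note that tags are case-sensitive: `b'C'` is COMMIT while `b'c'` is STREAM_COMMIT.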
Architecture
┌─────────────────────────────────────────┐
│            Application Layer            │
│     (Your CDC / Replication Logic)      │
└──────────────┬──────────────────────────┘
               │
┌──────────────▼──────────────────────────┐
│        LogicalReplicationStream         │
│  - Connection management                │
│  - Event processing                     │
│  - LSN feedback                         │
└──────────────┬──────────────────────────┘
               │
┌──────────────▼──────────────────────────┐
│        LogicalReplicationParser         │
│  - Protocol parsing                     │
│  - Message deserialization              │
└──────────────┬──────────────────────────┘
               │
┌──────────────▼──────────────────────────┐
│       BufferReader / BufferWriter       │
│  - Zero-copy operations                 │
│  - Binary protocol handling             │
└─────────────────────────────────────────┘
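
To illustrate what the two lower layers do, here is a minimal std-only sketch that decodes a BEGIN message from its binary form (tag `B`, 64-bit final LSN, 64-bit commit timestamp, 32-bit transaction ID, all big-endian, as documented in the PostgreSQL message formats). `Reader`, `Begin`, and `parse_begin` are hypothetical stand-ins. The library's `BufferReader` and parser have their own APIs, and this sketch copies fixed-size fields rather than demonstrating zero-copy.

```rust
/// Minimal cursor over a byte slice; a stand-in for the buffer-reading layer.
struct Reader<'a> {
    buf: &'a [u8],
}

impl<'a> Reader<'a> {
    /// Consume exactly `N` bytes, or return `None` if too few remain.
    fn take<const N: usize>(&mut self) -> Option<[u8; N]> {
        if self.buf.len() < N {
            return None;
        }
        let (head, rest) = self.buf.split_at(N);
        self.buf = rest;
        let mut out = [0u8; N];
        out.copy_from_slice(head);
        Some(out)
    }

    fn u8(&mut self) -> Option<u8> {
        self.take::<1>().map(|b| b[0])
    }

    fn u32(&mut self) -> Option<u32> {
        self.take::<4>().map(u32::from_be_bytes)
    }

    fn u64(&mut self) -> Option<u64> {
        self.take::<8>().map(u64::from_be_bytes)
    }

    fn i64(&mut self) -> Option<i64> {
        self.take::<8>().map(i64::from_be_bytes)
    }
}

/// Decoded BEGIN message.
#[derive(Debug, PartialEq)]
struct Begin {
    final_lsn: u64,
    /// Microseconds since 2000-01-01 (PostgreSQL epoch).
    commit_ts_micros: i64,
    xid: u32,
}

/// Parse a BEGIN payload; `None` on a wrong tag or truncated input.
fn parse_begin(payload: &[u8]) -> Option<Begin> {
    let mut r = Reader { buf: payload };
    if r.u8()? != b'B' {
        return None;
    }
    Some(Begin {
        final_lsn: r.u64()?,
        commit_ts_micros: r.i64()?,
        xid: r.u32()?,
    })
}
```

Returning `None` on short input keeps the parser from panicking on truncated frames; a real parser would more likely return a typed error that says which field was missing.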
Performance Considerations
- Zero-Copy: Uses `bytes::Bytes` for efficient buffer management
- Arc-shared column metadata: Column names, schema, and table names use `Arc<str>`, so cloning is a single atomic increment instead of a heap allocation per event
- RowData (ordered Vec): Row payloads use `RowData` (a `Vec<(Arc<str>, Value)>`) instead of `HashMap<String, Value>`, eliminating per-event hashing overhead and extra allocations
- Atomic Operations: Thread-safe LSN tracking with minimal overhead
- Connection Pooling: Reusable connection with automatic retry
- Streaming Support: Large transactions can be streamed (protocol v2+) instead of being held in memory until commit
- Efficient Blocking: Async I/O with `tokio::select!` eliminates busy-waiting
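
The `Arc<str>` point can be checked with the standard library alone. In this sketch, `share_name` is a hypothetical helper that hands the same column name to several events; every clone points at the same allocation and only the reference count changes.

```rust
use std::sync::Arc;

/// Hypothetical helper: give `events` rows a handle to the same column name.
/// Each `Arc::clone` increments the reference count; no string data is copied.
fn share_name(name: &Arc<str>, events: usize) -> Vec<Arc<str>> {
    (0..events).map(|_| Arc::clone(name)).collect()
}
```

By contrast, cloning a `String` per event would allocate and copy the bytes each time, which is the overhead the bullet above refers to.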
Limitations
- Requires PostgreSQL 14 or later for full protocol support
- Logical replication slot must be created before streaming
- Binary protocol only (no text-based protocol support)
- Requires `replication` permission for the database user
Resources
- PostgreSQL Logical Replication Documentation
- Logical Replication Message Formats
- Replication Protocol
Contributing
Contributions are welcome! Please feel free to submit a Pull Request.
Author
Daniel Shih (dog830228@gmail.com)