PostgreSQL Replication Source
Overview
The PostgreSQL Replication Source is a Change Data Capture (CDC) plugin for Drasi that streams data changes from PostgreSQL databases in real-time using logical replication and the Write-Ahead Log (WAL). It captures INSERT, UPDATE, and DELETE operations as they occur and transforms them into Drasi SourceChange events for continuous query processing.
Key Capabilities:
- Real-time change streaming via PostgreSQL logical replication
- Transactionally-consistent data capture using WAL decoding
- Initial snapshot bootstrapping with LSN coordination
- Automatic reconnection and recovery on connection failures
- Support for multiple PostgreSQL data types with type-safe conversion
- Primary key detection and custom key configuration
- SCRAM-SHA-256 authentication (recommended) and cleartext fallback
Use Cases:
- Real-time data synchronization from PostgreSQL databases
- Event-driven architectures based on database changes
- Building reactive applications that respond to data mutations
- Maintaining materialized views or derived data sets
- Audit logging and change tracking
- Data replication and ETL pipelines
Architecture
Components
The PostgreSQL source consists of several specialized modules:
- Connection (connection.rs): Manages the PostgreSQL replication protocol connection, authentication (including SCRAM-SHA-256), and message exchange
- Stream (stream.rs): Handles the continuous WAL streaming loop, message processing, and transaction coordination
- Decoder (decoder.rs): Decodes binary pgoutput messages into structured WAL events with full type support
- Protocol (protocol.rs): Implements PostgreSQL wire protocol encoding/decoding
- SCRAM (scram.rs): SCRAM-SHA-256 authentication implementation
- Types (types.rs): Type definitions for PostgreSQL values and WAL messages
Note: Bootstrap functionality is provided by the separate drasi-bootstrap-postgres crate via the pluggable bootstrap provider pattern.
Data Flow
PostgreSQL WAL → Connection → Decoder → Stream → SourceChange Events
↓
Transaction Grouping
↓
Dispatcher → Queries
Bootstrap Flow (via pluggable bootstrap provider):
Bootstrap Request → Bootstrap Provider → SourceChange Events
↓
Coordinate with Streaming
Prerequisites
PostgreSQL Configuration
The PostgreSQL database must be configured for logical replication:
- PostgreSQL Version: PostgreSQL 10 or later (requires the pgoutput plugin)
- Configuration Parameters (postgresql.conf):
    wal_level = logical
    max_replication_slots = 10   # At least 1 per source
    max_wal_senders = 10         # At least 1 per source
- Database User Permissions:
    -- Grant replication privilege
    ALTER USER drasi_user WITH REPLICATION;
    -- Grant table access
    GRANT SELECT ON ALL TABLES IN SCHEMA public TO drasi_user;
    GRANT USAGE ON SCHEMA public TO drasi_user;
- Publication Setup:
    -- Create a publication for specific tables
    CREATE PUBLICATION drasi_publication FOR TABLE users, orders, products;
    -- Or for all tables
    CREATE PUBLICATION drasi_publication FOR ALL TABLES;
- Replication Slot: The source automatically creates a replication slot with the configured name. If a slot with that name already exists, it is reused.
- Replica Identity (recommended for full UPDATE/DELETE data):
    -- For tables without primary keys or needing full row data
    ALTER TABLE your_table REPLICA IDENTITY FULL;
    -- Default behavior (uses primary key)
    ALTER TABLE your_table REPLICA IDENTITY DEFAULT;
Configuration
Builder Pattern (Recommended)
The builder pattern provides type-safe configuration with sensible defaults:
use PostgresReplicationSource;
use ;
let source = builder
.with_host
.with_port
.with_database
.with_user
.with_password
.with_tables
.with_slot_name
.with_publication_name
.with_ssl_mode
.add_table_key
.with_dispatch_mode
.with_dispatch_buffer_capacity
.with_auto_start
.build?;
Config Struct Approach
Alternatively, construct the config struct directly:
use ;
use ;
let config = PostgresSourceConfig ;
let source = new?;
Configuration Options
| Option | Type | Default | Description |
|---|---|---|---|
| id | String | (Required) | Unique identifier for the source instance |
| host | String | "localhost" | PostgreSQL server hostname or IP address |
| port | u16 | 5432 | PostgreSQL server port number |
| database | String | (Required) | Database name to connect to |
| user | String | (Required) | Database user with replication privileges |
| password | String | "" | Database password (supports SCRAM-SHA-256 and cleartext) |
| tables | Vec<String> | [] | List of tables to monitor (empty = all tables in publication) |
| slot_name | String | "drasi_slot" | Replication slot name (created if it doesn't exist) |
| publication_name | String | "drasi_publication" | PostgreSQL publication to subscribe to |
| ssl_mode | SslMode | SslMode::Prefer | SSL mode: Disable, Prefer, or Require |
| table_keys | Vec<TableKeyConfig> | [] | Manual primary key configuration (see below) |
| dispatch_mode | Option<DispatchMode> | None | Channel dispatch mode (builder only) |
| dispatch_buffer_capacity | Option<usize> | None | Dispatch buffer size (builder only) |
| auto_start | bool | true | Whether to start automatically when added to DrasiLib |
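To make the defaults in the table concrete, here is a minimal sketch of a stand-in struct that fills them in. `ExampleConfig`, `ExampleSslMode`, and `with_defaults` are hypothetical names used for illustration only; they are not the crate's `PostgresSourceConfig` or `SslMode`, and the builder-only dispatch options are left out.

```rust
/// Hypothetical stand-in for the SSL mode option (not the crate's `SslMode`).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ExampleSslMode {
    Disable,
    Prefer,
    Require,
}

/// Hypothetical mirror of the options table, for illustration only.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ExampleConfig {
    pub id: String,
    pub host: String,
    pub port: u16,
    pub database: String,
    pub user: String,
    pub password: String,
    pub tables: Vec<String>,
    pub slot_name: String,
    pub publication_name: String,
    pub ssl_mode: ExampleSslMode,
    pub auto_start: bool,
}

impl ExampleConfig {
    /// Takes the three required values and applies the documented defaults
    /// to everything else.
    pub fn with_defaults(id: &str, database: &str, user: &str) -> Self {
        ExampleConfig {
            id: id.to_string(),
            host: "localhost".to_string(),
            port: 5432,
            database: database.to_string(),
            user: user.to_string(),
            password: String::new(),
            tables: Vec::new(), // empty = all tables in the publication
            slot_name: "drasi_slot".to_string(),
            publication_name: "drasi_publication".to_string(),
            ssl_mode: ExampleSslMode::Prefer,
            auto_start: true,
        }
    }
}
```

Calling `ExampleConfig::with_defaults("pg-source", "mydb", "drasi_user")` yields a value whose `port` is 5432 and whose `slot_name` is "drasi_slot", matching the table.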
TableKeyConfig
Manually specify primary key columns for element ID generation:
| Field | Type | Description |
|---|---|---|
| table | String | Table name (e.g., "users" or "schema.table") |
| key_columns | Vec<String> | Column names to use as primary key |
Example:
TableKeyConfig
// Composite key
TableKeyConfig
Note: User-configured keys override automatically detected primary keys.
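The override rule can be sketched as a small lookup that prefers configured keys and falls back to detected ones. This is an illustration under assumptions, not the crate's code: `TableKey` is a hypothetical stand-in for `TableKeyConfig`, and `resolve_key_columns` is a name invented here.

```rust
/// Hypothetical stand-in for `TableKeyConfig`: a table name plus its key columns.
pub struct TableKey {
    pub table: String,
    pub key_columns: Vec<String>,
}

/// Returns the key columns to use for `table`.
/// Configured keys win; detected primary keys are used only when no
/// non-empty configured entry exists; `None` means no key is available.
pub fn resolve_key_columns<'a>(
    table: &str,
    configured: &'a [TableKey],
    detected: &'a [String],
) -> Option<&'a [String]> {
    let user_choice = configured
        .iter()
        .find(|k| k.table == table && !k.key_columns.is_empty());
    if let Some(cfg) = user_choice {
        return Some(cfg.key_columns.as_slice());
    }
    if detected.is_empty() {
        None
    } else {
        Some(detected)
    }
}
```

A `None` result corresponds to the case where the source has no key to build an element ID from.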
Input Schema
PostgreSQL Logical Replication (pgoutput)
The source consumes WAL messages in the pgoutput binary format:
WAL Message Types:
- B (Begin): Transaction start
- C (Commit): Transaction commit
- R (Relation): Table metadata (schema, columns, types)
- I (Insert): Row insertion
- U (Update): Row update (may include old tuple)
- D (Delete): Row deletion
- T (Truncate): Table truncate (not implemented)
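Each pgoutput message starts with a one-byte tag, so the first decoding step is a dispatch on that byte. The sketch below shows that step only; `WalMessageKind` and `classify_message` are hypothetical names, not the types defined in decoder.rs.

```rust
/// Message kinds keyed by the one-byte tag that starts each pgoutput message.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum WalMessageKind {
    Begin,
    Commit,
    Relation,
    Insert,
    Update,
    Delete,
    Truncate,
    /// Any tag not listed above; carries the raw byte for logging.
    Unknown(u8),
}

/// Classifies a message by its first byte. Returns `None` for an empty payload.
pub fn classify_message(payload: &[u8]) -> Option<WalMessageKind> {
    let tag = *payload.first()?;
    Some(match tag {
        b'B' => WalMessageKind::Begin,
        b'C' => WalMessageKind::Commit,
        b'R' => WalMessageKind::Relation,
        b'I' => WalMessageKind::Insert,
        b'U' => WalMessageKind::Update,
        b'D' => WalMessageKind::Delete,
        b'T' => WalMessageKind::Truncate,
        other => WalMessageKind::Unknown(other),
    })
}
```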
Relation Metadata includes:
- Namespace (schema name)
- Table name
- Replica identity mode
- Column definitions (name, OID, type modifier, key flag)
Tuple Data encoding:
- Column count (u16)
- Per-column: type marker (n = null, u = unchanged TOAST, t = text) + length + data
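The tuple layout above can be walked with a small cursor-based parser. This is a sketch assuming the standard pgoutput encoding: a big-endian 16-bit column count, and for `t` columns a big-endian 32-bit length followed by that many bytes of text. `ColumnValue`, `decode_tuple`, and `take` are hypothetical names, not the crate's decoder API.

```rust
/// Decoded column in a tuple, one variant per type marker.
#[derive(Debug, PartialEq, Eq)]
pub enum ColumnValue {
    Null,
    UnchangedToast,
    Text(String),
}

/// Returns the next `len` bytes and advances the cursor, or an error if the buffer is short.
fn take<'a>(buf: &'a [u8], pos: &mut usize, len: usize) -> Result<&'a [u8], String> {
    let end = pos
        .checked_add(len)
        .ok_or_else(|| "length overflow".to_string())?;
    let slice = buf
        .get(*pos..end)
        .ok_or_else(|| format!("tuple truncated at byte {}", *pos))?;
    *pos = end;
    Ok(slice)
}

/// Decodes tuple data: a column count followed by one marker-prefixed entry per column.
pub fn decode_tuple(buf: &[u8]) -> Result<Vec<ColumnValue>, String> {
    let mut pos = 0usize;
    let head = take(buf, &mut pos, 2)?;
    let count = u16::from_be_bytes([head[0], head[1]]) as usize;
    let mut columns = Vec::with_capacity(count);
    for _ in 0..count {
        let marker = take(buf, &mut pos, 1)?[0];
        let value = match marker {
            b'n' => ColumnValue::Null,
            b'u' => ColumnValue::UnchangedToast,
            b't' => {
                // Only text columns carry a length and payload.
                let raw = take(buf, &mut pos, 4)?;
                let len = i32::from_be_bytes([raw[0], raw[1], raw[2], raw[3]]);
                if len < 0 {
                    return Err(format!("negative column length {}", len));
                }
                let data = take(buf, &mut pos, len as usize)?;
                ColumnValue::Text(String::from_utf8(data.to_vec()).map_err(|e| e.to_string())?)
            }
            other => return Err(format!("unsupported column marker 0x{:02x}", other)),
        };
        columns.push(value);
    }
    Ok(columns)
}
```

Keeping `UnchangedToast` distinct from `Null` lets a caller decide how to treat values that PostgreSQL did not resend.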
Type Support
The decoder supports PostgreSQL's built-in types via OID mapping:
| PostgreSQL Type | OID | Decoded As |
|---|---|---|
| boolean | 16 | PostgresValue::Bool |
| int2 (smallint) | 21 | PostgresValue::Int2 |
| int4 (integer) | 23 | PostgresValue::Int4 |
| int8 (bigint) | 20 | PostgresValue::Int8 |
| float4 (real) | 700 | PostgresValue::Float4 |
| float8 (double) | 701 | PostgresValue::Float8 |
| numeric / decimal | 1700 | PostgresValue::Numeric |
| text | 25 | PostgresValue::Text |
| varchar | 1043 | PostgresValue::Varchar |
| char / bpchar | 1042 | PostgresValue::Char |
| uuid | 2950 | PostgresValue::Uuid |
| timestamp | 1114 | PostgresValue::Timestamp |
| timestamptz | 1184 | PostgresValue::TimestampTz |
| date | 1082 | PostgresValue::Date |
| time | 1083 | PostgresValue::Time |
| json | 114 | PostgresValue::Json |
| jsonb | 3802 | PostgresValue::Jsonb |
| bytea | 17 | PostgresValue::Bytea |
| Unknown | - | PostgresValue::Text (fallback) |
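The OID table amounts to a single `match` with a text fallback. The sketch below uses a hypothetical `ValueKind` enum whose variant names mirror the "Decoded As" column; it is not the crate's `PostgresValue`, which also carries the decoded data.

```rust
/// Hypothetical tag enum mirroring the "Decoded As" column of the table.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ValueKind {
    Bool, Int2, Int4, Int8, Float4, Float8, Numeric, Text, Varchar, Char,
    Uuid, Timestamp, TimestampTz, Date, Time, Json, Jsonb, Bytea,
}

/// Maps a PostgreSQL type OID to the kind of value it decodes to.
/// OIDs outside the table fall back to `Text`.
pub fn kind_for_oid(oid: u32) -> ValueKind {
    match oid {
        16 => ValueKind::Bool,
        21 => ValueKind::Int2,
        23 => ValueKind::Int4,
        20 => ValueKind::Int8,
        700 => ValueKind::Float4,
        701 => ValueKind::Float8,
        1700 => ValueKind::Numeric,
        25 => ValueKind::Text,
        1043 => ValueKind::Varchar,
        1042 => ValueKind::Char,
        2950 => ValueKind::Uuid,
        1114 => ValueKind::Timestamp,
        1184 => ValueKind::TimestampTz,
        1082 => ValueKind::Date,
        1083 => ValueKind::Time,
        114 => ValueKind::Json,
        3802 => ValueKind::Jsonb,
        17 => ValueKind::Bytea,
        _ => ValueKind::Text, // unknown OID: keep the textual form
    }
}
```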
Usage Examples
Basic Usage
use PostgresReplicationSource;
async
With Custom Primary Keys
use PostgresReplicationSource;
use TableKeyConfig;
let source = builder
.with_host
.with_database
.with_user
.add_table_key
.build?;
source.start.await?;
With SSL and Custom Dispatch
use PostgresReplicationSource;
use SslMode;
use DispatchMode;
let source = builder
.with_host
.with_database
.with_user
.with_password
.with_ssl_mode
.with_dispatch_mode
.with_dispatch_buffer_capacity
.build?;
source.start.await?;
Using Direct Constructor
use ;
use SslMode;
let config = PostgresSourceConfig ;
let source = new?;
source.start.await?;
Output Format
SourceChange Events
All PostgreSQL changes are transformed into Drasi SourceChange events:
Insert Event:
Insert
Update Event:
Update
Delete Event:
Delete
Element ID Generation
Element IDs are generated using the following priority:
- User-configured keys (from table_keys config)
- Detected primary keys (from PostgreSQL system catalogs)
- UUID fallback (if no keys available)
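Once key columns have been chosen in that order, the ID string follows the `table_name:value` formats listed in this section. A minimal sketch of the assembly step, assuming the key values are already rendered as text and the fallback UUID is generated by the caller; `element_id` is a hypothetical helper, not a function exported by the crate.

```rust
/// Builds an element ID from a table name and its key values.
/// With no key values, the caller-supplied UUID string is used instead.
pub fn element_id(table: &str, key_values: &[&str], fallback_uuid: &str) -> String {
    if key_values.is_empty() {
        // No key available: table name plus a generated UUID.
        format!("{}:{}", table, fallback_uuid)
    } else {
        // One or more key values, joined with underscores for composite keys.
        format!("{}:{}", table, key_values.join("_"))
    }
}
```

Because the fallback depends on a freshly generated UUID, IDs built that way differ from run to run, which is why defining a key is preferable.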
Format:
- Single key: "table_name:value" (e.g., "users:12345")
- Composite key: "table_name:value1_value2" (e.g., "order_items:1001_5")
- No key: "table_name:uuid" (e.g., "events:550e8400-e29b-41d4-a716-446655440000")
Labels
- Each element receives the table name as its label (case-preserved)
- Schema-qualified tables use fully qualified names for labels
- Example: "users" table → ["users"] label
Advanced Features
Transaction Grouping
Changes are grouped by PostgreSQL transaction and dispatched atomically:
- All changes within a transaction are buffered
- Changes are sent together when the transaction commits
- Ensures transactional consistency in downstream processing
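The buffering behavior can be illustrated with a small state holder that collects changes and releases them only on commit. This is a sketch, not the logic in stream.rs: `WalEvent` and `TransactionBuffer` are hypothetical, and changes are represented as plain strings.

```rust
/// Simplified stream events: transaction boundaries plus opaque changes.
pub enum WalEvent {
    Begin,
    Change(String),
    Commit,
}

/// Holds the changes of the transaction currently being streamed.
#[derive(Default)]
pub struct TransactionBuffer {
    pending: Vec<String>,
}

impl TransactionBuffer {
    /// Feeds one event into the buffer.
    /// Returns the whole batch on `Commit` and `None` for every other event.
    pub fn push(&mut self, event: WalEvent) -> Option<Vec<String>> {
        match event {
            WalEvent::Begin => {
                // Start a fresh batch; anything left over is discarded.
                self.pending.clear();
                None
            }
            WalEvent::Change(change) => {
                self.pending.push(change);
                None
            }
            // Hand out the batch and leave the buffer empty for the next transaction.
            WalEvent::Commit => Some(std::mem::take(&mut self.pending)),
        }
    }
}
```

Since the batch grows until the commit arrives, memory use in this scheme is proportional to the size of the largest in-flight transaction.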
Bootstrap Support
The PostgreSQL source supports pluggable bootstrap providers via the BootstrapProvider trait. Any bootstrap provider implementation can be used with this source:
use PostgresReplicationSource;
let source = builder
.with_host
.with_database
.with_user
.with_password
.with_bootstrap_provider // Any BootstrapProvider impl
.build?;
Common bootstrap provider options include:
- PostgresBootstrapProvider (drasi-bootstrap-postgres) - Snapshots directly from PostgreSQL
- ScriptFileBootstrapProvider (drasi-bootstrap-scriptfile) - Loads initial data from JSONL files
- NoopBootstrapProvider (drasi-bootstrap-noop) - Skips bootstrap entirely
- Custom implementations of the BootstrapProvider trait
Automatic Reconnection
The source handles connection failures gracefully:
- Detects connection loss and errors
- Waits 5 seconds before reconnecting
- Re-establishes replication from last confirmed LSN
- Continues streaming without data loss
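A retry loop of this shape can be sketched as follows. It is an illustration under assumptions: `stream_with_reconnect` is a hypothetical function, the session is modeled as a closure that reports the last confirmed LSN when it fails, and the `max_attempts` limit exists only so the sketch terminates. The 5-second wait would be passed in as `delay`.

```rust
use std::thread::sleep;
use std::time::Duration;

/// Runs `session` until it ends cleanly, retrying after each failure.
/// `session` receives the LSN to resume from and returns either the final LSN
/// or the last confirmed LSN together with an error description.
pub fn stream_with_reconnect<F>(
    mut session: F,
    delay: Duration,
    max_attempts: usize,
) -> Result<u64, String>
where
    F: FnMut(u64) -> Result<u64, (u64, String)>,
{
    let mut resume_from = 0u64;
    let mut last_error = String::from("no attempts were made");
    for _ in 0..max_attempts {
        match session(resume_from) {
            Ok(final_lsn) => return Ok(final_lsn),
            Err((confirmed, reason)) => {
                // Never move backwards: resume from the highest confirmed position.
                resume_from = resume_from.max(confirmed);
                last_error = reason;
                // Wait before the next connection attempt.
                sleep(delay);
            }
        }
    }
    Err(last_error)
}
```

Resuming from the confirmed position rather than the last received one is what prevents gaps: anything received but not yet confirmed is simply streamed again.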
Keepalive and Feedback
- Sends keepalive responses every 10 seconds
- Reports LSN progress to PostgreSQL
- Responds to server keepalive requests immediately
- Prevents connection timeouts and slot cleanup
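LSN progress is reported with the streaming replication protocol's Standby Status Update message: the byte `r`, three 64-bit LSNs (written, flushed, applied), a 64-bit timestamp in microseconds since 2000-01-01, and a one-byte reply flag. The sketch below builds that payload; the function names are hypothetical and the CopyData framing around the payload is omitted.

```rust
/// Microseconds between the Unix epoch (1970-01-01) and the PostgreSQL epoch (2000-01-01).
const PG_EPOCH_OFFSET_MICROS: i64 = 946_684_800_000_000;

/// Converts a Unix timestamp in microseconds to PostgreSQL's epoch.
pub fn pg_epoch_micros(unix_micros: i64) -> i64 {
    unix_micros - PG_EPOCH_OFFSET_MICROS
}

/// Builds the 34-byte Standby Status Update payload (without CopyData framing).
pub fn standby_status_update(
    written_lsn: u64,
    flushed_lsn: u64,
    applied_lsn: u64,
    pg_micros: i64,
    reply_requested: bool,
) -> Vec<u8> {
    let mut msg = Vec::with_capacity(34);
    msg.push(b'r'); // message type
    msg.extend_from_slice(&written_lsn.to_be_bytes());
    msg.extend_from_slice(&flushed_lsn.to_be_bytes());
    msg.extend_from_slice(&applied_lsn.to_be_bytes());
    msg.extend_from_slice(&pg_micros.to_be_bytes());
    msg.push(u8::from(reply_requested)); // 1 asks the server to reply immediately
    msg
}
```

The flushed LSN is the value PostgreSQL records as `confirmed_flush_lsn` for the slot, so it should only advance past changes that have been fully handled.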
Primary Key Detection
During WAL streaming, the source uses primary key information from the relation metadata provided by PostgreSQL's logical replication protocol. For bootstrap, the configured bootstrap provider is responsible for primary key detection.
When using PostgresBootstrapProvider, it queries PostgreSQL system catalogs:
SELECT n.nspname, c.relname, a.attname
FROM pg_constraint con
JOIN pg_class c ON con.conrelid = c.oid
JOIN pg_namespace n ON c.relnamespace = n.oid
JOIN pg_attribute a ON a.attrelid = c.oid
WHERE con.contype = 'p'
AND a.attnum = ANY(con.conkey)
ORDER BY n.nspname, c.relname, array_position(con.conkey, a.attnum)
User-configured table_keys override automatically detected primary keys in both streaming and bootstrap.
Troubleshooting
"permission denied to create replication slot"
Solution: Grant replication privilege
ALTER USER drasi_user WITH REPLICATION;
"logical decoding requires wal_level >= logical"
Solution: Configure PostgreSQL and restart
# postgresql.conf
wal_level = logical
"replication slot already exists"
Options:
- Drop existing slot: SELECT pg_drop_replication_slot('slot_name');
- Use different slot_name in config
- Reuse existing slot (source will continue from last position)
"UPDATE/DELETE missing old tuple data"
Solution: Set replica identity to FULL
ALTER TABLE your_table REPLICA IDENTITY FULL;
"No primary key found for table"
Solution: Configure manual keys
.add_table_key
Connection/SSL errors
Solution: Adjust SSL mode
.with_ssl_mode // Try without SSL
// or
.with_ssl_mode // Prefer SSL but allow fallback
Performance Considerations
Memory Usage
- Transaction buffers scale with transaction size
- Large transactions consume more memory
- Bootstrap batches 1000 rows at a time
Network Latency
- High latency increases replication lag
- Co-locate source with PostgreSQL when possible
- Monitor lag via pg_replication_slots
WAL Retention
- Inactive slots prevent WAL cleanup
- Set max_slot_wal_keep_size to limit retention
- Monitor and drop unused slots
Throughput
- Very high write rates may require tuning
- Consider dispatch_buffer_capacity for high volume
- Use DispatchMode::Channel for backpressure control
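To show what a bounded buffer means for backpressure, the sketch below uses the standard library's `sync_channel`, which is not necessarily what the dispatcher uses internally. The function `fill_without_consumer` is hypothetical: it offers events to a channel of fixed capacity while nothing is consuming, and counts how many are refused.

```rust
use std::sync::mpsc::{sync_channel, TrySendError};

/// Offers `events` to a bounded channel that nobody is draining.
/// Returns the events that fit in the buffer and the number that were refused.
pub fn fill_without_consumer(capacity: usize, events: &[&str]) -> (Vec<String>, usize) {
    let (tx, rx) = sync_channel::<String>(capacity);
    let mut refused = 0;
    for event in events {
        match tx.try_send(event.to_string()) {
            Ok(()) => {}
            // The buffer is full: a blocking `send` would make the producer wait here.
            Err(TrySendError::Full(_)) => refused += 1,
            Err(TrySendError::Disconnected(_)) => break,
        }
    }
    // Dropping the sender lets the receiver iterator finish.
    drop(tx);
    (rx.iter().collect(), refused)
}
```

A larger capacity absorbs longer bursts before the producer is slowed down, at the cost of holding more events in memory.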
Monitoring
PostgreSQL Queries
-- Check replication slots
SELECT * FROM pg_replication_slots;
-- Monitor replication lag
SELECT slot_name,
pg_current_wal_lsn() AS current_lsn,
confirmed_flush_lsn,
pg_wal_lsn_diff(pg_current_wal_lsn(), confirmed_flush_lsn) AS lag_bytes
FROM pg_replication_slots;
-- Check active replication connections
SELECT * FROM pg_stat_replication;
-- View publications
SELECT * FROM pg_publication;
SELECT * FROM pg_publication_tables WHERE pubname = 'drasi_publication';
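The lag figure from `pg_wal_lsn_diff` can also be reproduced client-side from the textual LSNs. An LSN is printed as two hexadecimal numbers separated by a slash, the high and low 32 bits of a 64-bit byte position. `parse_lsn` and `lag_bytes` below are hypothetical helpers written for this illustration.

```rust
/// Parses an LSN such as "16/B374D848" into a 64-bit byte position.
pub fn parse_lsn(text: &str) -> Option<u64> {
    let (high, low) = text.split_once('/')?;
    let high = u32::from_str_radix(high, 16).ok()?;
    let low = u32::from_str_radix(low, 16).ok()?;
    Some((u64::from(high) << 32) | u64::from(low))
}

/// Byte distance from `confirmed` to `current`; negative if `confirmed` is ahead.
pub fn lag_bytes(current: &str, confirmed: &str) -> Option<i128> {
    let current = parse_lsn(current)?;
    let confirmed = parse_lsn(confirmed)?;
    Some(i128::from(current) - i128::from(confirmed))
}
```

For example, `lag_bytes("0/3000060", "0/3000000")` evaluates to `Some(96)`, meaning the slot is 96 bytes behind the current WAL position.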
Log Monitoring
The source logs important events:
- info!: Connection events, transaction commits, bootstrap progress
- warn!: Missing primary keys, unknown message types, recoverable errors
- error!: Connection failures, protocol errors, unrecoverable errors
- debug!: Detailed message processing, WAL decoding
Enable debug logging:
RUST_LOG=drasi_source_postgres=debug
Known Limitations
Not Implemented
- TRUNCATE operations: Logged but not processed into SourceChange events
- Schema changes: DDL operations not captured (requires source restart)
- Composite/Array types: Partial support (may lose type information)
Partial Support
- TOAST values: Unchanged TOAST values decoded as NULL (use REPLICA IDENTITY FULL)
- Binary data: bytea columns base64-encoded to strings
- Numeric precision: Very high-precision decimals may lose precision (converted to f64)
Known Issues
- Multi-schema support requires fully qualified table names
- UUID fallback IDs not stable across restarts (define primary keys)
- Large objects may not capture all changes (use REPLICA IDENTITY FULL)
Developer Notes
Testing
# Unit tests
# With PostgreSQL instance
Authentication Methods Supported
- SCRAM-SHA-256 (recommended, see scram.rs)
- Cleartext password (not recommended for production)
Note: MD5 authentication is explicitly not supported due to security concerns. If your PostgreSQL server requests MD5 authentication, you will receive an error instructing you to configure scram-sha-256 in pg_hba.conf.
Wire Protocol
The source implements PostgreSQL wire protocol 3.0:
- Startup messages with replication mode
- Query messages for replication commands
- CopyData streaming for WAL messages
- Standby status updates for feedback
See protocol.rs and connection.rs for implementation details.
License
Copyright 2025 The Drasi Authors.
Licensed under the Apache License, Version 2.0. See LICENSE file for details.