PostgreSQL Replication Source Plugin for Drasi
This plugin captures data changes from PostgreSQL databases using logical replication. It connects to PostgreSQL as a replication client, decodes Write-Ahead Log (WAL) messages in real time, and converts them to Drasi source change events.
§Prerequisites
Before using this source, you must configure PostgreSQL for logical replication:
- Enable logical replication in postgresql.conf:
    wal_level = logical
    max_replication_slots = 10
    max_wal_senders = 10
- Create a publication for the tables you want to monitor:
    CREATE PUBLICATION drasi_publication FOR TABLE users, orders;
- Create a replication slot (optional - the source can create one automatically):
    SELECT pg_create_logical_replication_slot('drasi_slot', 'pgoutput');
- Grant replication permissions to the database user:
    ALTER ROLE drasi_user REPLICATION;
    GRANT SELECT ON TABLE users, orders TO drasi_user;
§Architecture
The source has two main components:
- Bootstrap Handler: Performs an initial snapshot of table data when a query subscribes with bootstrap enabled. Uses the replication slot’s snapshot LSN to ensure consistency.
- Streaming Handler: Continuously reads WAL messages and decodes them using the pgoutput protocol. Handles INSERT, UPDATE, and DELETE operations.
§Configuration
| Field | Type | Default | Description |
|---|---|---|---|
| host | string | "localhost" | PostgreSQL host |
| port | u16 | 5432 | PostgreSQL port |
| database | string | required | Database name |
| user | string | required | Database user (must have replication permission) |
| password | string | "" | Database password |
| tables | string[] | [] | Tables to replicate |
| slot_name | string | "drasi_slot" | Replication slot name |
| publication_name | string | "drasi_publication" | Publication name |
| ssl_mode | string | "prefer" | SSL mode: disable, prefer, require |
| table_keys | TableKeyConfig[] | [] | Primary key configuration for tables |
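The defaults in the table can be pictured as a constructor that takes the two required fields and fills in the rest. The sketch below uses a stand-in struct, `SketchConfig`, which is hypothetical and is not the crate's `PostgresSourceConfig`; only the field names and default values come from the table. `ssl_mode` is modelled as a plain string and `table_keys` is left out to keep the example short.

```rust
/// Hypothetical stand-in for illustration only; this is NOT the crate's
/// `PostgresSourceConfig`. Field names and defaults mirror the table above.
#[derive(Debug, Clone, PartialEq)]
pub struct SketchConfig {
    pub host: String,
    pub port: u16,
    pub database: String, // required: the table lists no default
    pub user: String,     // required: the table lists no default
    pub password: String,
    pub tables: Vec<String>,
    pub slot_name: String,
    pub publication_name: String,
    pub ssl_mode: String, // "disable", "prefer", or "require"
}

impl SketchConfig {
    /// Builds a config from the two required fields and applies the
    /// documented default to every optional field.
    pub fn new(database: &str, user: &str) -> Self {
        Self {
            host: "localhost".to_string(),
            port: 5432,
            database: database.to_string(),
            user: user.to_string(),
            password: String::new(),
            tables: Vec::new(),
            slot_name: "drasi_slot".to_string(),
            publication_name: "drasi_publication".to_string(),
            ssl_mode: "prefer".to_string(),
        }
    }
}
```

With this sketch, `SketchConfig::new("production", "replication_user")` yields a value whose `host` is `"localhost"`, whose `port` is `5432`, and whose `tables` list is empty.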
§Example Configuration (YAML)
source_type: postgres
properties:
  host: db.example.com
  port: 5432
  database: production
  user: replication_user
  password: secret
  tables:
    - users
    - orders
  slot_name: drasi_slot
  publication_name: drasi_publication
  table_keys:
    - table: users
      key_columns: [id]
    - table: orders
      key_columns: [order_id]
§Data Format
The PostgreSQL source decodes WAL messages and converts them to Drasi source changes. Each row change is mapped as follows:
§Node Mapping
- Element ID: {schema}:{table}:{primary_key_value} (e.g., public:users:123)
- Labels: [{table_name}] (e.g., ["users"])
- Properties: All columns from the row (column names become property keys)
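The element ID and label rules above can be sketched as two small helpers. The function names `element_id` and `labels` are hypothetical and are not part of this crate's API. The sketch covers only a single primary key value, since how composite keys are rendered is not described here.

```rust
/// Builds an element ID in the `{schema}:{table}:{primary_key_value}` form.
/// Hypothetical helper for illustration; not part of the crate's API.
pub fn element_id(schema: &str, table: &str, primary_key_value: &str) -> String {
    format!("{}:{}:{}", schema, table, primary_key_value)
}

/// Builds the label list for a row: a single label holding the table name.
/// Hypothetical helper for illustration; not part of the crate's API.
pub fn labels(table: &str) -> Vec<String> {
    vec![table.to_string()]
}
```

For example, `element_id("public", "users", "123")` returns `public:users:123`, and `labels("users")` returns a one-element list containing `users`.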
§WAL Message to SourceChange
| WAL Operation | SourceChange |
|---|---|
| INSERT | SourceChange::Insert { element: Node } |
| UPDATE | SourceChange::Update { element: Node } |
| DELETE | SourceChange::Delete { metadata } |
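The table can be read as a single `match` over the WAL operation. The sketch below uses simplified stand-in types (`WalOp`, `Metadata`, `Node`, and a local `SourceChange`), all of which are hypothetical: the real Drasi types carry more fields, and property values are modelled here as plain strings. Only the variant shapes are taken from the table.

```rust
use std::collections::BTreeMap;

/// Simplified, hypothetical stand-ins for illustration; not the Drasi types.
#[derive(Debug, Clone, PartialEq)]
pub struct Metadata {
    pub element_id: String,
    pub labels: Vec<String>,
}

#[derive(Debug, Clone, PartialEq)]
pub struct Node {
    pub metadata: Metadata,
    pub properties: BTreeMap<String, String>,
}

/// Local mirror of the variant shapes listed in the table.
#[derive(Debug, Clone, PartialEq)]
pub enum SourceChange {
    Insert { element: Node },
    Update { element: Node },
    Delete { metadata: Metadata },
}

/// The three row-level operations the streaming handler deals with.
#[derive(Debug, Clone, Copy, PartialEq)]
pub enum WalOp {
    Insert,
    Update,
    Delete,
}

/// Maps a decoded row operation to the matching change variant.
pub fn to_source_change(op: WalOp, node: Node) -> SourceChange {
    match op {
        WalOp::Insert => SourceChange::Insert { element: node },
        WalOp::Update => SourceChange::Update { element: node },
        // A delete keeps only the metadata and drops the row's properties.
        WalOp::Delete => SourceChange::Delete { metadata: node.metadata },
    }
}
```

The point of the sketch is the asymmetry in the last arm: inserts and updates carry the full node, while a delete carries only enough metadata to identify the removed element.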
§Example Mapping
Given a PostgreSQL table:
CREATE TABLE users (
id SERIAL PRIMARY KEY,
name VARCHAR(100),
email VARCHAR(255),
age INTEGER
);
INSERT INTO users (name, email, age) VALUES ('Alice', 'alice@example.com', 30);
Produces a SourceChange equivalent to:
{
  "type": "Insert",
  "element": {
    "metadata": {
      "element_id": "public:users:1",
      "source_id": "pg-source",
      "labels": ["users"],
      "effective_from": 1699900000000000
    },
    "properties": {
      "id": 1,
      "name": "Alice",
      "email": "alice@example.com",
      "age": 30
    }
  }
}
§Usage Example
use drasi_source_postgres::{PostgresReplicationSource, PostgresSourceBuilder};
use std::sync::Arc;
// The statements below use `?` and `.await`, so they belong inside an async
// function that returns a Result. `drasi` is constructed elsewhere (not shown).
let config = PostgresSourceBuilder::new()
    .with_host("db.example.com")
    .with_database("production")
    .with_user("replication_user")
    .with_password("secret")
    .with_tables(vec!["users".to_string(), "orders".to_string()])
    .build();
let source = Arc::new(PostgresReplicationSource::new("pg-source", config)?);
drasi.add_source(source).await?;
Re-exports§
pub use config::PostgresSourceConfig;
pub use config::SslMode;
pub use config::TableKeyConfig;
Modules§
Structs§
- PostgresReplicationSource: PostgreSQL replication source that captures changes via logical replication.
- PostgresSourceBuilder: Builder for PostgreSQL source configuration.