Skip to main content

Crate drasi_source_postgres

Crate drasi_source_postgres 

Source
Expand description

PostgreSQL Replication Source Plugin for Drasi

This plugin captures data changes from PostgreSQL databases using logical replication. It connects to PostgreSQL as a replication client and decodes Write-Ahead Log (WAL) messages in real-time, converting them to Drasi source change events.

§Prerequisites

Before using this source, you must configure PostgreSQL for logical replication:

  1. Enable logical replication in postgresql.conf:

    wal_level = logical
    max_replication_slots = 10
    max_wal_senders = 10
  2. Create a publication for the tables you want to monitor:

    CREATE PUBLICATION drasi_publication FOR TABLE users, orders;
  3. Create a replication slot (optional - the source can create one automatically):

    SELECT pg_create_logical_replication_slot('drasi_slot', 'pgoutput');
  4. Grant replication permissions to the database user:

    ALTER ROLE drasi_user REPLICATION;
    GRANT SELECT ON TABLE users, orders TO drasi_user;

§Architecture

The source has two main components:

  • Bootstrap Handler: Performs an initial snapshot of table data when a query subscribes with bootstrap enabled. Uses the replication slot’s snapshot LSN to ensure consistency.

  • Streaming Handler: Continuously reads WAL messages and decodes them using the pgoutput protocol. Handles INSERT, UPDATE, and DELETE operations.

§Configuration

FieldTypeDefaultDescription
hoststring"localhost"PostgreSQL host
portu165432PostgreSQL port
databasestringrequiredDatabase name
userstringrequiredDatabase user (must have replication permission)
passwordstring""Database password
tablesstring[][]Tables to replicate
slot_namestring"drasi_slot"Replication slot name
publication_namestring"drasi_publication"Publication name
ssl_modestring"prefer"SSL mode: disable, prefer, require
table_keysTableKeyConfig[][]Primary key configuration for tables

§Example Configuration (YAML)

source_type: postgres
properties:
  host: db.example.com
  port: 5432
  database: production
  user: replication_user
  password: secret
  tables:
    - users
    - orders
  slot_name: drasi_slot
  publication_name: drasi_publication
  table_keys:
    - table: users
      key_columns: [id]
    - table: orders
      key_columns: [order_id]

§Data Format

The PostgreSQL source decodes WAL messages and converts them to Drasi source changes. Each row change is mapped as follows:

§Node Mapping

  • Element ID: {schema}:{table}:{primary_key_value} (e.g., public:users:123)
  • Labels: [{table_name}] (e.g., ["users"])
  • Properties: All columns from the row (column names become property keys)

§WAL Message to SourceChange

WAL OperationSourceChange
INSERTSourceChange::Insert { element: Node }
UPDATESourceChange::Update { element: Node }
DELETESourceChange::Delete { metadata }

§Example Mapping

Given a PostgreSQL table:

CREATE TABLE users (
    id SERIAL PRIMARY KEY,
    name VARCHAR(100),
    email VARCHAR(255),
    age INTEGER
);

INSERT INTO users (name, email, age) VALUES ('Alice', 'alice@example.com', 30);

Produces a SourceChange equivalent to:

{
    "type": "Insert",
    "element": {
        "metadata": {
            "element_id": "public:users:1",
            "source_id": "pg-source",
            "labels": ["users"],
            "effective_from": 1699900000000000
        },
        "properties": {
            "id": 1,
            "name": "Alice",
            "email": "alice@example.com",
            "age": 30
        }
    }
}

§Usage Example

use drasi_source_postgres::{PostgresReplicationSource, PostgresSourceBuilder};
use std::sync::Arc;

let config = PostgresSourceBuilder::new()
    .with_host("db.example.com")
    .with_database("production")
    .with_user("replication_user")
    .with_password("secret")
    .with_tables(vec!["users".to_string(), "orders".to_string()])
    .build();

let source = Arc::new(PostgresReplicationSource::new("pg-source", config)?);
drasi.add_source(source).await?;

Re-exports§

pub use config::PostgresSourceConfig;
pub use config::SslMode;
pub use config::TableKeyConfig;

Modules§

config
Configuration for the PostgreSQL replication source.
connection
decoder
protocol
scram
stream
types

Structs§

PostgresReplicationSource
PostgreSQL replication source that captures changes via logical replication.
PostgresSourceBuilder
Builder for PostgreSQL source configuration.