drasi-reaction-storedproc-postgres 0.2.8

PostgreSQL Stored Procedure reaction plugin for Drasi

PostgreSQL Stored Procedure Reaction

A Drasi reaction plugin that invokes PostgreSQL stored procedures when continuous query results change.

Overview

The PostgreSQL Stored Procedure reaction enables you to:

  • Execute different stored procedures for ADD, UPDATE, and DELETE operations
  • Map query result fields to stored procedure parameters using @fieldName syntax
  • Handle multiple queries with a single reaction
  • Automatically retry failed procedure calls with exponential backoff
  • Configure connection parameters and timeouts

Installation

Add the dependency to your Cargo.toml:

[dependencies]
drasi-reaction-storedproc-postgres = { path = "path/to/drasi-core/components/reactions/storedproc-postgres" }

Quick Start

1. Create Stored Procedures in PostgreSQL

CREATE OR REPLACE PROCEDURE add_user(
    p_id INTEGER,
    p_name TEXT,
    p_email TEXT
)
LANGUAGE plpgsql
AS $$
BEGIN
    INSERT INTO users_sync (id, name, email)
    VALUES (p_id, p_name, p_email);
END;
$$;

The reaction in step 2 also calls update_user and delete_user; create those procedures in the same way before starting the reaction.

2. Create the Reaction

use drasi_reaction_storedproc_postgres::{PostgresStoredProcReaction, QueryConfig, TemplateSpec};
use drasi_lib::DrasiLib;

#[tokio::main]
async fn main() -> anyhow::Result<()> {
    let reaction = PostgresStoredProcReaction::builder("user-sync")
        .with_connection(
            "localhost",
            5432,
            "mydb",
            "postgres",
            "password"
        )
        .with_query("user-changes")
        .with_default_template(QueryConfig {
            added: Some(TemplateSpec::new("CALL add_user(@after.id, @after.name, @after.email)")),
            updated: Some(TemplateSpec::new("CALL update_user(@after.id, @after.name, @after.email)")),
            deleted: Some(TemplateSpec::new("CALL delete_user(@before.id)")),
        })
        .build()
        .await?;

    let drasi = DrasiLib::builder()
        .with_id("my-app")
        .with_reaction(reaction)
        .build()
        .await?;

    drasi.start().await?;
    tokio::signal::ctrl_c().await?;

    Ok(())
}

Configuration

Builder API

Traditional Username/Password Authentication

let reaction = PostgresStoredProcReaction::builder("my-reaction")
    .with_hostname("localhost")
    .with_port(5432)
    .with_database("mydb")
    .with_user("postgres")
    .with_password("secret")
    .with_ssl(true)  // Enable SSL/TLS
    .with_query("query1")
    .with_default_template(QueryConfig {
        added: Some(TemplateSpec::new("CALL add_record(@after.id, @after.name)")),
        updated: Some(TemplateSpec::new("CALL update_record(@after.id, @after.name)")),
        deleted: Some(TemplateSpec::new("CALL delete_record(@before.id)")),
    })
    .with_command_timeout_ms(30000)
    .with_retry_attempts(3)
    .build()
    .await?;

Cloud Identity Provider Authentication

For cloud-managed PostgreSQL databases, you can use identity providers instead of passwords:

Azure AD Authentication (Azure Database for PostgreSQL):

use drasi_lib::identity::AzureIdentityProvider;

// For Azure Kubernetes Service with Workload Identity
let identity_provider = AzureIdentityProvider::with_workload_identity("myuser@myserver")?;

// For local development or Azure VMs with Managed Identity
let identity_provider = AzureIdentityProvider::with_default_credentials("myuser@myserver").await?;

let reaction = PostgresStoredProcReaction::builder("my-reaction")
    .with_hostname("myserver.postgres.database.azure.com")
    .with_port(5432)
    .with_database("mydb")
    .with_identity_provider(identity_provider)
    .with_ssl(true)  // Required for Azure
    .with_query("query1")
    .with_default_template(QueryConfig {
        added: Some(TemplateSpec::new("CALL add_record(@after.id, @after.name)")),
        updated: Some(TemplateSpec::new("CALL update_record(@after.id, @after.name)")),
        deleted: Some(TemplateSpec::new("CALL delete_record(@before.id)")),
    })
    .build()
    .await?;

AWS IAM Authentication (Amazon RDS/Aurora):

use drasi_lib::identity::AwsIdentityProvider;

// Using IAM user credentials
let identity_provider = AwsIdentityProvider::new(
    "myuser",
    "mydb.rds.amazonaws.com",
    5432
).await?;

// Or assuming an IAM role
let identity_provider = AwsIdentityProvider::with_assumed_role(
    "myuser",
    "mydb.rds.amazonaws.com",
    5432,
    "arn:aws:iam::123456789012:role/RDSAccessRole",
    None
).await?;

let reaction = PostgresStoredProcReaction::builder("my-reaction")
    .with_hostname("mydb.rds.amazonaws.com")
    .with_port(5432)
    .with_database("mydb")
    .with_identity_provider(identity_provider)
    .with_ssl(true)  // Recommended for RDS
    .with_query("query1")
    .with_default_template(QueryConfig {
        added: Some(TemplateSpec::new("CALL add_record(@after.id, @after.name)")),
        updated: Some(TemplateSpec::new("CALL update_record(@after.id, @after.name)")),
        deleted: Some(TemplateSpec::new("CALL delete_record(@before.id)")),
    })
    .build()
    .await?;

Password Provider (programmatic username/password):

use drasi_lib::identity::PasswordIdentityProvider;

let identity_provider = PasswordIdentityProvider::new("postgres", "secret");

let reaction = PostgresStoredProcReaction::builder("my-reaction")
    .with_hostname("localhost")
    .with_port(5432)
    .with_database("mydb")
    .with_identity_provider(identity_provider)
    .with_query("query1")
    .with_default_template(QueryConfig {
        added: Some(TemplateSpec::new("CALL add_record(@after.id, @after.name)")),
        updated: Some(TemplateSpec::new("CALL update_record(@after.id, @after.name)")),
        deleted: Some(TemplateSpec::new("CALL delete_record(@before.id)")),
    })
    .build()
    .await?;

Note: When using identity providers, do not call .with_user() or .with_password(). The identity provider handles authentication automatically.

See the Identity Provider README for detailed setup instructions for Azure AD and AWS IAM authentication.

Configuration Options

| Option | Description | Type | Default |
|---|---|---|---|
| hostname | Database hostname | String | "localhost" |
| port | Database port | u16 | 5432 |
| user | Database user | String | Required (unless an identity provider is used) |
| password | Database password | String | Required (unless an identity provider is used) |
| database | Database name | String | Required |
| ssl | Enable SSL/TLS | bool | false |
| default_template | Default templates for all queries | Option<QueryConfig> | None |
| routes | Query-specific template overrides | HashMap<String, QueryConfig> | Empty |
| command_timeout_ms | Command timeout in milliseconds | u64 | 30000 |
| retry_attempts | Number of retries | u32 | 3 |

Parameter Mapping

Templates use the @ syntax to reference fields from query results. The reaction provides different data contexts based on the operation type:

  • ADD operations: Use @after.field to access the new data
  • UPDATE operations: Use @after.field for new data, @before.field for old data
  • DELETE operations: Use @before.field to access the deleted data
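
The context rules above can be checked mechanically. The following is a minimal sketch, not part of the crate: `Op`, `has_context`, and `unavailable_contexts` are hypothetical names, and the source does not say whether the reaction itself performs such a check. It flags templates that reference a context the operation does not provide, such as `@after` in a DELETE template.

```rust
/// Hypothetical operation type for this sketch (not the crate's own type).
#[derive(Debug, Clone, Copy, PartialEq)]
pub enum Op {
    Add,
    Update,
    Delete,
}

/// Returns true if `context` ("before" or "after") is available for `op`,
/// following the rules listed above.
pub fn has_context(op: Op, context: &str) -> bool {
    matches!(
        (op, context),
        (Op::Add, "after")
            | (Op::Update, "after")
            | (Op::Update, "before")
            | (Op::Delete, "before")
    )
}

/// Lists the contexts that `template` references but `op` does not provide.
pub fn unavailable_contexts(op: Op, template: &str) -> Vec<&'static str> {
    ["before", "after"]
        .iter()
        .copied()
        .filter(|ctx| {
            let token = format!("@{}.", ctx);
            template.contains(token.as_str()) && !has_context(op, ctx)
        })
        .collect()
}
```

For example, `unavailable_contexts(Op::Delete, "CALL delete_user(@after.id)")` reports `"after"`, which points to a template that should use `@before.id` instead.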

Example

.with_default_template(QueryConfig {
    added: Some(TemplateSpec::new("CALL add_user(@after.id, @after.name, @after.email)")),
    updated: Some(TemplateSpec::new("CALL update_user(@after.id, @after.name, @after.email)")),
    deleted: Some(TemplateSpec::new("CALL delete_user(@before.id)")),
})

Query result for ADD operation:

{
  "id": 1,
  "name": "Alice",
  "email": "alice@example.com"
}

Executes:

CALL add_user(1, 'Alice', 'alice@example.com')
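
To make the substitution above concrete, here is a minimal sketch of a renderer that replaces `@after.<field>` tokens with SQL literals. It is an illustration under assumptions, not the plugin's implementation: `Field`, `to_sql_literal`, and `render_added` are hypothetical names, rendering unknown fields as NULL is an arbitrary choice, and the actual reaction may bind values as parameters rather than inline them.

```rust
use std::collections::HashMap;

/// Hypothetical value type standing in for one query result field.
#[derive(Debug, Clone, PartialEq)]
pub enum Field {
    Int(i64),
    Text(String),
}

/// Renders a field as an SQL literal, doubling single quotes inside text.
fn to_sql_literal(field: &Field) -> String {
    match field {
        Field::Int(n) => n.to_string(),
        Field::Text(s) => format!("'{}'", s.replace('\'', "''")),
    }
}

/// Replaces every `@after.<name>` token in `template` with the matching field.
/// Fields missing from `after` are rendered as NULL (an assumption of this sketch).
pub fn render_added(template: &str, after: &HashMap<String, Field>) -> String {
    let prefix = "@after.";
    let mut out = String::new();
    let mut rest = template;
    while let Some(pos) = rest.find(prefix) {
        // Copy the text before the token unchanged.
        out.push_str(&rest[..pos]);
        let tail = &rest[pos + prefix.len()..];
        // A field name runs until the first character outside [A-Za-z0-9_].
        let end = tail
            .find(|c: char| !(c.is_ascii_alphanumeric() || c == '_'))
            .unwrap_or(tail.len());
        match after.get(&tail[..end]) {
            Some(field) => out.push_str(&to_sql_literal(field)),
            None => out.push_str("NULL"),
        }
        rest = &tail[end..];
    }
    out.push_str(rest);
    out
}
```

Given the ADD result shown above, `render_added("CALL add_user(@after.id, @after.name, @after.email)", &after)` yields the same CALL statement as in the example.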

Nested Field Access

Access nested fields using dot notation:

TemplateSpec::new("CALL add_address(@after.user.id, @after.address.city)")
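
Dot notation can be read as a walk through nested objects, one path segment at a time. The sketch below illustrates that idea with a hypothetical `Node` type and `lookup` function; it assumes a lookup fails when a segment is missing or when the path continues past a non-object value, which the source does not specify.

```rust
use std::collections::BTreeMap;

/// Hypothetical nested value type for this sketch.
#[derive(Debug, Clone, PartialEq)]
pub enum Node {
    Int(i64),
    Text(String),
    Object(BTreeMap<String, Node>),
}

/// Resolves a dot-separated path such as "user.id" against `root`.
/// Returns None if a segment is missing or a non-object is traversed.
pub fn lookup<'a>(root: &'a Node, path: &str) -> Option<&'a Node> {
    let mut current = root;
    for segment in path.split('.') {
        let next = match current {
            Node::Object(map) => map.get(segment)?,
            _ => return None,
        };
        current = next;
    }
    Some(current)
}
```

With an `after` object containing `user.id` and `address.city`, `lookup(&after, "address.city")` returns the city node, while `lookup(&after, "address.zip")` returns `None`.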

Per-Query Templates

You can configure different templates for specific queries using the routes field or the builder's with_route method:

use std::collections::HashMap;

// Option 1: a routes map, as stored in the `routes` configuration field
let mut routes = HashMap::new();
routes.insert("user-query".to_string(), QueryConfig {
    added: Some(TemplateSpec::new("CALL user_added(@after.id, @after.name)")),
    updated: None,
    deleted: None,
});

// Option 2: the same route registered through the builder
let reaction = PostgresStoredProcReaction::builder("my-reaction")
    .with_hostname("localhost")
    .with_database("mydb")
    .with_user("postgres")
    .with_password("secret")
    .with_query("user-query")
    .with_route("user-query", QueryConfig {
        added: Some(TemplateSpec::new("CALL user_added(@after.id, @after.name)")),
        ..Default::default()  // updated and deleted fall back to the default template, if one is configured
    })
    .build()
    .await?;

Advanced Example: Partial Route Overrides

This example shows how to override only specific operations for a query while falling back to defaults for others:

use drasi_reaction_storedproc_postgres::{PostgresStoredProcReaction, QueryConfig, TemplateSpec};

let reaction = PostgresStoredProcReaction::builder("multi-query-sensor-sync")
    .with_hostname("localhost")
    .with_port(5432)
    .with_database("drasi_test")
    .with_user("postgres")
    .with_password("mysecret")
    // Subscribe to multiple queries
    .with_query("high-temp")
    .with_query("low-temp")
    .with_query("critical-temp")
    // Default template - applies to "high-temp" and "low-temp"
    .with_default_template(QueryConfig {
        added: Some(TemplateSpec::new(
            "CALL log_sensor_added(@after.id, @after.temperature, @after.timestamp)"
        )),
        updated: Some(TemplateSpec::new(
            "CALL log_sensor_updated(@after.id, @after.temperature)"
        )),
        deleted: Some(TemplateSpec::new(
            "CALL log_sensor_deleted(@before.id)"
        )),
    })
    // Custom route for critical temperature readings
    // Only handles ADD operations, falls back to default for UPDATE/DELETE
    .with_route("critical-temp", QueryConfig {
        added: Some(TemplateSpec::new(
            "CALL log_critical_alert(@after.id, @after.temperature, @after.timestamp)"
        )),
        ..Default::default()  // updated and deleted will use default template
    })
    .with_command_timeout_ms(5000)
    .with_retry_attempts(3)
    .build()
    .await?;

How it works:

  • "high-temp" and "low-temp" queries → Use default template for all operations
  • "critical-temp" query:
    • ADD: CALL log_critical_alert(...) (custom route)
    • UPDATE: CALL log_sensor_updated(...) (falls back to default)
    • DELETE: CALL log_sensor_deleted(...) (falls back to default)

Advanced Example: Multiple Queries with Default and Custom Routes

This example shows a reaction handling multiple queries with different stored procedure requirements:

use drasi_reaction_storedproc_postgres::{PostgresStoredProcReaction, QueryConfig, TemplateSpec};

// Create a reaction that:
// 1. Subscribes to 3 different queries: "user-changes", "product-changes", "order-changes"
// 2. Has a default template for most operations
// 3. Overrides only the "product-changes" query with custom procedures

let reaction = PostgresStoredProcReaction::builder("multi-query-sync")
    .with_hostname("localhost")
    .with_port(5432)
    .with_database("mydb")
    .with_user("postgres")
    .with_password("secret")
    // Subscribe to multiple queries
    .with_query("user-changes")
    .with_query("product-changes")
    .with_query("order-changes")
    // Default template applies to "user-changes" and "order-changes"
    .with_default_template(QueryConfig {
        added: Some(TemplateSpec::new("CALL log_entity_added(@after.id, @after.type)")),
        updated: Some(TemplateSpec::new("CALL log_entity_updated(@after.id, @after.type)")),
        deleted: Some(TemplateSpec::new("CALL log_entity_deleted(@before.id, @before.type)")),
    })
    // Override "product-changes" with specific procedures
    .with_route("product-changes", QueryConfig {
        added: Some(TemplateSpec::new(
            "CALL sync_product_added(@after.product_id, @after.name, @after.price, @after.inventory)"
        )),
        updated: Some(TemplateSpec::new(
            "CALL sync_product_updated(@after.product_id, @after.price, @after.inventory)"
        )),
        ..Default::default()  // deleted will fall back to default template
    })
    .with_command_timeout_ms(5000)
    .with_retry_attempts(3)
    .build()
    .await?;

How it works:

  1. "user-changes" query → Uses default template

    • Add: CALL log_entity_added(@after.id, @after.type)
    • Update: CALL log_entity_updated(@after.id, @after.type)
    • Delete: CALL log_entity_deleted(@before.id, @before.type)
  2. "product-changes" query → Uses custom route (with fallback to default for delete)

    • Add: CALL sync_product_added(@after.product_id, @after.name, @after.price, @after.inventory)
    • Update: CALL sync_product_updated(@after.product_id, @after.price, @after.inventory)
    • Delete: CALL log_entity_deleted(@before.id, @before.type) (falls back to default)
  3. "order-changes" query → Uses default template

    • Add: CALL log_entity_added(@after.id, @after.type)
    • Update: CALL log_entity_updated(@after.id, @after.type)
    • Delete: CALL log_entity_deleted(@before.id, @before.type)

Note: If a route specifies None for an operation (like deleted: None for product-changes), the reaction will check the default template. If the default template also has None for that operation, no procedure will be called.
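
The fallback rule described in the note can be summarized as: route template first, then default template, otherwise nothing. The following sketch models that lookup with simplified stand-ins. `Operation`, `Templates`, and `resolve` are hypothetical names, and templates are plain strings here rather than the crate's `QueryConfig` and `TemplateSpec` types.

```rust
use std::collections::HashMap;

/// Hypothetical operation type for this sketch.
#[derive(Debug, Clone, Copy, PartialEq)]
pub enum Operation {
    Add,
    Update,
    Delete,
}

/// Simplified stand-in for a per-query template set.
#[derive(Debug, Clone, Default)]
pub struct Templates {
    pub added: Option<String>,
    pub updated: Option<String>,
    pub deleted: Option<String>,
}

impl Templates {
    /// Returns the template configured for `op`, if any.
    fn for_op(&self, op: Operation) -> Option<&str> {
        match op {
            Operation::Add => self.added.as_deref(),
            Operation::Update => self.updated.as_deref(),
            Operation::Delete => self.deleted.as_deref(),
        }
    }
}

/// Picks the template for (`query_id`, `op`): the route's entry if it has one,
/// else the default template's entry, else None (no procedure is called).
pub fn resolve<'a>(
    routes: &'a HashMap<String, Templates>,
    default: Option<&'a Templates>,
    query_id: &str,
    op: Operation,
) -> Option<&'a str> {
    routes
        .get(query_id)
        .and_then(|route| route.for_op(op))
        .or_else(|| default.and_then(|d| d.for_op(op)))
}
```

Applied to the sensor example, a route for "critical-temp" that only sets `added` resolves ADD to the route's template and UPDATE and DELETE to the default template's entries.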

Error Handling

The reaction includes automatic retry logic with exponential backoff:

  • Initial retry: 100ms delay
  • Subsequent retries: 200ms, 400ms, 800ms, etc.
  • Max retries: Configurable (default: 3)
  • Timeout: Configurable per command (default: 30s)
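
The delay sequence listed above doubles from a 100ms base. A minimal sketch of that schedule follows; `backoff_delay` and `schedule` are hypothetical helpers, and whether the reaction caps the delay or adds jitter is not stated in this document.

```rust
use std::time::Duration;

/// Delay before the retry with the given zero-based index:
/// 100ms, 200ms, 400ms, 800ms, ... (saturating instead of overflowing).
pub fn backoff_delay(retry_index: u32) -> Duration {
    let base_ms: u64 = 100;
    let factor = 1u64.checked_shl(retry_index).unwrap_or(u64::MAX);
    Duration::from_millis(base_ms.saturating_mul(factor))
}

/// Full list of delays for a given `retry_attempts` setting.
pub fn schedule(retry_attempts: u32) -> Vec<Duration> {
    (0..retry_attempts).map(backoff_delay).collect()
}
```

Under these assumptions, the default of 3 retries waits 100ms, 200ms, and 400ms, for a total of 700ms of backoff in addition to the time spent in each attempt.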

Plugin Packaging

This reaction is compiled as a dynamic plugin (cdylib) that can be loaded by drasi-server at runtime.

Key files:

  • Cargo.toml — includes crate-type = ["lib", "cdylib"]
  • src/descriptor.rs — implements ReactionPluginDescriptor with kind "storedproc-postgres", configuration DTO, and OpenAPI schema generation
  • src/lib.rs — invokes drasi_plugin_sdk::export_plugin! to export the plugin entry point

Building:

cargo build -p drasi-reaction-storedproc-postgres

The compiled .so (Linux) / .dylib (macOS) / .dll (Windows) is placed in target/debug/ and can be copied to the server's plugins/ directory.
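
When scripting the copy step, the library file name can be derived from the package name. The sketch below assumes Cargo's usual naming (hyphens in the package name become underscores, and the platform's library prefix and suffix are applied) and that the crate does not override its library name in Cargo.toml. `plugin_file_name` is a hypothetical helper, not part of the crate or of drasi-server.

```rust
use std::env::consts::{DLL_PREFIX, DLL_SUFFIX};

/// Builds the expected dynamic library file name for a Cargo package
/// on the current platform, e.g. "lib<name>.so" on Linux.
pub fn plugin_file_name(package: &str) -> String {
    // Cargo replaces hyphens with underscores in the library name.
    let crate_name = package.replace('-', "_");
    format!("{}{}{}", DLL_PREFIX, crate_name, DLL_SUFFIX)
}
```

On Linux, `plugin_file_name("drasi-reaction-storedproc-postgres")` gives `libdrasi_reaction_storedproc_postgres.so`, which would then be looked up under target/debug/.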

For more details on the plugin descriptor pattern and configuration DTOs, see the Reaction Developer Guide.

License

Copyright 2025 The Drasi Authors.

Licensed under the Apache License, Version 2.0.