PostgreSQL Stored Procedure Reaction
A Drasi reaction plugin that invokes PostgreSQL stored procedures when continuous query results change.
Overview
The PostgreSQL Stored Procedure reaction enables you to:
- Execute different stored procedures for ADD, UPDATE, and DELETE operations
- Map query result fields to stored procedure parameters using
@fieldNamesyntax - Handle multiple queries with a single reaction
- Automatically retry failed procedure calls with exponential backoff
- Configure connection parameters and timeouts
Installation
Add the dependency to your Cargo.toml:
[]
= { = "path/to/drasi-core/components/reactions/storedproc-postgres" }
Quick Start
1. Create Stored Procedures in PostgreSQL
(
p_id INTEGER,
p_name TEXT,
p_email TEXT
)
LANGUAGE plpgsql
AS $$
BEGIN
INSERT INTO users_sync (id, name, email)
VALUES (p_id, p_name, p_email);
END;
$$;
2. Create the Reaction
use ;
use DrasiLib;
async
Configuration
Builder API
Traditional Username/Password Authentication
let reaction = builder
.with_hostname
.with_port
.with_database
.with_user
.with_password
.with_ssl // Enable SSL/TLS
.with_query
.with_default_template
.with_command_timeout_ms
.with_retry_attempts
.build
.await?;
Cloud Identity Provider Authentication
For cloud-managed PostgreSQL databases, you can use identity providers instead of passwords:
Azure AD Authentication (Azure Database for PostgreSQL):
use AzureIdentityProvider;
// For Azure Kubernetes Service with Workload Identity
let identity_provider = with_workload_identity?;
// For local development or Azure VMs with Managed Identity
let identity_provider = with_default_credentials.await?;
let reaction = builder
.with_hostname
.with_port
.with_database
.with_identity_provider
.with_ssl // Required for Azure
.with_query
.with_default_template
.build
.await?;
AWS IAM Authentication (Amazon RDS/Aurora):
use AwsIdentityProvider;
// Using IAM user credentials
let identity_provider = new.await?;
// Or assuming an IAM role
let identity_provider = with_assumed_role.await?;
let reaction = builder
.with_hostname
.with_port
.with_database
.with_identity_provider
.with_ssl // Recommended for RDS
.with_query
.with_default_template
.build
.await?;
Password Provider (programmatic username/password):
use PasswordIdentityProvider;
let identity_provider = new;
let reaction = builder
.with_hostname
.with_port
.with_database
.with_identity_provider
.with_query
.with_default_template
.build
.await?;
Note: When using identity providers, do not call
.with_user()or.with_password(). The identity provider handles authentication automatically.See the Identity Provider README for detailed setup instructions for Azure AD and AWS IAM authentication.
Configuration Options
| Option | Description | Type | Default |
|---|---|---|---|
hostname |
Database hostname | String |
"localhost" |
port |
Database port | u16 |
5432 |
user |
Database user | String |
Required |
password |
Database password | String |
Required |
database |
Database name | String |
Required |
ssl |
Enable SSL/TLS | bool |
false |
default_template |
Default templates for all queries | Option<QueryConfig> |
None |
routes |
Query-specific template overrides | HashMap<String, QueryConfig> |
Empty |
command_timeout_ms |
Command timeout | u64 |
30000 |
retry_attempts |
Number of retries | u32 |
3 |
Parameter Mapping
Templates use the @ syntax to reference fields from query results. The reaction provides different data contexts based on the operation type:
- ADD operations: Use
@after.fieldto access the new data - UPDATE operations: Use
@after.fieldfor new data,@before.fieldfor old data - DELETE operations: Use
@before.fieldto access the deleted data
Example
.with_default_template
Query result for ADD operation:
Executes:
CALL add_user(1, 'Alice', 'alice@example.com')
Nested Field Access
Access nested fields using dot notation:
new
Per-Query Templates
You can configure different templates for specific queries using the routes field or the builder's with_route method:
use HashMap;
let mut routes = new;
routes.insert;
let reaction = builder
.with_hostname
.with_database
.with_user
.with_password
.with_query
.with_route
.build
.await?;
Advanced Example: Partial Route Overrides
This example shows how to override only specific operations for a query while falling back to defaults for others:
use ;
let reaction = builder
.with_hostname
.with_port
.with_database
.with_user
.with_password
// Subscribe to multiple queries
.with_query
.with_query
.with_query
// Default template - applies to "high-temp" and "low-temp"
.with_default_template
// Custom route for critical temperature readings
// Only handles ADD operations, falls back to default for UPDATE/DELETE
.with_route
.with_command_timeout_ms
.with_retry_attempts
.build
.await?;
How it works:
- "high-temp" and "low-temp" queries → Use default template for all operations
- "critical-temp" query:
- ADD:
CALL log_critical_alert(...)(custom route) - UPDATE:
CALL log_sensor_updated(...)(falls back to default) - DELETE:
CALL log_sensor_deleted(...)(falls back to default)
- ADD:
Advanced Example: Multiple Queries with Default and Custom Routes
This example shows a reaction handling multiple queries with different stored procedure requirements:
use ;
// Create a reaction that:
// 1. Subscribes to 3 different queries: "user-changes", "product-changes", "order-changes"
// 2. Has a default template for most operations
// 3. Overrides only the "product-changes" query with custom procedures
let reaction = builder
.with_hostname
.with_port
.with_database
.with_user
.with_password
// Subscribe to multiple queries
.with_query
.with_query
.with_query
// Default template applies to "user-changes" and "order-changes"
.with_default_template
// Override "product-changes" with specific procedures
.with_route
.with_command_timeout_ms
.with_retry_attempts
.build
.await?;
How it works:
-
"user-changes" query → Uses default template
- Add:
CALL log_entity_added(@after.id, @after.type) - Update:
CALL log_entity_updated(@after.id, @after.type) - Delete:
CALL log_entity_deleted(@before.id, @before.type)
- Add:
-
"product-changes" query → Uses custom route (with fallback to default for delete)
- Add:
CALL sync_product_added(@after.product_id, @after.name, @after.price, @after.inventory) - Update:
CALL sync_product_updated(@after.product_id, @after.price, @after.inventory) - Delete:
CALL log_entity_deleted(@before.id, @before.type)(falls back to default)
- Add:
-
"order-changes" query → Uses default template
- Add:
CALL log_entity_added(@after.id, @after.type) - Update:
CALL log_entity_updated(@after.id, @after.type) - Delete:
CALL log_entity_deleted(@before.id, @before.type)
- Add:
Note: If a route specifies None for an operation (like deleted: None for product-changes), the reaction will check the default template. If the default template also has None for that operation, no procedure will be called.
Error Handling
The reaction includes automatic retry logic with exponential backoff:
- Initial retry: 100ms delay
- Subsequent retries: 200ms, 400ms, 800ms, etc.
- Max retries: Configurable (default: 3)
- Timeout: Configurable per command (default: 30s)
Plugin Packaging
This reaction is compiled as a dynamic plugin (cdylib) that can be loaded by drasi-server at runtime.
Key files:
Cargo.toml— includescrate-type = ["lib", "cdylib"]src/descriptor.rs— implementsReactionPluginDescriptorwith kind"storedproc-postgres", configuration DTO, and OpenAPI schema generationsrc/lib.rs— invokesdrasi_plugin_sdk::export_plugin!to export the plugin entry point
Building:
The compiled .so (Linux) / .dylib (macOS) / .dll (Windows) is placed in target/debug/ and can be copied to the server's plugins/ directory.
For more details on the plugin descriptor pattern and configuration DTOs, see the Reaction Developer Guide.
License
Copyright 2025 The Drasi Authors.
Licensed under the Apache License, Version 2.0.