postgres-notify 0.3.8

Library that makes it easy to subscribe to PostgreSQL notifications
Documentation

Crates.io Documentation Maintenance License: MIT

postgres-notify

A resilient PostgreSQL client wrapper for tokio_postgres with automatic reconnection, query timeouts, and NOTIFY/LISTEN support.

  • Automatic reconnection with exponential backoff and jitter
  • Query timeouts with server-side cancellation
  • PostgreSQL NOTIFY/LISTEN pub/sub notifications
  • RAISE message capture for execution logging

Installation

Add to your Cargo.toml:

[dependencies]
postgres-notify = "0.3"
tokio = { version = "1", features = ["rt", "time"] }
tokio-postgres = "0.7"

Feature Flags

Feature Default Description
chrono Yes Use DateTime<Utc> for timestamps instead of SystemTime
serde Yes Enable serialization for message types
tracing Yes Structured logging via the tracing crate

To disable default features:

[dependencies]
postgres-notify = { version = "0.3", default-features = false }

Quick Start

use postgres_notify::{PGRobustClient, PGRobustClientConfig, PGMessage};
use tokio_postgres::NoTls;
use std::time::Duration;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let config = PGRobustClientConfig::new("postgres://localhost/mydb", NoTls)
        .callback(|msg: PGMessage| println!("{}", msg));

    let mut client = PGRobustClient::spawn(config).await?;

    let rows = client.query("SELECT $1::TEXT", &[&"hello"], Some(Duration::from_secs(5))).await?;
    println!("Result: {}", rows[0].get::<_, String>(0));

    Ok(())
}

Features

LISTEN/NOTIFY

PostgreSQL supports asynchronous notifications via LISTEN/NOTIFY commands. This allows the database to push messages to clients without polling.

use postgres_notify::{PGRobustClient, PGRobustClientConfig, PGMessage};
use tokio_postgres::NoTls;
use std::time::Duration;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let config = PGRobustClientConfig::new("postgres://localhost/mydb", NoTls)
        .callback(|msg: PGMessage| {
            if let PGMessage::Notify(notification) = msg {
                println!("Channel: {}, Payload: {}", notification.channel, notification.payload);
            }
        });

    let mut client = PGRobustClient::spawn(config).await?;

    // Subscribe to channels
    client.subscribe_notify(&["events", "updates"], Some(Duration::from_secs(5))).await?;

    // Subscriptions are automatically restored after reconnection
    // Unsubscribe when done
    client.unsubscribe_notify(&["events"], None).await?;

    Ok(())
}

RAISE/Logs

Capture PostgreSQL RAISE messages during query execution. Useful for debugging stored procedures and functions.

use postgres_notify::{PGRobustClient, PGRobustClientConfig, PGMessage};
use tokio_postgres::NoTls;
use std::time::Duration;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let config = PGRobustClientConfig::new("postgres://localhost/mydb", NoTls)
        .callback(|_| {});

    let mut client = PGRobustClient::spawn(config).await?;

    // Capture logs from a query
    let (result, logs) = client.with_captured_log(async |c| {
        c.simple_query(
            "DO $$ BEGIN RAISE NOTICE 'Processing started'; END; $$",
            Some(Duration::from_secs(5))
        ).await?;
        Ok(())
    }).await?;

    for log in logs {
        println!("{}", log);
    }

    Ok(())
}

Log Levels: DEBUG, LOG, INFO, NOTICE, WARNING, ERROR, FATAL, PANIC

Automatic Reconnection

If the connection is lost, the client automatically reconnects with exponential backoff:

  • Initial delay: 500ms
  • Doubles after each attempt (with jitter)
  • Maximum delay: 1 hour
  • Default max attempts: 10

On reconnection:

  • Connect script is re-executed
  • Channel subscriptions are restored
let config = PGRobustClientConfig::new(url, NoTls)
    .max_reconnect_attempts(5)  // Limit retry attempts
    .callback(|msg| {
        match msg {
            PGMessage::Reconnect { attempts, max_attempts, .. } => {
                println!("Reconnecting: attempt {} of {}", attempts, max_attempts);
            }
            PGMessage::Connected { .. } => println!("Connected!"),
            PGMessage::FailedToReconnect { .. } => println!("Gave up reconnecting"),
            _ => {}
        }
    });

Query Timeouts

All query methods accept an optional timeout. If exceeded, the query is cancelled server-side.

use std::time::Duration;

// With explicit timeout
let rows = client.query("SELECT pg_sleep(10)", &[], Some(Duration::from_secs(1))).await;
// Returns Err(PGError::Timeout(..))

// Use default timeout (1 hour)
let rows = client.query("SELECT 1", &[], None).await?;

// Configure default timeout
let config = PGRobustClientConfig::new(url, NoTls)
    .default_timeout(Duration::from_secs(30));

// Manual cancellation
client.cancel_query().await?;

Configuration

Builder Pattern

use postgres_notify::{PGRobustClientConfig, PGMessage};
use tokio_postgres::NoTls;
use std::time::Duration;

let config = PGRobustClientConfig::new("postgres://user:pass@localhost/db", NoTls)
    // Event callback (required for NOTIFY/RAISE)
    .callback(|msg: PGMessage| println!("{}", msg))

    // Reconnection settings
    .max_reconnect_attempts(10)

    // Default query timeout
    .default_timeout(Duration::from_secs(300))

    // SQL executed on each connection
    .connect_script("SET timezone = 'UTC'; SET statement_timeout = '30s';")

    // PostgreSQL application name
    .application_name("my-service")

    // Pre-configure LISTEN channels
    .subscriptions(["events", "updates"]);

API Overview

Main Types

Type Description
PGRobustClient<TLS> Main client with resilience features
PGRobustClientConfig<TLS> Builder for client configuration
PGMessage Event enum delivered to callbacks
PGError Error type with timeout/reconnect variants
PGResult<T> Alias for Result<T, PGError>

Query Methods

All methods mirror tokio_postgres::Client with an additional timeout parameter:

Method Description
query Execute query, return all rows
query_one Execute query, return exactly one row
query_opt Execute query, return zero or one row
query_raw Execute query with streaming results
execute_raw Execute statement, return affected row count
prepare Prepare a statement
transaction Begin a transaction
simple_query Execute simple (non-parameterized) query
batch_execute Execute multiple statements

Message Types

pub enum PGMessage {
    Notify(PGNotifyMessage),     // NOTIFY received
    Raise(PGRaiseMessage),       // RAISE message received
    Reconnect { .. },            // Reconnection attempt
    Connected { .. },            // Connection established
    Timeout { .. },              // Query timeout occurred
    Cancelled { .. },            // Query cancellation result
    FailedToReconnect { .. },    // Max reconnect attempts reached
    Disconnected { .. },         // Connection lost
}

Error Handling

use postgres_notify::{PGError, PGResult};

async fn example(client: &mut PGRobustClient<NoTls>) -> PGResult<()> {
    match client.query("SELECT 1", &[], None).await {
        Ok(rows) => println!("Got {} rows", rows.len()),
        Err(PGError::Timeout(duration)) => {
            println!("Query timed out after {:?}", duration);
        }
        Err(PGError::FailedToReconnect(attempts)) => {
            println!("Failed to reconnect after {} attempts", attempts);
        }
        Err(PGError::Postgres(e)) => {
            if e.code().map(|c| c.code().starts_with("23")).unwrap_or(false) {
                println!("Constraint violation: {}", e);
            }
        }
        Err(e) => return Err(e),
    }
    Ok(())
}

Best Practices

Callback Safety

The callback runs in a background tokio task. If it panics:

  • The internal message log becomes inaccessible
  • The connection polling task terminates

Recommendation: Never panic in callbacks. Use std::panic::catch_unwind if calling untrusted code.

Security

The connect_script and application_name values are interpolated directly into SQL. Do not pass untrusted user input to these methods.

Performance

  • This is a single-connection client, not a connection pool
  • query, query_one, and query_opt clone parameters internally
  • For bulk operations, consider query_raw or execute_raw

TLS Configuration

For production use, configure TLS:

// Without TLS (development only)
use tokio_postgres::NoTls;
let config = PGRobustClientConfig::new(url, NoTls);

// With rustls
// Add: tokio-postgres-rustls = "0.12"
use tokio_postgres_rustls::MakeRustlsConnect;
let tls = MakeRustlsConnect::new(rustls_config);
let config = PGRobustClientConfig::new(url, tls);

// With native-tls
// Add: postgres-native-tls = "0.5"
use postgres_native_tls::MakeTlsConnector;
let tls = MakeTlsConnector::new(native_tls_connector);
let config = PGRobustClientConfig::new(url, tls);

Requirements

  • Rust 1.85+ (Edition 2024)
  • Tokio runtime with rt and time features
  • PostgreSQL 9.0+ (LISTEN/NOTIFY support)

Changelog

See CHANGELOG.md for a detailed history of changes.

License

MIT License - see LICENSE for details.