// pgpubsub 2.0.0
//
// Async PostgreSQL LISTEN/NOTIFY pub/sub client built on tokio-postgres.
use tokio_postgres::{
    tls::{MakeTlsConnect, TlsConnect},
    Socket,
};

use crate::{
    pg_pubsub_connection::{PgPubSubConnection, PubSubError, Subscription},
    PgPubSubOptions,
};

/// Client for PostgreSQL LISTEN/NOTIFY pub/sub.
///
/// Obtain an instance with [`PgPubSub::connect`]. Subscribe to channels with
/// [`listen`](PgPubSub::listen) and publish messages with [`notify`](PgPubSub::notify).
pub struct PgPubSub {
    // Every public method on this type delegates to this connection.
    connection: PgPubSubConnection,
}

impl PgPubSub {
    /// Opens a connection to PostgreSQL and starts the background listener task.
    ///
    /// Build `options` with [`PgPubSubOptionsBuilder`](crate::PgPubSubOptionsBuilder).
    ///
    /// # Reconnection
    ///
    /// Should the connection drop later on, the background task reconnects on its own
    /// using exponential backoff and re-issues LISTEN for each channel that still has
    /// active subscriptions, so existing [`Subscription`]s survive a reconnect. Two caveats:
    ///
    /// - Notifications published while the connection is down are lost, because
    ///   PostgreSQL's NOTIFY keeps no backlog.
    /// - `listen`/`notify` calls issued during the outage return an error instead of
    ///   waiting for the reconnect.
    ///
    /// # Errors
    ///
    /// Propagates the [`tokio_postgres::Error`] reported by the underlying connection
    /// attempt.
    pub async fn connect<T>(options: PgPubSubOptions<T>) -> Result<Self, tokio_postgres::Error>
    where
        T: MakeTlsConnect<Socket> + Clone + Send + Sync + 'static,
        <T as MakeTlsConnect<Socket>>::Stream: Send + 'static,
        <T as MakeTlsConnect<Socket>>::TlsConnect: Send,
        <<T as MakeTlsConnect<Socket>>::TlsConnect as TlsConnect<Socket>>::Future: Send,
    {
        Ok(Self {
            connection: PgPubSubConnection::connect(options).await?,
        })
    }

    /// Starts listening on a PostgreSQL notification channel.
    ///
    /// The returned [`Subscription`] receives the channel's notifications. Once every
    /// subscription for a channel has been dropped, the channel is unlistened
    /// automatically. Channel names must be 1-63 bytes long.
    ///
    /// # Errors
    ///
    /// Returns whatever [`PubSubError`] the underlying connection reports.
    pub async fn listen(&self, channel: &str) -> Result<Subscription, PubSubError> {
        let Self { connection } = self;
        connection.listen(channel).await
    }

    /// Publishes a NOTIFY on `channel`, optionally carrying `payload`.
    ///
    /// # Errors
    ///
    /// The payload must be shorter than 8000 bytes (PostgreSQL's limit). A longer
    /// payload is rejected with [`PubSubError::InvalidPayload`] before the server is
    /// contacted.
    pub async fn notify(&self, channel: &str, payload: Option<&str>) -> Result<(), PubSubError> {
        let Self { connection } = self;
        connection.notify(channel, payload).await
    }

    /// Publishes several notifications in a single round-trip.
    ///
    /// Each item is a `(channel, payload)` pair. The whole batch runs in one implicit
    /// transaction, so delivery is atomic: subscribers receive either every
    /// notification or none of them. Passing an empty slice is a no-op.
    ///
    /// Prefer this over repeated [`notify`](PgPubSub::notify) calls when a publisher
    /// fans out many notifications at once: the cost is one round-trip rather than N.
    ///
    /// # Errors
    ///
    /// Returns whatever [`PubSubError`] the underlying connection reports.
    pub async fn notify_batch(&self, items: &[(&str, Option<&str>)]) -> Result<(), PubSubError> {
        let Self { connection } = self;
        connection.notify_batch(items).await
    }
}