outbox-postgres 0.1.2

PostgreSQL storage implementation for oxide-outbox using sqlx

Oxide Outbox 🦀

License: MIT

A high-performance, flexible implementation of the Transactional Outbox pattern for Rust. Ensure reliable message delivery in your distributed systems by decoupling database transactions from event publishing.

Key Features

  • Hybrid Event Discovery: Combines real-time database notifications (e.g., Postgres LISTEN/NOTIFY) with fallback polling at a fixed interval, so an event whose notification was missed is still picked up on the next poll.
  • Trait-First Architecture: The core is decoupled from any specific storage backend or message broker. Swap the storage (e.g., Postgres, MySQL) or the broker (e.g., Kafka, RabbitMQ) by implementing the corresponding traits.
  • Built-in Garbage Collection: Automatic cleanup of processed events to prevent table bloat.
  • Concurrency Safe: Designed for horizontal scaling with support for row-level locking.
  • Async Native: Built from the ground up on tokio.
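
To make the trait-first idea concrete, here is a minimal sketch of how a storage trait and a publisher trait can be combined by generic code. It is an illustration under stated assumptions, not the crate's API: the names Event, Storage, Publisher, process_batch, MemoryStorage, and MemoryPublisher are hypothetical, and the traits are synchronous to keep the example dependency-free, whereas the real outbox-core traits are async.

```rust
// Hypothetical, synchronous stand-ins for a storage trait and a publisher trait.
// The real outbox-core traits are async and may have different signatures.

/// A pending outbox entry (illustrative shape only).
#[derive(Debug, Clone, PartialEq)]
pub struct Event {
    pub id: u64,
    pub event_type: String,
    pub payload: String,
}

/// Storage side: any database can sit behind this trait.
pub trait Storage {
    fn fetch_pending(&mut self, batch_size: usize) -> Vec<Event>;
    fn mark_processed(&mut self, id: u64);
}

/// Broker side: any transport can sit behind this trait.
pub trait Publisher {
    fn publish(&mut self, event: &Event) -> Result<(), String>;
}

/// Publishes one batch and returns how many events were delivered.
/// An event is marked processed only after a successful publish,
/// so a failed publish leaves it pending for a later attempt.
pub fn process_batch<S: Storage, P: Publisher>(
    storage: &mut S,
    publisher: &mut P,
    batch_size: usize,
) -> usize {
    let mut delivered = 0;
    for event in storage.fetch_pending(batch_size) {
        if publisher.publish(&event).is_ok() {
            storage.mark_processed(event.id);
            delivered += 1;
        }
    }
    delivered
}

/// In-memory storage, useful for tests.
#[derive(Default)]
pub struct MemoryStorage {
    pub pending: Vec<Event>,
}

impl Storage for MemoryStorage {
    fn fetch_pending(&mut self, batch_size: usize) -> Vec<Event> {
        self.pending.iter().take(batch_size).cloned().collect()
    }

    fn mark_processed(&mut self, id: u64) {
        self.pending.retain(|e| e.id != id);
    }
}

/// In-memory publisher that collects events and can reject one event type.
#[derive(Default)]
pub struct MemoryPublisher {
    pub sent: Vec<Event>,
    pub reject_type: Option<String>,
}

impl Publisher for MemoryPublisher {
    fn publish(&mut self, event: &Event) -> Result<(), String> {
        if self.reject_type.as_deref() == Some(event.event_type.as_str()) {
            return Err(format!("broker rejected {}", event.event_type));
        }
        self.sent.push(event.clone());
        Ok(())
    }
}
```

Because process_batch depends only on the two traits, the in-memory types can be replaced by a database-backed storage or a real broker client without touching the processing logic.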

Project Structure

  • outbox-core: The heart of the library containing core logic, traits, and the OutboxManager.
  • outbox-postgres: Default implementation for PostgreSQL using sqlx.

Installation

Add this to your Cargo.toml:

[dependencies]
outbox-core = "0.1"
outbox-postgres = "0.1" # If using Postgres

How It Works

Oxide Outbox uses a dual-trigger mechanism to discover events, plus a background cleanup task:

  • Notification Trigger: The manager listens for specific DB signals (like NOTIFY in Postgres). When a transaction commits an outbox entry, the worker wakes up immediately.
  • Interval Trigger: A safety net that periodically checks for "stale" events that might have been missed during network blips or worker restarts.
  • Automatic GC: A background task periodically removes successfully processed events based on your retention policy.
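
The two triggers above can be modeled as a single wait with a timeout: either a notification arrives first, or the poll interval elapses. The sketch below shows that idea with the standard library only. It is an assumption-laden illustration, not the library's implementation: Wake and wait_for_wake are hypothetical names, and a std channel stands in for the database notification stream that the real worker would await on tokio.

```rust
use std::sync::mpsc::{Receiver, RecvTimeoutError};
use std::time::Duration;

/// Why the worker woke up (hypothetical type for illustration).
#[derive(Debug, PartialEq)]
pub enum Wake {
    /// A notification arrived before the poll interval elapsed.
    Notification,
    /// No notification arrived in time; fall back to polling.
    Interval,
    /// The notification source is gone; the worker should stop.
    Shutdown,
}

/// Blocks until a notification arrives or `poll_interval` elapses,
/// whichever happens first.
pub fn wait_for_wake(notifications: &Receiver<()>, poll_interval: Duration) -> Wake {
    match notifications.recv_timeout(poll_interval) {
        Ok(()) => Wake::Notification,
        Err(RecvTimeoutError::Timeout) => Wake::Interval,
        Err(RecvTimeoutError::Disconnected) => Wake::Shutdown,
    }
}
```

In either the Notification or the Interval case the worker would run the same "fetch pending events" step, which is why a lost notification only delays an event by at most one poll interval.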

Reliability & Resilience

  • At-Least-Once Delivery: Messages are guaranteed to be delivered at least once. If a worker fails while processing an event, the message remains in the database and is delivered again later, so consumers should tolerate duplicates.
  • Lazy Retry (Visibility Timeout): When an event is picked up, it is assigned a lock_until timestamp. If the worker doesn't mark it as completed before that lock expires, i.e. within lock_timeout_mins minutes (e.g., due to a crash), the event automatically becomes visible again for the next polling cycle or notification.
  • Transactional Integrity: Since the outbox table lives in your business database, events are saved within the same ACID transaction as your business logic. An event is persisted if and only if the business change commits, so events are never lost or orphaned.
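
The visibility-timeout rule can be expressed as a small in-memory model. This is a sketch under stated assumptions rather than the crate's code: OutboxRow, claim_batch, and mark_completed are hypothetical names, timestamps are plain Unix seconds, and the real implementation would apply the same rule in SQL with row-level locking. Only lock_until and lock_timeout_mins come from the text above.

```rust
/// Illustrative in-memory outbox row; timestamps are Unix seconds.
#[derive(Debug, Clone)]
pub struct OutboxRow {
    pub id: u64,
    pub processed: bool,
    /// Set when a worker claims the row; `None` means never claimed.
    pub lock_until: Option<u64>,
}

impl OutboxRow {
    /// A row can be claimed if it is unprocessed and either was never
    /// locked or its lock has already expired.
    pub fn is_visible(&self, now: u64) -> bool {
        !self.processed && self.lock_until.map_or(true, |until| until <= now)
    }
}

/// Claims up to `batch_size` visible rows by setting their lock,
/// and returns the ids of the claimed rows.
pub fn claim_batch(
    rows: &mut [OutboxRow],
    now: u64,
    lock_timeout_mins: u64,
    batch_size: usize,
) -> Vec<u64> {
    let lock_until = now + lock_timeout_mins * 60;
    let mut claimed = Vec::new();
    for row in rows.iter_mut().filter(|r| r.is_visible(now)).take(batch_size) {
        row.lock_until = Some(lock_until);
        claimed.push(row.id);
    }
    claimed
}

/// Marks a row as completed so it is never claimed again.
pub fn mark_completed(rows: &mut [OutboxRow], id: u64) {
    if let Some(row) = rows.iter_mut().find(|r| r.id == id) {
        row.processed = true;
    }
}
```

A row that is claimed but never completed (for example, because the worker crashed) simply reappears once its lock expires, which is what makes the retry "lazy": no separate recovery process is needed.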

Quick Start

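use std::sync::Arc;
use std::time::Duration;

use sqlx::PgPool;
use tracing::{error, info, Level};
// The outbox types used below (OutboxConfig, OutboxManager, PostgresOutbox, PostgresWriter,
// EventPublisher, EventType, Payload, OutboxError) come from outbox-core and outbox-postgres;
// their exact module paths are not shown here.

#[tokio::main]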
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    tracing_subscriber::fmt().with_max_level(Level::TRACE).init();

    let pool = PgPool::connect("postgresql://postgres:mysecretpassword@localhost:5432/outbox").await?;
    let config = Arc::new(OutboxConfig {
        batch_size: 100,
        retention_days: 0,
        gc_interval_secs: 10,
        poll_interval_secs: 10,
        lock_timeout_mins: 5,
    });

    let storage = PostgresOutbox::new(pool.clone(), config.clone());
    let writer = PostgresWriter(pool.clone());

    let (sender, mut receiver) = tokio::sync::mpsc::unbounded_channel::<Message>();
    let publisher = TokioEventPublisher(sender);

    let mut outbox = OutboxManager::new(storage, publisher, config.clone());

    tokio::spawn(async move {
        if let Err(e) = outbox.run().await {
            error!("Outbox critical error: {}", e);
        }
    });

    tokio::spawn(async move {
        while let Some(msg) = receiver.recv().await {
            info!("Event received in Broker: type={:?}, payload={:?}", msg.0, msg.1);
        }
    });
    
    add_event(writer, "OrderCreated", serde_json::json!({"id": 123})).await?;
    tokio::time::sleep(Duration::from_mins(5)).await;
    Ok(())
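}

// Message forwarded to the in-process channel that stands in for a broker.
struct Message(EventType, Payload);

// Publisher that sends events to a tokio channel instead of a real broker.
#[derive(Clone)]
struct TokioEventPublisher(tokio::sync::mpsc::UnboundedSender<Message>);

#[async_trait::async_trait]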
impl EventPublisher for TokioEventPublisher {
    async fn publish(&self, event_type: EventType, payload: Payload) -> Result<(), OutboxError> {
        self.0.send(Message(event_type, payload)).map_err(|e| OutboxError::InfrastructureError(e.to_string()))
    }
}