Outbox Postgres
The official PostgreSQL storage backend for outbox-core.
This crate leverages sqlx to provide a robust, concurrency-safe, and real-time implementation of the Transactional Outbox pattern for PostgreSQL.
Key Features
- ACID Guarantees: Use PostgresWriter with an sqlx::Transaction to save your business data and outbox events in the exact same database transaction.
- Concurrency Safe: Uses Postgres' FOR UPDATE SKIP LOCKED mechanism so that multiple outbox workers can process events concurrently without claiming the same rows.
- Instant Processing: Native support for PostgreSQL LISTEN/NOTIFY. The PostgresOutbox listens for notifications raised by a database trigger and wakes up to process events immediately, falling back to polling only as a safety net.
- Type-Safe JSONB: Serializes your strongly-typed generic domain events (Event<P>) into PostgreSQL jsonb columns.
- Built-in Garbage Collection: Automatically cleans up old, successfully processed messages to prevent your outbox table from growing indefinitely.
- Dead Letter Queue (feature dlq): Provides OutboxStorage::quarantine_events, which moves events from the active outbox table into the dedicated outbox_dead_letters table atomically, in a single transaction.
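The "notify first, poll as a safety net" behaviour described above can be sketched with nothing but the standard library. This is an illustrative model, not the crate's implementation: a std::sync::mpsc channel stands in for the Postgres notification stream, and the names Wake and wait_for_work are hypothetical.

```rust
use std::sync::mpsc::{Receiver, RecvTimeoutError};
use std::time::Duration;

/// Why the worker woke up (hypothetical type, for illustration only).
#[derive(Debug, PartialEq)]
enum Wake {
    /// A notification arrived; stands in for a Postgres NOTIFY.
    Notified,
    /// Nothing arrived within the poll interval; check the table anyway.
    PollTimeout,
    /// The notification source went away; the worker should stop.
    Closed,
}

/// Block until a notification arrives or `poll_interval` elapses,
/// whichever comes first.
fn wait_for_work(notifications: &Receiver<()>, poll_interval: Duration) -> Wake {
    match notifications.recv_timeout(poll_interval) {
        Ok(()) => Wake::Notified,
        Err(RecvTimeoutError::Timeout) => Wake::PollTimeout,
        Err(RecvTimeoutError::Disconnected) => Wake::Closed,
    }
}
```

A worker loop would call wait_for_work and then query the outbox table on both Wake::Notified and Wake::PollTimeout, so a missed notification only delays processing by one poll interval.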
Installation
Add this to your Cargo.toml:
[dependencies]
outbox-core = "0.4"
outbox-postgres = { version = "0.2", features = ["dlq"] } # drop the feature if you don't need DLQ
sqlx = { version = "0.8.6", features = ["postgres", "runtime-tokio", "macros", "uuid", "time"] }
Database Schema (Migrations)
Since this crate uses sqlx, you need to set up the outbox_events table in your database. You can add the following to your migrations/ folder.
create type status as enum (
    'Pending',
    'Processing',
    'Sent'
);

create table outbox_events
(
    id uuid primary key default gen_random_uuid(),
    idempotency_token text default null,
    event_type text not null,
    payload jsonb not null,
    status status not null default 'Pending',
    created_at timestamptz not null default now(),
    locked_until timestamptz not null default '-infinity'
);

create index idx_outbox_processing_queue
    on outbox_events (locked_until asc, status)
    where status in ('Pending', 'Processing');

create unique index idx_outbox_idempotency
    on outbox_events (idempotency_token);

create or replace function notify_outbox_event() returns trigger as
$$
begin
    perform pg_notify('outbox_event', 'ping');
    return new;
end;
$$ language plpgsql;

create trigger outbox_events_notify_trigger
    after insert or update
    on outbox_events
    for each row
execute function notify_outbox_event();
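The status and locked_until columns, together with the partial index on unsent rows, suggest a lease-based claim: a worker takes rows whose lease has expired, marks them Processing, and pushes locked_until into the future. The in-memory model below is a sketch of that reading of the schema, not the crate's actual query. The OutboxRow struct, the claim_batch function, integer ids and integer timestamps (with 0 standing in for '-infinity') are all simplifying assumptions.

```rust
#[derive(Debug, Clone, Copy, PartialEq)]
enum Status {
    Pending,
    Processing,
    Sent,
}

/// Simplified stand-in for one outbox_events row.
#[derive(Debug, Clone)]
struct OutboxRow {
    id: u32,           // uuid in the real table
    status: Status,
    locked_until: u64, // timestamptz in the real table; 0 models '-infinity'
}

/// Claim up to `limit` rows that are due at `now` and lease each one
/// for `lease_secs`. Returns the ids of the claimed rows.
fn claim_batch(rows: &mut [OutboxRow], now: u64, limit: usize, lease_secs: u64) -> Vec<u32> {
    // Mirror the partial index: only unsent rows whose lease has expired,
    // ordered by locked_until ascending.
    let mut due: Vec<&mut OutboxRow> = rows
        .iter_mut()
        .filter(|r| r.status != Status::Sent && r.locked_until <= now)
        .collect();
    due.sort_by_key(|r| r.locked_until);

    due.into_iter()
        .take(limit)
        .map(|r| {
            r.status = Status::Processing;
            r.locked_until = now + lease_secs;
            r.id
        })
        .collect()
}
```

In this model, two consecutive calls at the same `now` return disjoint ids, and a row left in Processing becomes claimable again once its lease expires. In Postgres, FOR UPDATE SKIP LOCKED gives the same disjointness to transactions that run at the same time.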
DLQ table (feature dlq)
The DLQ migration ships with the crate (see migrations/20260425120000_dlq.sql). It adds the outbox_dead_letters table that quarantine_events writes to. Rows arrive here only via the DLQ reaper. Workers never read from this table; it is meant to be inspected (and cleaned up) by operators.
create table outbox_dead_letters
(
    id uuid primary key,
    idempotency_token text default null,
    event_type text not null,
    payload jsonb not null,
    original_status status not null,
    created_at timestamptz not null,
    locked_until timestamptz not null,
    failure_count integer not null,
    quarantined_at timestamptz not null default now(),
    last_error text default null
);

create index idx_outbox_dlq_quarantined_at
    on outbox_dead_letters (quarantined_at desc);

create index idx_outbox_dlq_event_type
    on outbox_dead_letters (event_type);
Apply it together with the base outbox migration if you enable the dlq feature.
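To make the "move, don't copy" semantics concrete, here is a small in-memory sketch of quarantining. It only models the data flow between the two tables. The real OutboxStorage::quarantine_events signature is not shown in this document, so the types ActiveEvent, DeadLetter and QuarantineRequest and the function move_to_dead_letters are hypothetical.

```rust
/// Simplified stand-in for a row in outbox_events.
#[derive(Debug, Clone, PartialEq)]
struct ActiveEvent {
    id: u32, // uuid in the real table
    event_type: String,
    payload: String, // jsonb in the real table
}

/// Simplified stand-in for a row in outbox_dead_letters.
#[derive(Debug, Clone, PartialEq)]
struct DeadLetter {
    id: u32,
    event_type: String,
    payload: String,
    failure_count: u32,
    last_error: Option<String>,
}

/// Which event to quarantine, and why (hypothetical shape).
struct QuarantineRequest {
    id: u32,
    failure_count: u32,
    last_error: Option<String>,
}

/// Move every requested event that still exists in `active` into `dead`.
/// Returns how many events were moved; unknown ids are skipped.
fn move_to_dead_letters(
    active: &mut Vec<ActiveEvent>,
    dead: &mut Vec<DeadLetter>,
    requests: &[QuarantineRequest],
) -> usize {
    let mut moved = 0;
    for req in requests {
        if let Some(pos) = active.iter().position(|e| e.id == req.id) {
            // Remove from the active set and insert into the dead-letter set
            // in one step, so the event is never present in both.
            let event = active.remove(pos);
            dead.push(DeadLetter {
                id: event.id,
                event_type: event.event_type,
                payload: event.payload,
                failure_count: req.failure_count,
                last_error: req.last_error.clone(),
            });
            moved += 1;
        }
    }
    moved
}
```

In the database, the delete and the insert run inside one transaction, so a crash between the two steps cannot lose or duplicate an event. The sketch gets the same effect trivially because it holds exclusive access to both vectors.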
Usage
Initialize Storage for the Manager
To run the background worker, initialize the PostgresOutbox with your PgPool and pass it to your OutboxManager.
use *;
use outbox_postgres::PostgresOutbox;
use sqlx::PgPool;
use std::sync::Arc;
// Assuming MyEvent is your strongly-typed domain enum
let pool = connect.await?;
let config = new; // Configure as needed
let postgres_storage = new;
// Pass postgres_storage to OutboxManager::new(...)