# Transactional Outbox
Reliable async message production with per-partition ordering guarantees.
Supports PostgreSQL, MySQL/MariaDB, and SQLite.
Four-stage pipeline: enqueue (inside your transaction) -> sequencer
(assigns per-partition sequence numbers) -> processor (calls your handler)
-> vacuum (GC). Two processing modes: transactional (exactly-once) and
leased (at-least-once with lease-based locking and framework-managed
cancellation).
## Usage
### Single-message handler (leased)
```rust
struct OrderHandler {
client: HttpClient,
}
#[async_trait]
impl LeasedMessageHandler for OrderHandler {
async fn handle(&self, msg: &OutboxMessage) -> MessageResult {
let order: Order = match serde_json::from_slice(&msg.payload) {
Ok(o) => o,
Err(e) => return MessageResult::Reject(format!("bad payload: {e}")),
};
match self.client.post(&warehouse_url).json(&order).send().await {
Ok(resp) if resp.status().is_success() => MessageResult::Ok,
Ok(_) | Err(_) => MessageResult::Retry,
}
}
}
let handle = Outbox::builder(db)
.profile(OutboxProfile::low_latency())
.queue("orders", Partitions::of(4))
.leased(OrderHandler { client })
.start().await?;
```
### Transactional handler (exactly-once with DB writes)
```rust
struct AuditHandler;
#[async_trait]
impl TransactionalMessageHandler for AuditHandler {
async fn handle(
&self,
txn: &dyn ConnectionTrait,
msg: &OutboxMessage,
_cancel: CancellationToken,
) -> HandlerResult {
// DB writes here are atomic with the ack
if let Err(e) = audit_log::ActiveModel { payload: Set(msg.payload.clone()), .. }
.insert(txn).await
{
return HandlerResult::Retry { reason: format!("insert failed: {e}") };
}
HandlerResult::Success
}
}
let handle = Outbox::builder(db)
.queue("audit", Partitions::of(2))
.transactional(AuditHandler)
.start().await?;
```
### Enqueue (inside a business transaction)
```rust
let outbox = handle.outbox();
// Atomic with your business logic:
outbox.enqueue( & txn, "orders", partition, payload, "application/json").await?;
// Batch enqueue:
outbox.enqueue_batch( & txn, "orders", & [
EnqueueMessage { partition: 0, payload: p1, payload_type: "application/json" },
EnqueueMessage { partition: 1, payload: p2, payload_type: "application/json" },
]).await?;
```
### Multi-queue with tuning
```rust
let handle = Outbox::builder(db)
.profile(OutboxProfile::high_throughput())
.sequencer_tuning(WorkerTuning::sequencer_high_throughput().batch_size(500))
.vacuum_tuning(WorkerTuning::vacuum().idle_interval(Duration::from_secs(300)))
.queue("orders", Partitions::of(16))
.leased(OrderHandler { client: client.clone() })
.queue("notifications", Partitions::of(4))
.leased(NotifyHandler { client })
.lease(LeaseConfig {
duration: Duration::from_secs(60),
headroom: Duration::from_secs(5),
})
.start().await?;
// Graceful shutdown
handle.stop().await;
```
---
## Use-Case Scenarios
### Handler makes a remote HTTP call
The most common case. Implement `LeasedMessageHandler` - one message, one
result. Use `HttpClient` from `cf-modkit-http` for the outgoing call.
**Cancellation is framework-managed.** The processor drops the handler
future when the lease cancel point is reached (`lease_duration - ack_headroom`).
In-flight `HttpClient` calls are cancelled via drop - hyper closes the
connection.
```rust
use modkit_db::outbox::{LeasedMessageHandler, MessageResult, OutboxMessage};
use modkit_http::HttpClient;
pub struct WebhookHandler {
client: HttpClient,
url: String,
}
#[async_trait::async_trait]
impl LeasedMessageHandler for WebhookHandler {
async fn handle(&self, msg: &OutboxMessage) -> MessageResult {
let event: Event = match serde_json::from_slice(&msg.payload) {
Ok(e) => e,
Err(e) => return MessageResult::Reject(format!("bad payload: {e}")),
};
// HttpClient handles per-attempt timeout (30s) and retries (3x).
// The Idempotency-Key header enables POST retry in the retry layer.
let idempotency_key = format!("{}:{}", msg.partition_id, msg.seq);
match self.client
.post(&self.url)
.header("Idempotency-Key", &idempotency_key)
.json(&event)
.send()
.await
{
Ok(resp) if resp.status().is_success() => MessageResult::Ok,
Ok(_) | Err(_) => MessageResult::Retry,
}
}
}
```
**Idempotency is required.** Leased processing provides at-least-once
delivery. If the lease expires before the ack transaction, the message is
re-delivered. The `Idempotency-Key` header (derived from `partition_id`
and `seq`) also enables `HttpClient` retry for POST/PATCH requests.
### Handler makes multiple sequential calls
When a single message requires several calls (enrich -> transform ->
publish), use `tokio::time::timeout_at` with a shared deadline:
```rust
#[async_trait::async_trait]
impl LeasedMessageHandler for PipelineHandler {
async fn handle(&self, msg: &OutboxMessage) -> MessageResult {
let deadline = tokio::time::Instant::now() + Duration::from_secs(20);
let enriched = match tokio::time::timeout_at(
deadline,
self.enrichment_api.enrich(msg),
).await {
Ok(Ok(data)) => data,
_ => return MessageResult::Retry,
};
let transformed = match tokio::time::timeout_at(
deadline,
self.transform_api.transform(&enriched),
).await {
Ok(Ok(data)) => data,
_ => return MessageResult::Retry,
};
match tokio::time::timeout_at(
deadline,
self.publish_api.publish(&transformed),
).await {
Ok(Ok(())) => MessageResult::Ok,
_ => MessageResult::Retry,
}
}
}
```
`tokio::time::timeout_at` uses an absolute deadline - each call gets the
remaining budget, not a fresh timeout. If enrichment takes 15s, transform
and publish share the remaining 5s.
### Handler processes a batch with chunked API calls
Implement `LeasedHandler` directly for batch/chunked processing. Use
`Batch` for iteration, progress tracking, and remaining lease time:
```rust
use modkit_db::outbox::{Batch, HandlerResult, LeasedHandler};
#[async_trait::async_trait]
impl LeasedHandler for BulkExportHandler {
async fn handle(&self, batch: &mut Batch<'_>) -> HandlerResult {
while !batch.is_empty() {
let chunk = batch.next_chunk(10);
if chunk.is_empty() {
break;
}
let mut events = Vec::with_capacity(chunk.len());
for msg in chunk {
match serde_json::from_slice::<Event>(&msg.payload) {
Ok(e) => events.push(e),
Err(e) => {
return HandlerResult::Reject {
reason: format!("bad payload at seq {}: {e}", msg.seq),
};
}
}
}
// batch.remaining() returns time until the lease cancel point.
// The handler owns timeout decisions - the framework only
// exposes facts (remaining time, message count).
let timeout = batch.remaining() / 3;
match tokio::time::timeout(timeout, self.api.bulk_send(&events)).await {
Ok(Ok(())) => batch.ack_chunk(10),
Ok(Err(e)) if e.is_transient() => {
return HandlerResult::Retry { reason: e.to_string() };
}
Ok(Err(e)) => {
return HandlerResult::Reject { reason: e.to_string() };
}
Err(_) => {
return HandlerResult::Retry { reason: "chunk timeout".into() };
}
}
}
HandlerResult::Success
}
}
```
Chunk ack is all-or-nothing. If a bulk API partially succeeds, do NOT call
`ack_chunk()` - return `Retry` instead. Idempotency handles the duplicate.
### Handler needs fire-and-forget side work
Use `tokio::spawn` when work must survive handler cancellation (e.g.,
best-effort metrics). The spawned task runs independently - it is NOT
cancelled when the handler future is dropped.
```rust
async fn handle(&self, msg: &OutboxMessage) -> MessageResult {
let event = match serde_json::from_slice(&msg.payload) {
Ok(e) => e,
Err(e) => return MessageResult::Reject(format!("bad payload: {e}")),
};
// Main work - cancelled if the handler future is dropped.
if let Err(e) = self.api.send(&event).await {
return MessageResult::Retry;
}
// Fire-and-forget - survives handler cancellation.
let metrics = self.metrics.clone();
tokio::spawn(async move {
if let Err(e) = metrics.record_delivery(&event).await {
tracing::warn!(error = %e, "fire-and-forget metric failed");
}
});
MessageResult::Ok
}
```
Use `tokio::spawn` only for best-effort side effects (metrics, logging,
notifications). Avoid it for main business logic - spawned tasks have no
lease guarantee and no backpressure.
### Custom lease for slow handlers
Chain `.lease(LeaseConfig { .. })` after `.leased()`. Defaults: 30s
duration, 2s headroom. The handler cancel point fires at
`duration - headroom`:
```rust
use modkit_db::outbox::LeaseConfig;
Outbox::builder(db)
.queue("slow-export", partitions)
.leased(SlowHandler { api })
.lease(LeaseConfig {
duration: Duration::from_secs(300),
headroom: Duration::from_secs(5),
})
.start()
.await?;
```
The headroom reserves time for the ack DB round-trip after the handler
finishes. It is a fixed cost (not a percentage of the lease).
### Error mapping: HTTP status -> MessageResult
**Do not blanket-reject all 4xx.** A 4xx may come from an intermediate
proxy, API gateway, or service mesh sidecar - not the target service.
Only reject when you are confident the error is permanent and caused by
the message payload itself.
| 2xx | `Ok` | Success |
| 429 | `Retry` | Rate limited - `HttpClient` retries this automatically |
| 500 / 502 / 503 / 504 | `Retry` | Server-side transient error |
| 401 / 403 | `Retry` | Likely transient - token rotation, IAM propagation delay |
| 404 | `Retry` | Resource may not be provisioned yet (eventual consistency) |
| 409 | `Retry` | Concurrent write conflict |
| 400 | `Reject` | Malformed payload - a bug in the producer, will not self-resolve |
| 422 | `Reject` | Validation failure - the payload content is wrong |
| 410 | `Reject` | Resource explicitly deleted - no point retrying |
| Timeout | `Retry` | The call may have succeeded server-side - idempotency handles redelivery |
| Transport error | `Retry` | Network issue, connection reset |
**When in doubt, `Retry`.** Dead-lettering is permanent - it removes the
message from processing. A retry that succeeds on the next cycle costs
almost nothing. A false reject loses the message until someone manually
replays it from the dead-letter table.
**`Reject` is for bugs, not for infrastructure.** If the remote service
returns an error because of the message content (bad schema, missing
required field, invalid enum value), that is a `Reject` - the message
will never succeed no matter how many times it is retried. Everything
else is infrastructure noise that will resolve itself.
---
## Benchmarks
### Worker Overhead
Infrastructure overhead (scheduling, notifiers, semaphores) with no-op actions:
```bash
cargo bench -p cf-modkit-db --features preview-outbox --bench worker_overhead
```
### Outbox Throughput
End-to-end throughput with per-partition ordering verification.
Requires a database feature flag:
```bash
# SQLite (local, no external DB needed)
cargo bench -p cf-modkit-db --features preview-outbox,sqlite --bench outbox_throughput
# PostgreSQL
cargo bench -p cf-modkit-db --features preview-outbox,pg --bench outbox_throughput -- postgres
# MySQL
cargo bench -p cf-modkit-db --features preview-outbox,mysql --bench outbox_throughput -- mysql
```
### Makefile Targets
```bash
make bench-pg # PostgreSQL standard
make bench-pg-longhaul # PostgreSQL 1M + 10M messages
make bench-mysql # MySQL standard
make bench-sqlite # SQLite standard
make bench-db # All engines
make bench-db-longhaul # All engines, long-haul
```
### Resource-Limited Runs
```bash
systemd-run --user --scope -p MemoryMax=4G -p CPUQuota=200% \
cargo bench -p cf-modkit-db --features preview-outbox --bench worker_overhead \
-- --warm-up-time 1 --measurement-time 3 --sample-size 10
```