outbox_core/storage.rs
1//! Storage abstractions backing the outbox table.
2//!
3//! Two traits split the read/write responsibilities:
4//!
5//! - [`OutboxWriter`] — producer-side insert path, used by
6//! [`OutboxService`](crate::service::OutboxService) to persist new events.
7//! - [`OutboxStorage`] — worker-side read and lifecycle path, used by
8//! [`OutboxManager`](crate::manager::OutboxManager) to fetch pending rows,
9//! record status transitions, prune old data, and wait for notifications.
10//!
11//! Concrete implementations live in sibling crates (`outbox-postgres`,
12//! `outbox-redis`). Splitting the traits lets a producer depend on the write
13//! side only and keeps the worker's broader surface opt-in.
14
15use crate::error::OutboxError;
16use crate::model::{Event, EventStatus};
17use crate::object::EventId;
18use async_trait::async_trait;
19use serde::Serialize;
20use std::fmt::Debug;
21
22/// Worker-side storage contract.
23///
24/// An implementation must provide the read and lifecycle operations the
25/// [`OutboxManager`](crate::manager::OutboxManager) drives on every tick:
26/// claiming pending rows, recording their outcome, cleaning up finished data,
27/// and blocking until an external notification arrives.
28#[cfg_attr(test, mockall::automock)]
29#[async_trait]
30pub trait OutboxStorage<P>
31where
32 P: Debug + Clone + Serialize + Send + Sync,
33{
34 /// Claims up to `limit` rows that are eligible for processing.
35 ///
36 /// "Eligible" means rows whose status is
37 /// [`EventStatus::Pending`](crate::model::EventStatus::Pending) — including
38 /// newly inserted rows and rows whose processing lock has expired.
39 /// Implementations are expected to atomically flip the returned rows to
40 /// [`EventStatus::Processing`](crate::model::EventStatus::Processing)
41 /// with a lock that expires after `lock_timeout_mins`, so concurrent
42 /// workers cannot pick up the same row.
43 ///
44 /// # Errors
45 ///
46 /// Returns an [`OutboxError`] if the underlying datastore call fails.
47 async fn fetch_next_to_process(&self, limit: u32) -> Result<Vec<Event<P>>, OutboxError>;
48
49 /// Transitions the rows identified by `id` to `status`.
50 ///
51 /// Typically called after a batch publish attempt:
52 /// [`EventStatus::Sent`](crate::model::EventStatus::Sent) for successful
53 /// publications, or
54 /// [`EventStatus::Pending`](crate::model::EventStatus::Pending) to release
55 /// a row for retry.
56 ///
57 /// # Errors
58 ///
59 /// Returns an [`OutboxError`] if the underlying datastore call fails.
60 async fn update_status(&self, id: &[EventId], status: EventStatus) -> Result<(), OutboxError>;
61
62 /// Deletes rows that are past their retention window.
63 ///
64 /// Invoked on a timer by the [`GarbageCollector`](crate::gc::GarbageCollector)
65 /// task. The retention window itself is defined by the storage
66 /// implementation (it usually reads `retention_days` from the same
67 /// configuration the manager holds).
68 ///
69 /// # Errors
70 ///
71 /// Returns an [`OutboxError`] if the underlying datastore call fails.
72 async fn delete_garbage(&self) -> Result<(), OutboxError>;
73
74 /// Blocks until a notification arrives on `channel`, or returns
75 /// immediately on the next call if the backend does not support async
76 /// notifications.
77 ///
78 /// Used by the manager's wake-up loop in combination with a poll
79 /// interval: the backend can deliver a nudge as soon as a new row is
80 /// written, while the poll interval guarantees eventual progress if the
81 /// notification is missed.
82 ///
83 /// # Errors
84 ///
85 /// Returns an [`OutboxError`] if the listen call fails. The manager
86 /// recovers by logging and sleeping 5 seconds before retrying.
87 async fn wait_for_notification(&self, channel: &str) -> Result<(), OutboxError>;
88
89 /// Atomically moves the given entries out of the active outbox table and
90 /// into the dead-letter destination table.
91 ///
92 /// Called once per tick by
93 /// [`DlqProcessor`](crate::dlq::processor::DlqProcessor) after
94 /// [`DlqHeap::drain_exceeded`](crate::dlq::storage::DlqHeap::drain_exceeded)
95 /// has returned a non-empty batch. The whole `entries` slice is expected
96 /// to be moved in a single transaction so there is no observable window
97 /// in which a row appears in neither table or in both.
98 ///
99 /// `entries` carries `failure_count` (and any future per-event metadata)
100 /// alongside each [`EventId`]; the implementation persists those values
101 /// onto the destination row.
102 ///
103 /// Ids in `entries` whose source row no longer exists (already deleted
104 /// by GC, manual operator action, etc.) are silently dropped — only
105 /// matched rows are moved.
106 ///
107 /// # Default implementation
108 ///
109 /// The default implementation returns an [`OutboxError::ConfigError`].
110 /// Backend crates ship a real implementation behind their own `dlq`
111 /// feature; this default is what callers see when the backend was built
112 /// without DLQ support but `outbox-core/dlq` happens to be enabled
113 /// elsewhere in the workspace (Cargo's feature unification can pull it
114 /// in transitively).
115 ///
116 /// The method is intentionally **not** `#[cfg(feature = "dlq")]`-gated:
117 /// gating it on the trait creates a workspace-level mismatch where
118 /// `outbox-core` sees the method (because some other crate enabled the
119 /// feature) but a backend crate built without its own `dlq` feature
120 /// does not provide an implementation.
121 ///
122 /// # Errors
123 ///
124 /// Returns an [`OutboxError`] if the underlying datastore call fails.
125 /// On error, the caller should assume **none** of the entries were
126 /// moved — implementations must not partially commit.
127 async fn quarantine_events(
128 &self,
129 _entries: &[crate::dlq::model::DlqEntry],
130 ) -> Result<(), OutboxError> {
131 Err(OutboxError::ConfigError(
132 "OutboxStorage::quarantine_events: DLQ is not implemented by this backend \
133 (rebuild the storage crate with its `dlq` feature enabled)"
134 .to_string(),
135 ))
136 }
137}
138
139/// Producer-side storage contract.
140///
141/// Separated from [`OutboxStorage`] so a service that only writes events can
142/// depend on the narrow surface it actually uses.
143#[cfg_attr(test, mockall::automock)]
144#[async_trait]
145pub trait OutboxWriter<P>
146where
147 P: Debug + Clone + Serialize + Send + Sync,
148{
149 /// Persists a single [`Event`] row in the outbox table.
150 ///
151 /// Called by [`OutboxService::add_event`](crate::service::OutboxService::add_event)
152 /// after any configured idempotency reservation has succeeded.
153 ///
154 /// # Errors
155 ///
156 /// Returns an [`OutboxError`] if the insert fails — typically a
157 /// [`DatabaseError`](OutboxError::DatabaseError) on a unique-constraint
158 /// violation or connection issue.
159 async fn insert_event(&self, event: Event<P>) -> Result<(), OutboxError>;
160}