Skip to main content

outbox_core/
storage.rs

1//! Storage abstractions backing the outbox table.
2//!
3//! Two traits split the read/write responsibilities:
4//!
5//! - [`OutboxWriter`] — producer-side insert path, used by
6//!   [`OutboxService`](crate::service::OutboxService) to persist new events.
7//! - [`OutboxStorage`] — worker-side read and lifecycle path, used by
8//!   [`OutboxManager`](crate::manager::OutboxManager) to fetch pending rows,
9//!   record status transitions, prune old data, and wait for notifications.
10//!
11//! Concrete implementations live in sibling crates (`outbox-postgres`,
12//! `outbox-redis`). Splitting the traits lets a producer depend on the write
13//! side only and keeps the worker's broader surface opt-in.
14
15use crate::error::OutboxError;
16use crate::model::{Event, EventStatus};
17use crate::object::EventId;
18use async_trait::async_trait;
19use serde::Serialize;
20use std::fmt::Debug;
21
22/// Worker-side storage contract.
23///
24/// An implementation must provide the read and lifecycle operations the
25/// [`OutboxManager`](crate::manager::OutboxManager) drives on every tick:
26/// claiming pending rows, recording their outcome, cleaning up finished data,
27/// and blocking until an external notification arrives.
28#[cfg_attr(test, mockall::automock)]
29#[async_trait]
30pub trait OutboxStorage<P>
31where
32    P: Debug + Clone + Serialize + Send + Sync,
33{
34    /// Claims up to `limit` rows that are eligible for processing.
35    ///
36    /// "Eligible" means rows whose status is
37    /// [`EventStatus::Pending`](crate::model::EventStatus::Pending) — including
38    /// newly inserted rows and rows whose processing lock has expired.
39    /// Implementations are expected to atomically flip the returned rows to
40    /// [`EventStatus::Processing`](crate::model::EventStatus::Processing)
41    /// with a lock that expires after `lock_timeout_mins`, so concurrent
42    /// workers cannot pick up the same row.
43    ///
44    /// # Errors
45    ///
46    /// Returns an [`OutboxError`] if the underlying datastore call fails.
47    async fn fetch_next_to_process(&self, limit: u32) -> Result<Vec<Event<P>>, OutboxError>;
48
49    /// Transitions the rows identified by `id` to `status`.
50    ///
51    /// Typically called after a batch publish attempt:
52    /// [`EventStatus::Sent`](crate::model::EventStatus::Sent) for successful
53    /// publications, or
54    /// [`EventStatus::Pending`](crate::model::EventStatus::Pending) to release
55    /// a row for retry.
56    ///
57    /// # Errors
58    ///
59    /// Returns an [`OutboxError`] if the underlying datastore call fails.
60    async fn update_status(&self, id: &[EventId], status: EventStatus) -> Result<(), OutboxError>;
61
62    /// Deletes rows that are past their retention window.
63    ///
64    /// Invoked on a timer by the [`GarbageCollector`](crate::gc::GarbageCollector)
65    /// task. The retention window itself is defined by the storage
66    /// implementation (it usually reads `retention_days` from the same
67    /// configuration the manager holds).
68    ///
69    /// # Errors
70    ///
71    /// Returns an [`OutboxError`] if the underlying datastore call fails.
72    async fn delete_garbage(&self) -> Result<(), OutboxError>;
73
74    /// Blocks until a notification arrives on `channel`, or returns
75    /// immediately on the next call if the backend does not support async
76    /// notifications.
77    ///
78    /// Used by the manager's wake-up loop in combination with a poll
79    /// interval: the backend can deliver a nudge as soon as a new row is
80    /// written, while the poll interval guarantees eventual progress if the
81    /// notification is missed.
82    ///
83    /// # Errors
84    ///
85    /// Returns an [`OutboxError`] if the listen call fails. The manager
86    /// recovers by logging and sleeping 5 seconds before retrying.
87    async fn wait_for_notification(&self, channel: &str) -> Result<(), OutboxError>;
88
89    /// Atomically moves the given entries out of the active outbox table and
90    /// into the dead-letter destination table.
91    ///
92    /// Called once per tick by
93    /// [`DlqProcessor`](crate::dlq::processor::DlqProcessor) after
94    /// [`DlqHeap::drain_exceeded`](crate::dlq::storage::DlqHeap::drain_exceeded)
95    /// has returned a non-empty batch. The whole `entries` slice is expected
96    /// to be moved in a single transaction so there is no observable window
97    /// in which a row appears in neither table or in both.
98    ///
99    /// `entries` carries `failure_count` (and any future per-event metadata)
100    /// alongside each [`EventId`]; the implementation persists those values
101    /// onto the destination row.
102    ///
103    /// Ids in `entries` whose source row no longer exists (already deleted
104    /// by GC, manual operator action, etc.) are silently dropped — only
105    /// matched rows are moved.
106    ///
107    /// # Default implementation
108    ///
109    /// The default implementation returns an [`OutboxError::ConfigError`].
110    /// Backend crates ship a real implementation behind their own `dlq`
111    /// feature; this default is what callers see when the backend was built
112    /// without DLQ support but `outbox-core/dlq` happens to be enabled
113    /// elsewhere in the workspace (Cargo's feature unification can pull it
114    /// in transitively).
115    ///
116    /// The method is intentionally **not** `#[cfg(feature = "dlq")]`-gated:
117    /// gating it on the trait creates a workspace-level mismatch where
118    /// `outbox-core` sees the method (because some other crate enabled the
119    /// feature) but a backend crate built without its own `dlq` feature
120    /// does not provide an implementation.
121    ///
122    /// # Errors
123    ///
124    /// Returns an [`OutboxError`] if the underlying datastore call fails.
125    /// On error, the caller should assume **none** of the entries were
126    /// moved — implementations must not partially commit.
127    async fn quarantine_events(
128        &self,
129        _entries: &[crate::dlq::model::DlqEntry],
130    ) -> Result<(), OutboxError> {
131        Err(OutboxError::ConfigError(
132            "OutboxStorage::quarantine_events: DLQ is not implemented by this backend \
133             (rebuild the storage crate with its `dlq` feature enabled)"
134                .to_string(),
135        ))
136    }
137}
138
139/// Producer-side storage contract.
140///
141/// Separated from [`OutboxStorage`] so a service that only writes events can
142/// depend on the narrow surface it actually uses.
143#[cfg_attr(test, mockall::automock)]
144#[async_trait]
145pub trait OutboxWriter<P>
146where
147    P: Debug + Clone + Serialize + Send + Sync,
148{
149    /// Persists a single [`Event`] row in the outbox table.
150    ///
151    /// Called by [`OutboxService::add_event`](crate::service::OutboxService::add_event)
152    /// after any configured idempotency reservation has succeeded.
153    ///
154    /// # Errors
155    ///
156    /// Returns an [`OutboxError`] if the insert fails — typically a
157    /// [`DatabaseError`](OutboxError::DatabaseError) on a unique-constraint
158    /// violation or connection issue.
159    async fn insert_event(&self, event: Event<P>) -> Result<(), OutboxError>;
160}