Skip to main content

outbox_core/
builder.rs

1//! Fluent builder for [`OutboxManager`].
2//!
3//! The manager has several collaborators that must all be provided before it
4//! can run: a storage backend, a message transport, a configuration, and a
5//! shutdown channel — plus a DLQ heap when the `dlq` feature is enabled.
6//! [`OutboxManagerBuilder`] assembles these pieces step by step and validates
7//! them once at [`build`](OutboxManagerBuilder::build) time, returning a
8//! structured [`OutboxError::ConfigError`] if anything is missing rather than
9//! panicking.
10//!
11//! # Example
12//!
13//! ```ignore
14//! use std::sync::Arc;
15//! use tokio::sync::watch;
16//! use outbox_core::prelude::*;
17//!
18//! # async fn run(
19//! #     storage: Arc<impl OutboxStorage<MyEvent> + Send + Sync + 'static>,
20//! #     publisher: Arc<impl Transport<MyEvent> + Send + Sync + 'static>,
21//! # ) -> Result<(), OutboxError>
22//! # where MyEvent: std::fmt::Debug + Clone + serde::Serialize + Send + Sync + 'static,
23//! # {
24//! let (_tx, rx) = watch::channel(false);
25//! let manager = OutboxManagerBuilder::new()
26//!     .storage(storage)
27//!     .publisher(publisher)
28//!     .config(Arc::new(OutboxConfig::default()))
29//!     .shutdown_rx(rx)
30//!     .build()?;
31//! manager.run().await
32//! # }
33//! ```
34
35use crate::config::OutboxConfig;
36#[cfg(feature = "dlq")]
37use crate::dlq::storage::DlqHeap;
38use crate::error::OutboxError;
39use crate::manager::OutboxManager;
40use crate::prelude::{OutboxStorage, Transport};
41use serde::Serialize;
42use std::fmt::Debug;
43use std::sync::Arc;
44use tokio::sync::watch::Receiver;
45
46/// Fluent builder that assembles an [`OutboxManager`].
47///
48/// All fields are optional during construction — validation happens once in
49/// [`build`](Self::build). Setter methods consume and return `self`, so a
50/// builder can be constructed in a single chained expression. The builder is
51/// generic over:
52///
53/// - `S` — [`OutboxStorage`] implementation (typically a database adapter)
54/// - `P` — [`Transport`] implementation (message broker publisher)
55/// - `PT` — the user's domain event payload type (`Debug + Clone + Serialize`)
56///
57/// # Required vs optional
58///
59/// | Setter | Required | Notes |
60/// |---|---|---|
61/// | [`storage`](Self::storage) | yes | fails `build()` if missing |
62/// | [`publisher`](Self::publisher) | yes | fails `build()` if missing |
63/// | [`config`](Self::config) | yes | fails `build()` if missing |
64/// | [`shutdown_rx`](Self::shutdown_rx) | yes | fails `build()` if missing |
65/// | [`dlq_heap`](Self::dlq_heap) | yes *(feature `dlq` only)* | fails `build()` if missing when feature is on |
66pub struct OutboxManagerBuilder<S, P, PT>
67where
68    PT: Debug + Clone + Serialize,
69{
70    storage: Option<Arc<S>>,
71    publisher: Option<Arc<P>>,
72    config: Option<Arc<OutboxConfig<PT>>>,
73    shutdown_rx: Option<Receiver<bool>>,
74    #[cfg(feature = "dlq")]
75    dlq_heap: Option<Arc<dyn DlqHeap>>,
76}
77impl<S, P, PT> Default for OutboxManagerBuilder<S, P, PT>
78where
79    PT: Debug + Clone + Serialize,
80{
81    fn default() -> Self {
82        Self {
83            storage: None,
84            publisher: None,
85            config: None,
86            shutdown_rx: None,
87            #[cfg(feature = "dlq")]
88            dlq_heap: None,
89        }
90    }
91}
92
93impl<S, P, PT> OutboxManagerBuilder<S, P, PT>
94where
95    S: OutboxStorage<PT> + Send + Sync + 'static,
96    P: Transport<PT> + Send + Sync + 'static,
97    PT: Debug + Clone + Serialize + Send + Sync + 'static,
98{
99    /// Creates an empty builder with all fields unset.
100    ///
101    /// Equivalent to [`Default::default`]; kept as a discoverable entry point
102    /// so callers do not need to import the `Default` trait.
103    #[must_use]
104    pub fn new() -> Self {
105        Self::default()
106    }
107
108    /// Sets the storage backend used for reading pending events, locking them,
109    /// persisting status transitions, and deleting expired rows.
110    #[must_use]
111    pub fn storage(mut self, s: Arc<S>) -> Self {
112        self.storage = Some(s);
113        self
114    }
115
116    /// Sets the transport (broker publisher) that delivers events to the
117    /// outside world.
118    #[must_use]
119    pub fn publisher(mut self, p: Arc<P>) -> Self {
120        self.publisher = Some(p);
121        self
122    }
123
124    /// Sets the runtime configuration (batch size, poll interval, GC cadence,
125    /// lock timeout, idempotency strategy).
126    #[must_use]
127    pub fn config(mut self, config: Arc<OutboxConfig<PT>>) -> Self {
128        self.config = Some(config);
129        self
130    }
131
132    /// Sets the shutdown channel. The manager stops its worker loop as soon as
133    /// `true` is observed on this receiver.
134    #[must_use]
135    pub fn shutdown_rx(mut self, rx: Receiver<bool>) -> Self {
136        self.shutdown_rx = Some(rx);
137        self
138    }
139
140    /// Sets the Dead-Letter-Queue heap that tracks per-event failure counts.
141    ///
142    /// Required when the crate is built with `--features dlq`.
143    #[cfg(feature = "dlq")]
144    #[must_use]
145    pub fn dlq_heap(mut self, heap: Arc<dyn DlqHeap>) -> Self {
146        self.dlq_heap = Some(heap);
147        self
148    }
149
150    /// Consumes the builder and returns a fully wired [`OutboxManager`].
151    ///
152    /// # Errors
153    ///
154    /// Returns [`OutboxError::ConfigError`] with a message identifying the
155    /// first missing dependency if any required field has not been set.
156    /// The diagnostic mentions one of: `Storage`, `Publisher`, `Config`,
157    /// `Shutdown channel`, or — under feature `dlq` — `Dlq heap`.
158    pub fn build(self) -> Result<OutboxManager<S, P, PT>, OutboxError> {
159        #[cfg(feature = "dlq")]
160        return Ok(OutboxManager::new(
161            self.storage
162                .ok_or_else(|| OutboxError::ConfigError("Storage config is missing".to_string()))?,
163            self.publisher.ok_or_else(|| {
164                OutboxError::ConfigError("Publisher config is missing".to_string())
165            })?,
166            self.config
167                .ok_or_else(|| OutboxError::ConfigError("Config config is missing".to_string()))?,
168            self.dlq_heap.ok_or_else(|| {
169                OutboxError::ConfigError("Dlq heap config is missing".to_string())
170            })?,
171            self.shutdown_rx.ok_or_else(|| {
172                OutboxError::ConfigError("Shutdown channel is missing".to_string())
173            })?,
174        ));
175        #[cfg(not(feature = "dlq"))]
176        return Ok(OutboxManager::new(
177            self.storage
178                .ok_or_else(|| OutboxError::ConfigError("Storage config is missing".to_string()))?,
179            self.publisher.ok_or_else(|| {
180                OutboxError::ConfigError("Publisher config is missing".to_string())
181            })?,
182            self.config
183                .ok_or_else(|| OutboxError::ConfigError("Config config is missing".to_string()))?,
184            self.shutdown_rx.ok_or_else(|| {
185                OutboxError::ConfigError("Shutdown channel is missing".to_string())
186            })?,
187        ));
188    }
189}
190
#[cfg(test)]
mod tests {
    //! Unit tests for [`OutboxManagerBuilder`]: one happy path, one failure
    //! case per required dependency, and a setter-order independence check.

    use super::*;
    use crate::config::IdempotencyStrategy;
    // Gated like the `DlqHeap` import in the parent module: the mock is only
    // referenced from `dlq`-feature code paths.
    #[cfg(feature = "dlq")]
    use crate::dlq::storage::MockDlqHeap;
    use crate::publisher::MockTransport;
    use crate::storage::MockOutboxStorage;
    use rstest::rstest;
    use serde::Deserialize;
    use tokio::sync::watch;

    /// Minimal payload type used to instantiate the generic builder.
    #[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
    enum SomeDomainEvent {
        SomeEvent(String),
    }

    /// Builder specialised to the mock collaborators used throughout the tests.
    type Builder = OutboxManagerBuilder<
        MockOutboxStorage<SomeDomainEvent>,
        MockTransport<SomeDomainEvent>,
        SomeDomainEvent,
    >;

    /// What [`Builder::build`] returns.
    type BuildResult = Result<
        OutboxManager<
            MockOutboxStorage<SomeDomainEvent>,
            MockTransport<SomeDomainEvent>,
            SomeDomainEvent,
        >,
        OutboxError,
    >;

    /// Returns the configuration shared by every test that needs one.
    fn default_config() -> Arc<OutboxConfig<SomeDomainEvent>> {
        Arc::new(OutboxConfig {
            batch_size: 100,
            retention_days: 7,
            gc_interval_secs: 3600,
            poll_interval_secs: 5,
            lock_timeout_mins: 5,
            idempotency_strategy: IdempotencyStrategy::None,
            ..Default::default()
        })
    }

    /// Asserts that `result` is an [`OutboxError::ConfigError`] whose message
    /// contains `needle`; panics with a descriptive message otherwise.
    fn assert_config_error_with(result: BuildResult, needle: &str) {
        match result {
            Err(OutboxError::ConfigError(msg)) => {
                assert!(msg.contains(needle), "expected '{needle}' in '{msg}'");
            }
            Err(other) => panic!("expected ConfigError, got {other:?}"),
            Ok(_) => panic!("expected ConfigError, got Ok(..)"),
        }
    }

    #[rstest]
    fn test_success_build() {
        let (_shutdown_tx, shutdown_rx) = watch::channel(false);

        let b = Builder::new()
            .storage(Arc::new(MockOutboxStorage::new()))
            .publisher(Arc::new(MockTransport::new()))
            .config(default_config())
            .shutdown_rx(shutdown_rx);
        // The DLQ heap is only a required dependency under the `dlq` feature.
        #[cfg(feature = "dlq")]
        let b = b.dlq_heap(Arc::new(MockDlqHeap::new()));

        assert!(b.build().is_ok());
    }

    #[rstest]
    fn default_builder_fails_on_build_with_config_error() {
        let result = Builder::default().build();
        assert!(matches!(result, Err(OutboxError::ConfigError(_))));
    }

    #[rstest]
    fn new_matches_default() {
        let r1 = Builder::new().build();
        let r2 = Builder::default().build();
        let m1 = match r1 {
            Err(OutboxError::ConfigError(m)) => m,
            _ => panic!("new().build() should fail with ConfigError"),
        };
        let m2 = match r2 {
            Err(OutboxError::ConfigError(m)) => m,
            _ => panic!("default().build() should fail with ConfigError"),
        };
        assert_eq!(m1, m2);
    }

    #[rstest]
    fn build_fails_without_storage() {
        let (_tx, rx) = watch::channel(false);
        let b = Builder::new()
            .publisher(Arc::new(MockTransport::new()))
            .config(default_config())
            .shutdown_rx(rx);
        #[cfg(feature = "dlq")]
        let b = b.dlq_heap(Arc::new(MockDlqHeap::new()));

        assert_config_error_with(b.build(), "Storage");
    }

    #[rstest]
    fn build_fails_without_publisher() {
        let (_tx, rx) = watch::channel(false);
        let b = Builder::new()
            .storage(Arc::new(MockOutboxStorage::new()))
            .config(default_config())
            .shutdown_rx(rx);
        #[cfg(feature = "dlq")]
        let b = b.dlq_heap(Arc::new(MockDlqHeap::new()));

        assert_config_error_with(b.build(), "Publisher");
    }

    #[rstest]
    fn build_fails_without_config() {
        let (_tx, rx) = watch::channel(false);
        let b = Builder::new()
            .storage(Arc::new(MockOutboxStorage::new()))
            .publisher(Arc::new(MockTransport::new()))
            .shutdown_rx(rx);
        #[cfg(feature = "dlq")]
        let b = b.dlq_heap(Arc::new(MockDlqHeap::new()));

        assert_config_error_with(b.build(), "Config");
    }

    #[rstest]
    fn build_fails_without_shutdown_rx() {
        let b = Builder::new()
            .storage(Arc::new(MockOutboxStorage::new()))
            .publisher(Arc::new(MockTransport::new()))
            .config(default_config());
        #[cfg(feature = "dlq")]
        let b = b.dlq_heap(Arc::new(MockDlqHeap::new()));

        assert_config_error_with(b.build(), "Shutdown");
    }

    #[cfg(feature = "dlq")]
    #[rstest]
    fn build_fails_without_dlq_heap() {
        let (_tx, rx) = watch::channel(false);
        let result = Builder::new()
            .storage(Arc::new(MockOutboxStorage::new()))
            .publisher(Arc::new(MockTransport::new()))
            .config(default_config())
            .shutdown_rx(rx)
            .build();
        assert_config_error_with(result, "Dlq");
    }

    #[rstest]
    fn build_is_insensitive_to_setter_order() {
        let (_tx, rx) = watch::channel(false);
        // Setters are applied in the reverse of the order `build` checks them.
        let b = Builder::new()
            .shutdown_rx(rx)
            .config(default_config())
            .publisher(Arc::new(MockTransport::new()))
            .storage(Arc::new(MockOutboxStorage::new()));
        #[cfg(feature = "dlq")]
        let b = b.dlq_heap(Arc::new(MockDlqHeap::new()));

        assert!(b.build().is_ok());
    }
}