1use crate::config::OutboxConfig;
36#[cfg(feature = "dlq")]
37use crate::dlq::storage::DlqHeap;
38use crate::error::OutboxError;
39use crate::manager::OutboxManager;
40use crate::prelude::{OutboxStorage, Transport};
41use serde::Serialize;
42use std::fmt::Debug;
43use std::sync::Arc;
44use tokio::sync::watch::Receiver;
45
46pub struct OutboxManagerBuilder<S, P, PT>
67where
68 PT: Debug + Clone + Serialize,
69{
70 storage: Option<Arc<S>>,
71 publisher: Option<Arc<P>>,
72 config: Option<Arc<OutboxConfig<PT>>>,
73 shutdown_rx: Option<Receiver<bool>>,
74 #[cfg(feature = "dlq")]
75 dlq_heap: Option<Arc<dyn DlqHeap>>,
76}
77impl<S, P, PT> Default for OutboxManagerBuilder<S, P, PT>
78where
79 PT: Debug + Clone + Serialize,
80{
81 fn default() -> Self {
82 Self {
83 storage: None,
84 publisher: None,
85 config: None,
86 shutdown_rx: None,
87 #[cfg(feature = "dlq")]
88 dlq_heap: None,
89 }
90 }
91}
92
93impl<S, P, PT> OutboxManagerBuilder<S, P, PT>
94where
95 S: OutboxStorage<PT> + Send + Sync + 'static,
96 P: Transport<PT> + Send + Sync + 'static,
97 PT: Debug + Clone + Serialize + Send + Sync + 'static,
98{
99 #[must_use]
104 pub fn new() -> Self {
105 Self::default()
106 }
107
108 #[must_use]
111 pub fn storage(mut self, s: Arc<S>) -> Self {
112 self.storage = Some(s);
113 self
114 }
115
116 #[must_use]
119 pub fn publisher(mut self, p: Arc<P>) -> Self {
120 self.publisher = Some(p);
121 self
122 }
123
124 #[must_use]
127 pub fn config(mut self, config: Arc<OutboxConfig<PT>>) -> Self {
128 self.config = Some(config);
129 self
130 }
131
132 #[must_use]
135 pub fn shutdown_rx(mut self, rx: Receiver<bool>) -> Self {
136 self.shutdown_rx = Some(rx);
137 self
138 }
139
140 #[cfg(feature = "dlq")]
144 #[must_use]
145 pub fn dlq_heap(mut self, heap: Arc<dyn DlqHeap>) -> Self {
146 self.dlq_heap = Some(heap);
147 self
148 }
149
150 pub fn build(self) -> Result<OutboxManager<S, P, PT>, OutboxError> {
159 #[cfg(feature = "dlq")]
160 return Ok(OutboxManager::new(
161 self.storage
162 .ok_or_else(|| OutboxError::ConfigError("Storage config is missing".to_string()))?,
163 self.publisher.ok_or_else(|| {
164 OutboxError::ConfigError("Publisher config is missing".to_string())
165 })?,
166 self.config
167 .ok_or_else(|| OutboxError::ConfigError("Config config is missing".to_string()))?,
168 self.dlq_heap.ok_or_else(|| {
169 OutboxError::ConfigError("Dlq heap config is missing".to_string())
170 })?,
171 self.shutdown_rx.ok_or_else(|| {
172 OutboxError::ConfigError("Shutdown channel is missing".to_string())
173 })?,
174 ));
175 #[cfg(not(feature = "dlq"))]
176 return Ok(OutboxManager::new(
177 self.storage
178 .ok_or_else(|| OutboxError::ConfigError("Storage config is missing".to_string()))?,
179 self.publisher.ok_or_else(|| {
180 OutboxError::ConfigError("Publisher config is missing".to_string())
181 })?,
182 self.config
183 .ok_or_else(|| OutboxError::ConfigError("Config config is missing".to_string()))?,
184 self.shutdown_rx.ok_or_else(|| {
185 OutboxError::ConfigError("Shutdown channel is missing".to_string())
186 })?,
187 ));
188 }
189}
190
#[cfg(test)]
mod tests {
    use super::*;
    use crate::config::IdempotencyStrategy;
    // Gated exactly like every use of the mock below, so the test module does
    // not carry an unused (or unresolved) import when `dlq` is disabled.
    #[cfg(feature = "dlq")]
    use crate::dlq::storage::MockDlqHeap;
    use crate::publisher::MockTransport;
    use crate::storage::MockOutboxStorage;
    use rstest::rstest;
    use serde::Deserialize;
    use tokio::sync::watch;

    /// Minimal payload type used to instantiate the generic builder.
    #[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
    enum SomeDomainEvent {
        SomeEvent(String),
    }

    /// Builder specialised to the mock storage/transport used by these tests.
    type Builder = OutboxManagerBuilder<
        MockOutboxStorage<SomeDomainEvent>,
        MockTransport<SomeDomainEvent>,
        SomeDomainEvent,
    >;

    /// What `Builder::build` returns.
    type BuildResult = Result<
        OutboxManager<
            MockOutboxStorage<SomeDomainEvent>,
            MockTransport<SomeDomainEvent>,
            SomeDomainEvent,
        >,
        OutboxError,
    >;

    /// Configuration shared by all tests.
    fn default_config() -> Arc<OutboxConfig<SomeDomainEvent>> {
        Arc::new(OutboxConfig {
            batch_size: 100,
            retention_days: 7,
            gc_interval_secs: 3600,
            poll_interval_secs: 5,
            lock_timeout_mins: 5,
            idempotency_strategy: IdempotencyStrategy::None,
            ..Default::default()
        })
    }

    /// Adds a mock DLQ heap when the `dlq` feature is enabled.
    #[cfg(feature = "dlq")]
    fn with_dlq_heap(builder: Builder) -> Builder {
        builder.dlq_heap(Arc::new(MockDlqHeap::new()))
    }

    /// Without the `dlq` feature there is no heap to set; returns the builder unchanged.
    #[cfg(not(feature = "dlq"))]
    fn with_dlq_heap(builder: Builder) -> Builder {
        builder
    }

    /// Extracts the message of a `ConfigError`, panicking with `context` on
    /// any other outcome.
    fn config_error_message(result: BuildResult, context: &str) -> String {
        match result {
            Err(OutboxError::ConfigError(msg)) => msg,
            _ => panic!("{context}"),
        }
    }

    /// Asserts that `result` is a `ConfigError` whose message contains `needle`.
    fn assert_config_error_with(result: BuildResult, needle: &str) {
        match result {
            Err(OutboxError::ConfigError(msg)) => {
                assert!(msg.contains(needle), "expected '{needle}' in '{msg}'");
            }
            Err(other) => panic!("expected ConfigError, got {other:?}"),
            Ok(_) => panic!("expected ConfigError, got Ok(..)"),
        }
    }

    #[rstest]
    fn test_success_build() {
        let (_shutdown_tx, shutdown_rx) = watch::channel(false);
        let builder = with_dlq_heap(
            Builder::new()
                .storage(Arc::new(MockOutboxStorage::new()))
                .publisher(Arc::new(MockTransport::new()))
                .config(default_config())
                .shutdown_rx(shutdown_rx),
        );

        assert!(builder.build().is_ok());
    }

    #[rstest]
    fn default_builder_fails_on_build_with_config_error() {
        let result = Builder::default().build();
        assert!(matches!(result, Err(OutboxError::ConfigError(_))));
    }

    #[rstest]
    fn new_matches_default() {
        let from_new = config_error_message(
            Builder::new().build(),
            "new().build() should fail with ConfigError",
        );
        let from_default = config_error_message(
            Builder::default().build(),
            "default().build() should fail with ConfigError",
        );
        assert_eq!(from_new, from_default);
    }

    #[rstest]
    fn build_fails_without_storage() {
        let (_tx, rx) = watch::channel(false);
        let builder = with_dlq_heap(
            Builder::new()
                .publisher(Arc::new(MockTransport::new()))
                .config(default_config())
                .shutdown_rx(rx),
        );

        assert_config_error_with(builder.build(), "Storage");
    }

    #[rstest]
    fn build_fails_without_publisher() {
        let (_tx, rx) = watch::channel(false);
        let builder = with_dlq_heap(
            Builder::new()
                .storage(Arc::new(MockOutboxStorage::new()))
                .config(default_config())
                .shutdown_rx(rx),
        );

        assert_config_error_with(builder.build(), "Publisher");
    }

    #[rstest]
    fn build_fails_without_config() {
        let (_tx, rx) = watch::channel(false);
        let builder = with_dlq_heap(
            Builder::new()
                .storage(Arc::new(MockOutboxStorage::new()))
                .publisher(Arc::new(MockTransport::new()))
                .shutdown_rx(rx),
        );

        assert_config_error_with(builder.build(), "Config");
    }

    #[rstest]
    fn build_fails_without_shutdown_rx() {
        let builder = with_dlq_heap(
            Builder::new()
                .storage(Arc::new(MockOutboxStorage::new()))
                .publisher(Arc::new(MockTransport::new()))
                .config(default_config()),
        );

        assert_config_error_with(builder.build(), "Shutdown");
    }

    #[cfg(feature = "dlq")]
    #[rstest]
    fn build_fails_without_dlq_heap() {
        let (_tx, rx) = watch::channel(false);
        let result = Builder::new()
            .storage(Arc::new(MockOutboxStorage::new()))
            .publisher(Arc::new(MockTransport::new()))
            .config(default_config())
            .shutdown_rx(rx)
            .build();

        assert_config_error_with(result, "Dlq");
    }

    #[rstest]
    fn build_is_insensitive_to_setter_order() {
        let (_tx, rx) = watch::channel(false);
        // Deliberately the reverse of the order used in `test_success_build`.
        let builder = with_dlq_heap(
            Builder::new()
                .shutdown_rx(rx)
                .config(default_config())
                .publisher(Arc::new(MockTransport::new()))
                .storage(Arc::new(MockOutboxStorage::new())),
        );

        assert!(builder.build().is_ok());
    }
}