Skip to main content

outbox_core/
manager.rs

1//! Top-level worker that owns the outbox event loop.
2//!
3//! [`OutboxManager`] ties together storage, transport, configuration and a
4//! shutdown signal. It drives the main processing loop — waiting for database
5//! notifications, polling on a fixed interval, draining pending events via
6//! [`OutboxProcessor`], and running a periodic [`GarbageCollector`] task on
7//! the side. Construct one via [`OutboxManagerBuilder`](crate::builder::OutboxManagerBuilder).
8
9use crate::config::OutboxConfig;
10use crate::dlq::processor::DlqProcessor;
11use crate::error::OutboxError;
12use crate::gc::GarbageCollector;
13use crate::processor::OutboxProcessor;
14use crate::publisher::Transport;
15use crate::storage::OutboxStorage;
16use serde::Serialize;
17use std::fmt::Debug;
18use std::sync::Arc;
19use std::time::Duration;
20use tokio::sync::watch::Receiver;
21use tracing::{debug, error, info, trace};
22
23/// Long-running worker that publishes pending outbox events to the broker.
24///
25/// The manager owns one concrete [`OutboxStorage`] implementation (`S`), one
26/// [`Transport`] implementation (`P`), and the user's domain event payload type
27/// (`PT`). After [`run`](Self::run) is invoked, it will keep processing events
28/// until the watched shutdown channel flips to `true`.
29///
30/// Prefer constructing through
31/// [`OutboxManagerBuilder`](crate::builder::OutboxManagerBuilder) rather than
32/// calling [`new`](Self::new) directly — the builder reports missing
33/// dependencies with a structured [`OutboxError::ConfigError`] instead of
34/// requiring all arguments to line up positionally.
35pub struct OutboxManager<S, P, PT>
36where
37    PT: Debug + Clone + Serialize,
38{
39    storage: Arc<S>,
40    publisher: Arc<P>,
41    config: Arc<OutboxConfig<PT>>,
42    shutdown_rx: Receiver<bool>,
43    #[cfg(feature = "dlq")]
44    dlq_heap: Arc<dyn crate::dlq::storage::DlqHeap>,
45}
46
47impl<S, P, PT> OutboxManager<S, P, PT>
48where
49    S: OutboxStorage<PT> + Send + Sync + 'static,
50    P: Transport<PT> + Send + Sync + 'static,
51    PT: Debug + Clone + Serialize + Send + Sync + 'static,
52{
53    /// Direct constructor used by
54    /// [`OutboxManagerBuilder`](crate::builder::OutboxManagerBuilder).
55    ///
56    /// Application code should normally go through the builder, which
57    /// validates that every required collaborator has been supplied.
58    ///
59    /// This signature is compiled when the `dlq` feature is enabled and takes
60    /// an extra [`DlqHeap`](crate::dlq::storage::DlqHeap) that tracks per-event
61    /// failure counts.
62    #[cfg(feature = "dlq")]
63    pub fn new(
64        storage: Arc<S>,
65        publisher: Arc<P>,
66        config: Arc<OutboxConfig<PT>>,
67        dlq_heap: Arc<dyn crate::dlq::storage::DlqHeap>,
68        shutdown_rx: Receiver<bool>,
69    ) -> Self {
70        Self {
71            storage,
72            publisher,
73            config,
74            shutdown_rx,
75            dlq_heap,
76        }
77    }
78
79    /// Direct constructor used by
80    /// [`OutboxManagerBuilder`](crate::builder::OutboxManagerBuilder).
81    ///
82    /// Application code should normally go through the builder.
83    ///
84    /// This signature is compiled when the `dlq` feature is disabled and omits
85    /// the DLQ heap argument.
86    #[cfg(not(feature = "dlq"))]
87    pub fn new(
88        storage: Arc<S>,
89        publisher: Arc<P>,
90        config: Arc<OutboxConfig<PT>>,
91        shutdown_rx: Receiver<bool>,
92    ) -> Self {
93        Self {
94            storage,
95            publisher,
96            config,
97            shutdown_rx,
98        }
99    }
100
101    /// Starts the main outbox worker loop.
102    ///
103    /// This method will run until a shutdown signal is received via the
104    /// `shutdown_rx` channel. It coordinates three concerns:
105    ///
106    /// - **Event processing** — on each wake-up it drives
107    ///   [`OutboxProcessor`] in an inner drain loop until the fetched batch is
108    ///   empty, at which point it returns to waiting.
109    /// - **Wake-up sources** — a `tokio::select!` races a storage-level
110    ///   `LISTEN`/notify call, a poll interval (`config.poll_interval_secs`),
111    ///   and the shutdown receiver. A notification error is logged and the
112    ///   loop sleeps for 5 seconds before retrying.
113    /// - **Garbage collection** — a background task is spawned that ticks on
114    ///   `config.gc_interval_secs` and calls [`GarbageCollector::collect_garbage`],
115    ///   exiting when the shutdown signal fires.
116    ///
117    /// # Errors
118    ///
119    /// Returns an [`OutboxError`] if the worker encounters a terminal failure
120    /// that it cannot recover from. In the current implementation transient
121    /// errors from the storage and transport layers are logged and the loop
122    /// continues, so a returned error signals that the worker observed a
123    /// graceful shutdown via `shutdown_rx`.
124    ///
125    /// # Example
126    ///
127    /// ```ignore
128    /// use std::sync::Arc;
129    /// use tokio::sync::watch;
130    /// use outbox_core::prelude::*;
131    ///
132    /// # async fn start(
133    /// #     storage: Arc<impl OutboxStorage<MyEvent> + Send + Sync + 'static>,
134    /// #     publisher: Arc<impl Transport<MyEvent> + Send + Sync + 'static>,
135    /// # ) -> Result<(), OutboxError>
136    /// # where MyEvent: std::fmt::Debug + Clone + serde::Serialize + Send + Sync + 'static,
137    /// # {
138    /// let (shutdown_tx, shutdown_rx) = watch::channel(false);
139    /// let manager = OutboxManagerBuilder::new()
140    ///     .storage(storage)
141    ///     .publisher(publisher)
142    ///     .config(Arc::new(OutboxConfig::default()))
143    ///     .shutdown_rx(shutdown_rx)
144    ///     .build()?;
145    ///
146    /// let handle = tokio::spawn(async move { manager.run().await });
147    ///
148    /// // ... later, on a signal or process exit:
149    /// let _ = shutdown_tx.send(true);
150    /// handle.await.expect("worker panicked")?;
151    /// # Ok(()) }
152    /// ```
153    pub async fn run(self) -> Result<(), OutboxError> {
154        let storage_for_listen = self.storage.clone();
155        let processor = OutboxProcessor::new(
156            self.storage.clone(),
157            self.publisher.clone(),
158            self.config.clone(),
159        );
160
161        let gc = GarbageCollector::new(self.storage.clone());
162        let mut rx_gc = self.shutdown_rx.clone();
163        let gc_interval_secs = self.config.gc_interval_secs;
164        tokio::spawn(async move {
165            gc.run(Duration::from_secs(gc_interval_secs), &mut rx_gc)
166                .await
167        });
168
169        #[cfg(feature = "dlq")]
170        {
171            let dlq_processor = DlqProcessor::new(
172                self.dlq_heap.clone(),
173                self.storage.clone(),
174                self.config.clone(),
175                self.shutdown_rx.clone(),
176            );
177            tokio::spawn(async move { dlq_processor.run().await });
178        }
179
180        let mut rx_listen = self.shutdown_rx.clone();
181        let poll_interval = self.config.poll_interval_secs;
182        let mut interval = tokio::time::interval(Duration::from_secs(poll_interval));
183
184        info!("Outbox worker loop started");
185
186        loop {
187            tokio::select! {
188                signal = storage_for_listen.wait_for_notification("outbox_event") => {
189                    if let Err(e) = signal {
190                        error!("Listen error: {}", e);
191                        tokio::time::sleep(Duration::from_secs(5)).await;
192                        continue;
193                    }
194                }
195                _ = interval.tick() => {
196                    trace!("Checking for stale or pending events via interval");
197                }
198                _ = rx_listen.changed() => {
199                    if rx_listen.has_changed().is_err(){
200                        break;
201                    }
202                    if *rx_listen.borrow() {
203                        break
204                    }
205                }
206            }
207            loop {
208                if *rx_listen.borrow() {
209                    return Ok(());
210                }
211                #[cfg(feature = "dlq")]
212                match processor
213                    .process_pending_events(self.dlq_heap.clone())
214                    .await
215                {
216                    Ok(0) => break,
217                    Ok(count) => debug!("Processed {} events", count),
218                    Err(e) => {
219                        error!("Processing error: {}", e);
220                        tokio::time::sleep(Duration::from_secs(5)).await;
221                        break;
222                    }
223                }
224                #[cfg(not(feature = "dlq"))]
225                match processor.process_pending_events().await {
226                    Ok(0) => break,
227                    Ok(count) => debug!("Processed {} events", count),
228                    Err(e) => {
229                        error!("Processing error: {}", e);
230                        tokio::time::sleep(Duration::from_secs(5)).await;
231                        break;
232                    }
233                }
234            }
235        }
236        debug!("Outbox worker loop stopped");
237        Ok(())
238    }
239}
240
/// Unit tests for the manager's worker loop, driven through mocked storage,
/// transport and (with the `dlq` feature) DLQ heap.
#[cfg(test)]
#[allow(clippy::unwrap_used)]
mod tests {
    use crate::builder::OutboxManagerBuilder;
    use crate::config::{IdempotencyStrategy, OutboxConfig};
    #[cfg(feature = "dlq")]
    use crate::dlq::storage::MockDlqHeap;
    use crate::error::OutboxError;
    use crate::model::{Event, EventStatus};
    use crate::object::EventType;
    use crate::prelude::Payload;
    use crate::publisher::MockTransport;
    use crate::storage::MockOutboxStorage;
    use mockall::Sequence;
    use rstest::rstest;
    use serde::{Deserialize, Serialize};
    use std::sync::Arc;
    use tokio::sync::watch;

    /// Minimal domain payload used by every test in this module.
    #[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
    enum SomeDomainEvent {
        SomeEvent(String),
    }

    /// Configuration shared by all tests: a long GC interval so that GC does
    /// not tick a second time during a test, and a 5 second poll interval.
    fn default_config() -> OutboxConfig<SomeDomainEvent> {
        OutboxConfig {
            batch_size: 100,
            retention_days: 7,
            gc_interval_secs: 3600,
            poll_interval_secs: 5,
            lock_timeout_mins: 5,
            idempotency_strategy: IdempotencyStrategy::None,
            dlq_threshold: 10,
            dlq_interval_secs: 1,
        }
    }

    /// Four fetched events are published in order and marked `Sent` in one
    /// status update; the worker then stops on the shutdown signal.
    #[rstest]
    #[tokio::test]
    async fn test_event_send_success() {
        let config = default_config();

        let mut storage_mock = MockOutboxStorage::<SomeDomainEvent>::new();
        let mut transport_mock = MockTransport::<SomeDomainEvent>::new();

        #[cfg(feature = "dlq")]
        let mut dlq_heap_mock: MockDlqHeap = MockDlqHeap::new();

        #[cfg(feature = "dlq")]
        dlq_heap_mock.expect_record_success().returning(|_| Ok(()));

        let (shutdown_tx, shutdown_rx) = watch::channel(false);

        storage_mock
            .expect_wait_for_notification()
            .returning(|_| Ok(()));

        // First fetch hands out the batch and requests shutdown, so the
        // worker exits after this batch has been handled.
        storage_mock
            .expect_fetch_next_to_process()
            .withf(move |l| l == &config.batch_size)
            .times(1)
            .returning(move |_| {
                let _ = shutdown_tx.send(true);
                Ok(vec![
                    Event::new(
                        EventType::new("1"),
                        Payload::new(SomeDomainEvent::SomeEvent("test1".to_string())),
                        None,
                    ),
                    Event::new(
                        EventType::new("2"),
                        Payload::new(SomeDomainEvent::SomeEvent("test2".to_string())),
                        None,
                    ),
                    Event::new(
                        EventType::new("3"),
                        Payload::new(SomeDomainEvent::SomeEvent("test3".to_string())),
                        None,
                    ),
                    Event::new(
                        EventType::new("4"),
                        Payload::new(SomeDomainEvent::SomeEvent("test4".to_string())),
                        None,
                    ),
                ])
            });

        // Any further fetch finds nothing.
        storage_mock
            .expect_fetch_next_to_process()
            .withf(move |l| l == &config.batch_size)
            .returning(move |_| Ok(vec![]));

        storage_mock
            .expect_update_status()
            .withf(|ids, s| ids.len() == 4 && s == &EventStatus::Sent)
            .returning(|_, _| Ok(()));

        storage_mock.expect_delete_garbage().returning(|| Ok(()));

        // Publishing must happen in fetch order: "1".."4".
        let mut seq = Sequence::new();

        for i in 1..=4 {
            let expected_type = i.to_string();
            let expected_val = SomeDomainEvent::SomeEvent(format!("test{}", i));

            transport_mock
                .expect_publish()
                .withf(move |event| {
                    let type_matches = event.event_type.as_str() == expected_type;
                    let payload_matches = event.payload.as_value() == &expected_val;
                    type_matches && payload_matches
                })
                .times(1)
                .in_sequence(&mut seq)
                .returning(|_| Ok(()));
        }

        #[cfg(feature = "dlq")]
        let manager = OutboxManagerBuilder::new()
            .storage(Arc::new(storage_mock))
            .publisher(Arc::new(transport_mock))
            .config(Arc::new(config))
            .shutdown_rx(shutdown_rx)
            .dlq_heap(Arc::new(dlq_heap_mock))
            .build()
            .unwrap();

        #[cfg(not(feature = "dlq"))]
        let manager = OutboxManagerBuilder::new()
            .storage(Arc::new(storage_mock))
            .publisher(Arc::new(transport_mock))
            .config(Arc::new(config))
            .shutdown_rx(shutdown_rx)
            .build()
            .unwrap();

        let handle = tokio::spawn(async move {
            manager.run().await.unwrap();
        });

        tokio::time::timeout(tokio::time::Duration::from_secs(1), handle)
            .await
            .expect("Manager did not stop in time")
            .unwrap();
    }

    /// A failing `wait_for_notification` is retried after the backoff and the
    /// worker still shuts down cleanly.
    ///
    /// The clock starts paused so the worker's 5 second backoff is
    /// auto-advanced instead of costing 5 seconds of wall-clock time.
    #[rstest]
    #[tokio::test(start_paused = true)]
    async fn test_recovery_after_storage_error() {
        let mut storage_mock = MockOutboxStorage::<SomeDomainEvent>::new();
        let (shutdown_tx, shutdown_rx) = watch::channel(false);

        storage_mock
            .expect_wait_for_notification()
            .times(1)
            .returning(|_| Err(OutboxError::InfrastructureError("Connection lost".into())));

        // The second listen succeeds and requests shutdown.
        storage_mock
            .expect_wait_for_notification()
            .times(1)
            .returning(move |_| {
                let _ = shutdown_tx.send(true);
                Ok(())
            });

        storage_mock
            .expect_fetch_next_to_process()
            .returning(|_| Ok(vec![]));

        storage_mock
            .expect_delete_garbage()
            .times(1)
            .returning(|| Ok(()));

        let transport_mock = MockTransport::<SomeDomainEvent>::new();

        #[cfg(feature = "dlq")]
        let manager = OutboxManagerBuilder::new()
            .storage(Arc::new(storage_mock))
            .publisher(Arc::new(transport_mock))
            .config(Arc::new(default_config()))
            .dlq_heap(Arc::new(MockDlqHeap::new()))
            .shutdown_rx(shutdown_rx)
            .build()
            .unwrap();

        #[cfg(not(feature = "dlq"))]
        let manager = OutboxManagerBuilder::new()
            .storage(Arc::new(storage_mock))
            .publisher(Arc::new(transport_mock))
            .config(Arc::new(default_config()))
            .shutdown_rx(shutdown_rx)
            .build()
            .unwrap();

        let result = manager.run().await;
        assert!(result.is_ok());
    }

    /// When one publish out of four fails, only the three successfully
    /// published events are marked `Sent`.
    #[rstest]
    #[tokio::test]
    async fn test_publish_failure() {
        let mut storage_mock = MockOutboxStorage::<SomeDomainEvent>::new();
        let (shutdown_tx, shutdown_rx) = watch::channel(false);

        let e1 = Event::new(
            EventType::new("1"),
            Payload::new(SomeDomainEvent::SomeEvent("test1".to_string())),
            None,
        );
        let e2 = Event::new(
            EventType::new("2"),
            Payload::new(SomeDomainEvent::SomeEvent("test2".to_string())),
            None,
        );
        let e3 = Event::new(
            EventType::new("3"),
            Payload::new(SomeDomainEvent::SomeEvent("test3".to_string())),
            None,
        );
        let e4 = Event::new(
            EventType::new("4"),
            Payload::new(SomeDomainEvent::SomeEvent("test4".to_string())),
            None,
        );

        let id1 = e1.id.clone();
        let id2 = e2.id.clone();
        let id3 = e3.id.clone();
        let id4 = e4.id.clone();

        storage_mock
            .expect_wait_for_notification()
            .returning(|_| Ok(()));

        storage_mock
            .expect_fetch_next_to_process()
            .times(1)
            .returning(move |_| Ok(vec![e1.clone(), e2.clone(), e3.clone(), e4.clone()]));

        storage_mock
            .expect_fetch_next_to_process()
            .returning(|_| Ok(vec![]));

        storage_mock.expect_delete_garbage().returning(|| Ok(()));

        // The status update must cover exactly events 1, 2 and 4; event 3 is
        // the one whose publish fails below.
        storage_mock
            .expect_update_status()
            .withf(move |ids, status| {
                if status != &EventStatus::Sent {
                    return false;
                }

                let ids_set: std::collections::HashSet<_> = ids.iter().cloned().collect();

                ids_set.len() == 3
                    && ids_set.contains(&id1)
                    && ids_set.contains(&id2)
                    && ids_set.contains(&id4)
                    && !ids_set.contains(&id3)
            })
            .returning(move |_, _| {
                let _ = shutdown_tx.send(true);
                Ok(())
            });

        let mut transport_mock = MockTransport::<SomeDomainEvent>::new();

        let mut seq = Sequence::new();

        transport_mock
            .expect_publish()
            .times(1)
            .in_sequence(&mut seq)
            .returning(|_| Ok(()));

        transport_mock
            .expect_publish()
            .times(1)
            .in_sequence(&mut seq)
            .returning(|_| Ok(()));

        // Third publish fails.
        transport_mock
            .expect_publish()
            .times(1)
            .in_sequence(&mut seq)
            .returning(|_| Err(OutboxError::InfrastructureError("Connection lost".into())));

        transport_mock
            .expect_publish()
            .times(1)
            .in_sequence(&mut seq)
            .returning(|_| Ok(()));

        #[cfg(feature = "dlq")]
        let dlq_heap_mock = {
            let mut m = MockDlqHeap::new();
            m.expect_record_success().returning(|_| Ok(()));
            m.expect_record_failure().returning(|_| Ok(()));
            m
        };

        #[cfg(feature = "dlq")]
        let manager = OutboxManagerBuilder::new()
            .storage(Arc::new(storage_mock))
            .publisher(Arc::new(transport_mock))
            .config(Arc::new(default_config()))
            .dlq_heap(Arc::new(dlq_heap_mock))
            .shutdown_rx(shutdown_rx)
            .build()
            .unwrap();

        #[cfg(not(feature = "dlq"))]
        let manager = OutboxManagerBuilder::new()
            .storage(Arc::new(storage_mock))
            .publisher(Arc::new(transport_mock))
            .config(Arc::new(default_config()))
            .shutdown_rx(shutdown_rx)
            .build()
            .unwrap();

        let result = manager.run().await;
        assert!(result.is_ok());
    }

    /// If shutdown is already requested when `run` starts, the worker exits
    /// without fetching or updating anything.
    #[rstest]
    #[tokio::test]
    async fn shutdown_set_before_run_exits_without_fetch() {
        let mut storage_mock = MockOutboxStorage::<SomeDomainEvent>::new();
        let transport_mock = MockTransport::<SomeDomainEvent>::new();
        let (shutdown_tx, shutdown_rx) = watch::channel(false);
        let _ = shutdown_tx.send(true);

        storage_mock
            .expect_wait_for_notification()
            .returning(|_| Ok(()));
        storage_mock.expect_delete_garbage().returning(|| Ok(()));
        storage_mock.expect_fetch_next_to_process().times(0);
        storage_mock.expect_update_status().times(0);

        #[cfg(feature = "dlq")]
        let manager = OutboxManagerBuilder::new()
            .storage(Arc::new(storage_mock))
            .publisher(Arc::new(transport_mock))
            .config(Arc::new(default_config()))
            .dlq_heap(Arc::new(MockDlqHeap::new()))
            .shutdown_rx(shutdown_rx)
            .build()
            .unwrap();
        #[cfg(not(feature = "dlq"))]
        let manager = OutboxManagerBuilder::new()
            .storage(Arc::new(storage_mock))
            .publisher(Arc::new(transport_mock))
            .config(Arc::new(default_config()))
            .shutdown_rx(shutdown_rx)
            .build()
            .unwrap();

        let result = tokio::time::timeout(tokio::time::Duration::from_secs(1), manager.run())
            .await
            .expect("manager did not stop in time");
        assert!(result.is_ok());
        // Keep the sender alive until the worker has returned.
        drop(shutdown_tx);
    }

    /// A single wake-up keeps fetching batch after batch until a fetch comes
    /// back empty.
    #[rstest]
    #[tokio::test]
    async fn inner_loop_drains_until_empty_batch() {
        let mut storage_mock = MockOutboxStorage::<SomeDomainEvent>::new();
        let mut transport_mock = MockTransport::<SomeDomainEvent>::new();
        let (shutdown_tx, shutdown_rx) = watch::channel(false);

        storage_mock
            .expect_wait_for_notification()
            .returning(|_| Ok(()));
        storage_mock.expect_delete_garbage().returning(|| Ok(()));

        // Fetch sequence: two events, one event, then empty (+ shutdown).
        let mut seq = Sequence::new();
        storage_mock
            .expect_fetch_next_to_process()
            .times(1)
            .in_sequence(&mut seq)
            .returning(|_| {
                Ok(vec![
                    Event::new(
                        EventType::new("a1"),
                        Payload::new(SomeDomainEvent::SomeEvent("a1".into())),
                        None,
                    ),
                    Event::new(
                        EventType::new("a2"),
                        Payload::new(SomeDomainEvent::SomeEvent("a2".into())),
                        None,
                    ),
                ])
            });
        storage_mock
            .expect_fetch_next_to_process()
            .times(1)
            .in_sequence(&mut seq)
            .returning(|_| {
                Ok(vec![Event::new(
                    EventType::new("b1"),
                    Payload::new(SomeDomainEvent::SomeEvent("b1".into())),
                    None,
                )])
            });
        storage_mock
            .expect_fetch_next_to_process()
            .times(1)
            .in_sequence(&mut seq)
            .returning(move |_| {
                let _ = shutdown_tx.send(true);
                Ok(vec![])
            });
        storage_mock
            .expect_fetch_next_to_process()
            .returning(|_| Ok(vec![]));

        // One status update per non-empty batch, one publish per event.
        storage_mock
            .expect_update_status()
            .times(2)
            .returning(|_, _| Ok(()));

        transport_mock
            .expect_publish()
            .times(3)
            .returning(|_| Ok(()));

        #[cfg(feature = "dlq")]
        let manager = {
            let mut dlq = MockDlqHeap::new();
            dlq.expect_record_success().returning(|_| Ok(()));
            dlq.expect_record_failure().returning(|_| Ok(()));
            OutboxManagerBuilder::new()
                .storage(Arc::new(storage_mock))
                .publisher(Arc::new(transport_mock))
                .config(Arc::new(default_config()))
                .dlq_heap(Arc::new(dlq))
                .shutdown_rx(shutdown_rx)
                .build()
                .unwrap()
        };
        #[cfg(not(feature = "dlq"))]
        let manager = OutboxManagerBuilder::new()
            .storage(Arc::new(storage_mock))
            .publisher(Arc::new(transport_mock))
            .config(Arc::new(default_config()))
            .shutdown_rx(shutdown_rx)
            .build()
            .unwrap();

        let handle = tokio::spawn(async move { manager.run().await.unwrap() });
        tokio::time::timeout(tokio::time::Duration::from_secs(1), handle)
            .await
            .expect("manager did not stop in time")
            .unwrap();
    }

    /// A fetch error inside the drain loop is logged and retried; the worker
    /// keeps running and shuts down normally afterwards.
    ///
    /// The clock starts paused so the worker's backoff is auto-advanced.
    #[rstest]
    #[tokio::test(start_paused = true)]
    async fn fetch_error_inside_loop_is_recoverable() {
        let mut storage_mock = MockOutboxStorage::<SomeDomainEvent>::new();
        let transport_mock = MockTransport::<SomeDomainEvent>::new();
        let (shutdown_tx, shutdown_rx) = watch::channel(false);

        storage_mock
            .expect_wait_for_notification()
            .returning(|_| Ok(()));
        storage_mock.expect_delete_garbage().returning(|| Ok(()));

        let mut seq = Sequence::new();
        storage_mock
            .expect_fetch_next_to_process()
            .times(1)
            .in_sequence(&mut seq)
            .returning(|_| Err(OutboxError::DatabaseError("transient".into())));
        storage_mock
            .expect_fetch_next_to_process()
            .in_sequence(&mut seq)
            .returning(move |_| {
                let _ = shutdown_tx.send(true);
                Ok(vec![])
            });
        storage_mock
            .expect_fetch_next_to_process()
            .returning(|_| Ok(vec![]));

        storage_mock.expect_update_status().times(0);

        #[cfg(feature = "dlq")]
        let manager = OutboxManagerBuilder::new()
            .storage(Arc::new(storage_mock))
            .publisher(Arc::new(transport_mock))
            .config(Arc::new(default_config()))
            .dlq_heap(Arc::new(MockDlqHeap::new()))
            .shutdown_rx(shutdown_rx)
            .build()
            .unwrap();
        #[cfg(not(feature = "dlq"))]
        let manager = OutboxManagerBuilder::new()
            .storage(Arc::new(storage_mock))
            .publisher(Arc::new(transport_mock))
            .config(Arc::new(default_config()))
            .shutdown_rx(shutdown_rx)
            .build()
            .unwrap();

        let result = tokio::time::timeout(tokio::time::Duration::from_secs(30), manager.run())
            .await
            .expect("manager did not stop in time");
        assert!(result.is_ok());
    }
}