1use crate::config::OutboxConfig;
10use crate::dlq::processor::DlqProcessor;
11use crate::error::OutboxError;
12use crate::gc::GarbageCollector;
13use crate::processor::OutboxProcessor;
14use crate::publisher::Transport;
15use crate::storage::OutboxStorage;
16use serde::Serialize;
17use std::fmt::Debug;
18use std::sync::Arc;
19use std::time::Duration;
20use tokio::sync::watch::Receiver;
21use tracing::{debug, error, info, trace};
22
23pub struct OutboxManager<S, P, PT>
36where
37 PT: Debug + Clone + Serialize,
38{
39 storage: Arc<S>,
40 publisher: Arc<P>,
41 config: Arc<OutboxConfig<PT>>,
42 shutdown_rx: Receiver<bool>,
43 #[cfg(feature = "dlq")]
44 dlq_heap: Arc<dyn crate::dlq::storage::DlqHeap>,
45}
46
47impl<S, P, PT> OutboxManager<S, P, PT>
48where
49 S: OutboxStorage<PT> + Send + Sync + 'static,
50 P: Transport<PT> + Send + Sync + 'static,
51 PT: Debug + Clone + Serialize + Send + Sync + 'static,
52{
53 #[cfg(feature = "dlq")]
63 pub fn new(
64 storage: Arc<S>,
65 publisher: Arc<P>,
66 config: Arc<OutboxConfig<PT>>,
67 dlq_heap: Arc<dyn crate::dlq::storage::DlqHeap>,
68 shutdown_rx: Receiver<bool>,
69 ) -> Self {
70 Self {
71 storage,
72 publisher,
73 config,
74 shutdown_rx,
75 dlq_heap,
76 }
77 }
78
79 #[cfg(not(feature = "dlq"))]
87 pub fn new(
88 storage: Arc<S>,
89 publisher: Arc<P>,
90 config: Arc<OutboxConfig<PT>>,
91 shutdown_rx: Receiver<bool>,
92 ) -> Self {
93 Self {
94 storage,
95 publisher,
96 config,
97 shutdown_rx,
98 }
99 }
100
101 pub async fn run(self) -> Result<(), OutboxError> {
154 let storage_for_listen = self.storage.clone();
155 let processor = OutboxProcessor::new(
156 self.storage.clone(),
157 self.publisher.clone(),
158 self.config.clone(),
159 );
160
161 let gc = GarbageCollector::new(self.storage.clone());
162 let mut rx_gc = self.shutdown_rx.clone();
163 let gc_interval_secs = self.config.gc_interval_secs;
164 tokio::spawn(async move {
165 gc.run(Duration::from_secs(gc_interval_secs), &mut rx_gc)
166 .await
167 });
168
169 #[cfg(feature = "dlq")]
170 {
171 let dlq_processor = DlqProcessor::new(
172 self.dlq_heap.clone(),
173 self.storage.clone(),
174 self.config.clone(),
175 self.shutdown_rx.clone(),
176 );
177 tokio::spawn(async move { dlq_processor.run().await });
178 }
179
180 let mut rx_listen = self.shutdown_rx.clone();
181 let poll_interval = self.config.poll_interval_secs;
182 let mut interval = tokio::time::interval(Duration::from_secs(poll_interval));
183
184 info!("Outbox worker loop started");
185
186 loop {
187 tokio::select! {
188 signal = storage_for_listen.wait_for_notification("outbox_event") => {
189 if let Err(e) = signal {
190 error!("Listen error: {}", e);
191 tokio::time::sleep(Duration::from_secs(5)).await;
192 continue;
193 }
194 }
195 _ = interval.tick() => {
196 trace!("Checking for stale or pending events via interval");
197 }
198 _ = rx_listen.changed() => {
199 if rx_listen.has_changed().is_err(){
200 break;
201 }
202 if *rx_listen.borrow() {
203 break
204 }
205 }
206 }
207 loop {
208 if *rx_listen.borrow() {
209 return Ok(());
210 }
211 #[cfg(feature = "dlq")]
212 match processor
213 .process_pending_events(self.dlq_heap.clone())
214 .await
215 {
216 Ok(0) => break,
217 Ok(count) => debug!("Processed {} events", count),
218 Err(e) => {
219 error!("Processing error: {}", e);
220 tokio::time::sleep(Duration::from_secs(5)).await;
221 break;
222 }
223 }
224 #[cfg(not(feature = "dlq"))]
225 match processor.process_pending_events().await {
226 Ok(0) => break,
227 Ok(count) => debug!("Processed {} events", count),
228 Err(e) => {
229 error!("Processing error: {}", e);
230 tokio::time::sleep(Duration::from_secs(5)).await;
231 break;
232 }
233 }
234 }
235 }
236 debug!("Outbox worker loop stopped");
237 Ok(())
238 }
239}
240
241#[cfg(test)]
242#[allow(clippy::unwrap_used)]
243mod tests {
244 use crate::builder::OutboxManagerBuilder;
245 use crate::config::{IdempotencyStrategy, OutboxConfig};
246 #[cfg(feature = "dlq")]
247 use crate::dlq::storage::MockDlqHeap;
248 use crate::error::OutboxError;
249 use crate::model::{Event, EventStatus};
250 use crate::object::EventType;
251 use crate::prelude::Payload;
252 use crate::publisher::MockTransport;
253 use crate::storage::MockOutboxStorage;
254 use mockall::Sequence;
255 use rstest::rstest;
256 use serde::{Deserialize, Serialize};
257 use std::sync::Arc;
258 use tokio::sync::watch;
259
260 #[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
261 enum SomeDomainEvent {
262 SomeEvent(String),
263 }
264
265 #[rstest]
266 #[tokio::test]
267 async fn test_event_send_success() {
268 let config = OutboxConfig {
269 batch_size: 100,
270 retention_days: 7,
271 gc_interval_secs: 3600,
272 poll_interval_secs: 5,
273 lock_timeout_mins: 5,
274 idempotency_strategy: IdempotencyStrategy::None,
275 dlq_threshold: 10,
276 dlq_interval_secs: 1,
277 };
278
279 let mut storage_mock = MockOutboxStorage::<SomeDomainEvent>::new();
280 let mut transport_mock = MockTransport::<SomeDomainEvent>::new();
281
282 #[cfg(feature = "dlq")]
283 let mut dlq_heap_mock: MockDlqHeap = MockDlqHeap::new();
284
285 #[cfg(feature = "dlq")]
286 dlq_heap_mock.expect_record_success().returning(|_| Ok(()));
287
288 let (shutdown_tx, shutdown_rx) = watch::channel(false);
289
290 storage_mock
291 .expect_wait_for_notification()
292 .returning(|_| Ok(()));
293
294 storage_mock
295 .expect_fetch_next_to_process()
296 .withf(move |l| l == &config.batch_size)
297 .times(1)
298 .returning(move |_| {
299 let _ = shutdown_tx.send(true);
300 Ok(vec![
301 Event::new(
302 EventType::new("1"),
303 Payload::new(SomeDomainEvent::SomeEvent("test1".to_string())),
304 None,
305 ),
306 Event::new(
307 EventType::new("2"),
308 Payload::new(SomeDomainEvent::SomeEvent("test2".to_string())),
309 None,
310 ),
311 Event::new(
312 EventType::new("3"),
313 Payload::new(SomeDomainEvent::SomeEvent("test3".to_string())),
314 None,
315 ),
316 Event::new(
317 EventType::new("4"),
318 Payload::new(SomeDomainEvent::SomeEvent("test4".to_string())),
319 None,
320 ),
321 ])
322 });
323
324 storage_mock
325 .expect_fetch_next_to_process()
326 .withf(move |l| l == &config.batch_size)
327 .returning(move |_| Ok(vec![]));
328
329 storage_mock
330 .expect_update_status()
331 .withf(|ids, s| ids.len() == 4 && s == &EventStatus::Sent)
332 .returning(|_, _| Ok(()));
333
334 storage_mock.expect_delete_garbage().returning(|| Ok(()));
335
336 let mut seq = Sequence::new();
337
338 for i in 1..=4 {
339 let expected_type = i.to_string();
340 let expected_val = SomeDomainEvent::SomeEvent(format!("test{}", i));
341
342 transport_mock
343 .expect_publish()
344 .withf(move |event| {
345 let type_matches = event.event_type.as_str() == expected_type;
346 let payload_matches = event.payload.as_value() == &expected_val;
347 type_matches && payload_matches
348 })
349 .times(1)
350 .in_sequence(&mut seq)
351 .returning(|_| Ok(()));
352 }
353
354 #[cfg(feature = "dlq")]
355 let manager = OutboxManagerBuilder::new()
356 .storage(Arc::new(storage_mock))
357 .publisher(Arc::new(transport_mock))
358 .config(Arc::new(config))
359 .shutdown_rx(shutdown_rx)
360 .dlq_heap(Arc::new(dlq_heap_mock))
361 .build()
362 .unwrap();
363
364 #[cfg(not(feature = "dlq"))]
365 let manager = OutboxManagerBuilder::new()
366 .storage(Arc::new(storage_mock))
367 .publisher(Arc::new(transport_mock))
368 .config(Arc::new(config))
369 .shutdown_rx(shutdown_rx)
370 .build()
371 .unwrap();
372
373 let handle = tokio::spawn(async move {
374 manager.run().await.unwrap();
375 });
376
377 tokio::time::timeout(tokio::time::Duration::from_secs(1), handle)
378 .await
379 .expect("Manager did not stop in time")
380 .unwrap();
381 }
382
383 #[rstest]
384 #[tokio::test]
385 async fn test_recovery_after_storage_error() {
386 let mut storage_mock = MockOutboxStorage::<SomeDomainEvent>::new();
387 let (shutdown_tx, shutdown_rx) = watch::channel(false);
388
389 storage_mock
390 .expect_wait_for_notification()
391 .times(1)
392 .returning(|_| Err(OutboxError::InfrastructureError("Connection lost".into())));
393
394 storage_mock
395 .expect_wait_for_notification()
396 .times(1)
397 .returning(move |_| {
398 let _ = shutdown_tx.send(true);
399 Ok(())
400 });
401
402 storage_mock
403 .expect_fetch_next_to_process()
404 .returning(|_| Ok(vec![]));
405
406 storage_mock
407 .expect_delete_garbage()
408 .times(1)
409 .returning(|| Ok(()));
410
411 let transport_mock = MockTransport::<SomeDomainEvent>::new();
412
413 let config = OutboxConfig::<SomeDomainEvent> {
414 batch_size: 100,
415 retention_days: 7,
416 gc_interval_secs: 3600,
417 poll_interval_secs: 5,
418 lock_timeout_mins: 5,
419 idempotency_strategy: IdempotencyStrategy::None,
420 dlq_threshold: 10,
421 dlq_interval_secs: 1,
422 };
423
424 #[cfg(feature = "dlq")]
425 let manager = OutboxManagerBuilder::new()
426 .storage(Arc::new(storage_mock))
427 .publisher(Arc::new(transport_mock))
428 .config(Arc::new(config))
429 .dlq_heap(Arc::new(MockDlqHeap::new()))
430 .shutdown_rx(shutdown_rx)
431 .build()
432 .unwrap();
433
434 #[cfg(not(feature = "dlq"))]
435 let manager = OutboxManagerBuilder::new()
436 .storage(Arc::new(storage_mock))
437 .publisher(Arc::new(transport_mock))
438 .config(Arc::new(config))
439 .shutdown_rx(shutdown_rx)
440 .build()
441 .unwrap();
442
443 let result = manager.run().await;
444 assert!(result.is_ok());
445 }
446
447 #[rstest]
448 #[tokio::test]
449 async fn test_publish_failure() {
450 let mut storage_mock = MockOutboxStorage::<SomeDomainEvent>::new();
451 let (shutdown_tx, shutdown_rx) = watch::channel(false);
452
453 let e1 = Event::new(
454 EventType::new("1"),
455 Payload::new(SomeDomainEvent::SomeEvent("test1".to_string())),
456 None,
457 );
458 let e2 = Event::new(
459 EventType::new("2"),
460 Payload::new(SomeDomainEvent::SomeEvent("test2".to_string())),
461 None,
462 );
463 let e3 = Event::new(
464 EventType::new("3"),
465 Payload::new(SomeDomainEvent::SomeEvent("test3".to_string())),
466 None,
467 );
468 let e4 = Event::new(
469 EventType::new("4"),
470 Payload::new(SomeDomainEvent::SomeEvent("test4".to_string())),
471 None,
472 );
473
474 let id1 = e1.id.clone();
475 let id2 = e2.id.clone();
476 let id3 = e3.id.clone();
477 let id4 = e4.id.clone();
478
479 storage_mock
480 .expect_wait_for_notification()
481 .returning(|_| Ok(()));
482
483 storage_mock
484 .expect_fetch_next_to_process()
485 .times(1)
486 .returning(move |_| Ok(vec![e1.clone(), e2.clone(), e3.clone(), e4.clone()]));
487
488 storage_mock
489 .expect_fetch_next_to_process()
490 .returning(|_| Ok(vec![]));
491
492 storage_mock.expect_delete_garbage().returning(|| Ok(()));
493
494 storage_mock
495 .expect_update_status()
496 .withf(move |ids, status| {
497 if status != &EventStatus::Sent {
498 return false;
499 }
500
501 let ids_set: std::collections::HashSet<_> = ids.iter().cloned().collect();
502
503 ids_set.len() == 3
504 && ids_set.contains(&id1)
505 && ids_set.contains(&id2)
506 && ids_set.contains(&id4)
507 && !ids_set.contains(&id3)
508 })
509 .returning(move |_, _| {
510 let _ = shutdown_tx.send(true);
511 Ok(())
512 });
513
514 let mut transport_mock = MockTransport::<SomeDomainEvent>::new();
515
516 let mut seq = Sequence::new();
517
518 transport_mock
519 .expect_publish()
520 .times(1)
521 .in_sequence(&mut seq)
522 .returning(|_| Ok(()));
523
524 transport_mock
525 .expect_publish()
526 .times(1)
527 .in_sequence(&mut seq)
528 .returning(|_| Ok(()));
529
530 transport_mock
531 .expect_publish()
532 .times(1)
533 .in_sequence(&mut seq)
534 .returning(|_| Err(OutboxError::InfrastructureError("Connection lost".into())));
535
536 transport_mock
537 .expect_publish()
538 .times(1)
539 .in_sequence(&mut seq)
540 .returning(|_| Ok(()));
541
542 let config = OutboxConfig::<SomeDomainEvent> {
543 batch_size: 100,
544 retention_days: 7,
545 gc_interval_secs: 3600,
546 poll_interval_secs: 5,
547 lock_timeout_mins: 5,
548 idempotency_strategy: IdempotencyStrategy::None,
549 dlq_threshold: 10,
550 dlq_interval_secs: 1,
551 };
552
553 #[cfg(feature = "dlq")]
554 let dlq_heap_mock = {
555 let mut m = MockDlqHeap::new();
556 m.expect_record_success().returning(|_| Ok(()));
557 m.expect_record_failure().returning(|_| Ok(()));
558 m
559 };
560
561 #[cfg(feature = "dlq")]
562 let manager = OutboxManagerBuilder::new()
563 .storage(Arc::new(storage_mock))
564 .publisher(Arc::new(transport_mock))
565 .config(Arc::new(config))
566 .dlq_heap(Arc::new(dlq_heap_mock))
567 .shutdown_rx(shutdown_rx)
568 .build()
569 .unwrap();
570
571 #[cfg(not(feature = "dlq"))]
572 let manager = OutboxManagerBuilder::new()
573 .storage(Arc::new(storage_mock))
574 .publisher(Arc::new(transport_mock))
575 .config(Arc::new(config))
576 .shutdown_rx(shutdown_rx)
577 .build()
578 .unwrap();
579
580 let result = manager.run().await;
581 assert!(result.is_ok());
582 }
583
584 fn default_config() -> OutboxConfig<SomeDomainEvent> {
585 OutboxConfig {
586 batch_size: 100,
587 retention_days: 7,
588 gc_interval_secs: 3600,
589 poll_interval_secs: 5,
590 lock_timeout_mins: 5,
591 idempotency_strategy: IdempotencyStrategy::None,
592 dlq_threshold: 10,
593 dlq_interval_secs: 1,
594 }
595 }
596
597 #[rstest]
598 #[tokio::test]
599 async fn shutdown_set_before_run_exits_without_fetch() {
600 let mut storage_mock = MockOutboxStorage::<SomeDomainEvent>::new();
601 let transport_mock = MockTransport::<SomeDomainEvent>::new();
602 let (shutdown_tx, shutdown_rx) = watch::channel(false);
603 let _ = shutdown_tx.send(true);
604
605 storage_mock
606 .expect_wait_for_notification()
607 .returning(|_| Ok(()));
608 storage_mock.expect_delete_garbage().returning(|| Ok(()));
609 storage_mock.expect_fetch_next_to_process().times(0);
610 storage_mock.expect_update_status().times(0);
611
612 #[cfg(feature = "dlq")]
613 let manager = OutboxManagerBuilder::new()
614 .storage(Arc::new(storage_mock))
615 .publisher(Arc::new(transport_mock))
616 .config(Arc::new(default_config()))
617 .dlq_heap(Arc::new(MockDlqHeap::new()))
618 .shutdown_rx(shutdown_rx)
619 .build()
620 .unwrap();
621 #[cfg(not(feature = "dlq"))]
622 let manager = OutboxManagerBuilder::new()
623 .storage(Arc::new(storage_mock))
624 .publisher(Arc::new(transport_mock))
625 .config(Arc::new(default_config()))
626 .shutdown_rx(shutdown_rx)
627 .build()
628 .unwrap();
629
630 let result = tokio::time::timeout(tokio::time::Duration::from_secs(1), manager.run())
631 .await
632 .expect("manager did not stop in time");
633 assert!(result.is_ok());
634 drop(shutdown_tx);
635 }
636
637 #[rstest]
638 #[tokio::test]
639 async fn inner_loop_drains_until_empty_batch() {
640 let mut storage_mock = MockOutboxStorage::<SomeDomainEvent>::new();
641 let mut transport_mock = MockTransport::<SomeDomainEvent>::new();
642 let (shutdown_tx, shutdown_rx) = watch::channel(false);
643
644 storage_mock
645 .expect_wait_for_notification()
646 .returning(|_| Ok(()));
647 storage_mock.expect_delete_garbage().returning(|| Ok(()));
648
649 let mut seq = Sequence::new();
650 storage_mock
651 .expect_fetch_next_to_process()
652 .times(1)
653 .in_sequence(&mut seq)
654 .returning(|_| {
655 Ok(vec![
656 Event::new(
657 EventType::new("a1"),
658 Payload::new(SomeDomainEvent::SomeEvent("a1".into())),
659 None,
660 ),
661 Event::new(
662 EventType::new("a2"),
663 Payload::new(SomeDomainEvent::SomeEvent("a2".into())),
664 None,
665 ),
666 ])
667 });
668 storage_mock
669 .expect_fetch_next_to_process()
670 .times(1)
671 .in_sequence(&mut seq)
672 .returning(|_| {
673 Ok(vec![Event::new(
674 EventType::new("b1"),
675 Payload::new(SomeDomainEvent::SomeEvent("b1".into())),
676 None,
677 )])
678 });
679 storage_mock
680 .expect_fetch_next_to_process()
681 .times(1)
682 .in_sequence(&mut seq)
683 .returning(move |_| {
684 let _ = shutdown_tx.send(true);
685 Ok(vec![])
686 });
687 storage_mock
688 .expect_fetch_next_to_process()
689 .returning(|_| Ok(vec![]));
690
691 storage_mock
692 .expect_update_status()
693 .times(2)
694 .returning(|_, _| Ok(()));
695
696 transport_mock
697 .expect_publish()
698 .times(3)
699 .returning(|_| Ok(()));
700
701 #[cfg(feature = "dlq")]
702 let manager = {
703 let mut dlq = MockDlqHeap::new();
704 dlq.expect_record_success().returning(|_| Ok(()));
705 dlq.expect_record_failure().returning(|_| Ok(()));
706 OutboxManagerBuilder::new()
707 .storage(Arc::new(storage_mock))
708 .publisher(Arc::new(transport_mock))
709 .config(Arc::new(default_config()))
710 .dlq_heap(Arc::new(dlq))
711 .shutdown_rx(shutdown_rx)
712 .build()
713 .unwrap()
714 };
715 #[cfg(not(feature = "dlq"))]
716 let manager = OutboxManagerBuilder::new()
717 .storage(Arc::new(storage_mock))
718 .publisher(Arc::new(transport_mock))
719 .config(Arc::new(default_config()))
720 .shutdown_rx(shutdown_rx)
721 .build()
722 .unwrap();
723
724 let handle = tokio::spawn(async move { manager.run().await.unwrap() });
725 tokio::time::timeout(tokio::time::Duration::from_secs(1), handle)
726 .await
727 .expect("manager did not stop in time")
728 .unwrap();
729 }
730
731 #[rstest]
732 #[tokio::test(start_paused = true)]
733 async fn fetch_error_inside_loop_is_recoverable() {
734 let mut storage_mock = MockOutboxStorage::<SomeDomainEvent>::new();
735 let transport_mock = MockTransport::<SomeDomainEvent>::new();
736 let (shutdown_tx, shutdown_rx) = watch::channel(false);
737
738 storage_mock
739 .expect_wait_for_notification()
740 .returning(|_| Ok(()));
741 storage_mock.expect_delete_garbage().returning(|| Ok(()));
742
743 let mut seq = Sequence::new();
744 storage_mock
745 .expect_fetch_next_to_process()
746 .times(1)
747 .in_sequence(&mut seq)
748 .returning(|_| Err(OutboxError::DatabaseError("transient".into())));
749 storage_mock
750 .expect_fetch_next_to_process()
751 .in_sequence(&mut seq)
752 .returning(move |_| {
753 let _ = shutdown_tx.send(true);
754 Ok(vec![])
755 });
756 storage_mock
757 .expect_fetch_next_to_process()
758 .returning(|_| Ok(vec![]));
759
760 storage_mock.expect_update_status().times(0);
761
762 #[cfg(feature = "dlq")]
763 let manager = OutboxManagerBuilder::new()
764 .storage(Arc::new(storage_mock))
765 .publisher(Arc::new(transport_mock))
766 .config(Arc::new(default_config()))
767 .dlq_heap(Arc::new(MockDlqHeap::new()))
768 .shutdown_rx(shutdown_rx)
769 .build()
770 .unwrap();
771 #[cfg(not(feature = "dlq"))]
772 let manager = OutboxManagerBuilder::new()
773 .storage(Arc::new(storage_mock))
774 .publisher(Arc::new(transport_mock))
775 .config(Arc::new(default_config()))
776 .shutdown_rx(shutdown_rx)
777 .build()
778 .unwrap();
779
780 let result = tokio::time::timeout(tokio::time::Duration::from_secs(30), manager.run())
781 .await
782 .expect("manager did not stop in time");
783 assert!(result.is_ok());
784 }
785}