1use std::collections::HashMap;
2use std::future::Future;
3use std::marker::PhantomData;
4use std::pin::Pin;
5use std::sync::Arc;
6use std::time::Duration;
7
8use hexeract_core::CorrelationId;
9use hexeract_core::HandlerContext;
10use hexeract_core::MessageId;
11use tokio_util::sync::CancellationToken;
12use uuid::Uuid;
13
14use crate::Event;
15use crate::Handler;
16use crate::OutboxEnvelope;
17use crate::OutboxError;
18
/// Heap-allocated, pinned, `Send` future that may borrow data for `'a` and resolves to `T`.
///
/// Used as the return type of object-safe async methods such as `ErasedHandler::handle`.
pub type BoxFuture<'a, T> = Pin<Box<dyn Future<Output = T> + 'a + Send>>;
21
/// Persistence backend for the outbox worker.
///
/// The worker drives the store in two kinds of transactions:
/// 1. a claim transaction: `acquire` -> `begin` -> `poll` -> (`claim` if the batch is
///    non-empty) -> `commit`, and
/// 2. one settle transaction per envelope: `acquire` -> `begin` -> `mark_delivered`, or
///    `mark_failed` (+ `mark_dead_lettered` when the retry budget is spent) -> `commit`.
///
/// An error from any method aborts the surrounding worker step. What happens to a `Tx`
/// that is dropped without `commit` is up to the implementation (presumably a
/// rollback — confirm per backend).
#[async_trait::async_trait]
pub trait OutboxStore: Send + Sync + 'static {
    /// Connection-like handle returned by `acquire`; `begin` borrows it mutably.
    type Client: Send;
    /// Transaction that borrows a `Client` for `'tx`; consumed by `commit`.
    type Tx<'tx>: Send
    where
        Self: 'tx;

    /// Obtains a client. The worker calls this once per claim cycle and once per settle.
    async fn acquire(&self) -> Result<Self::Client, OutboxError>;

    /// Opens a transaction on `client`.
    async fn begin<'a>(&self, client: &'a mut Self::Client) -> Result<Self::Tx<'a>, OutboxError>;

    /// Returns at most `batch_size` envelopes ready for dispatch.
    ///
    /// `max_attempts` is forwarded from the worker config; presumably implementations
    /// use it to skip envelopes whose retry budget is exhausted — TODO confirm.
    async fn poll<'a>(
        &self,
        tx: &mut Self::Tx<'a>,
        batch_size: usize,
        max_attempts: u32,
    ) -> Result<Vec<OutboxEnvelope>, OutboxError>;

    /// Records that the envelope `event_id` was handled successfully.
    async fn mark_delivered<'a>(
        &self,
        tx: &mut Self::Tx<'a>,
        event_id: Uuid,
    ) -> Result<(), OutboxError>;

    /// Records a failed dispatch of `event_id` with the handler's error text.
    ///
    /// `retry_in` is the backoff computed by `OutboxWorkerConfig::next_retry_delay`.
    async fn mark_failed<'a>(
        &self,
        tx: &mut Self::Tx<'a>,
        event_id: Uuid,
        error: &str,
        retry_in: Duration,
    ) -> Result<(), OutboxError>;

    /// Commits `tx`, consuming it.
    async fn commit<'a>(&self, tx: Self::Tx<'a>) -> Result<(), OutboxError>;

    /// Moves `event_id` to the dead-letter set.
    ///
    /// The worker calls this in the same transaction, right after `mark_failed`, once
    /// `attempts + 1 >= max_attempts`. The default implementation does nothing and
    /// returns `Ok(())`.
    async fn mark_dead_lettered<'a>(
        &self,
        _tx: &mut Self::Tx<'a>,
        _event_id: Uuid,
        _error: &str,
    ) -> Result<(), OutboxError> {
        Ok(())
    }

    /// Claims the polled envelopes for `lease_for` so that other workers skip them.
    ///
    /// The worker calls this inside the poll transaction, before `commit`, with every
    /// polled id; `lease_for` is `dispatch_timeout * batch_len`. If it returns an error,
    /// the cycle is aborted and nothing is dispatched. The default implementation does
    /// nothing and returns `Ok(())`, i.e. no exclusivity between concurrent workers.
    async fn claim<'a>(
        &self,
        _tx: &mut Self::Tx<'a>,
        _event_ids: &[Uuid],
        _lease_for: Duration,
    ) -> Result<(), OutboxError> {
        Ok(())
    }
}
130
/// Object-safe, type-erased event handler stored in the worker's registry.
///
/// The worker looks handlers up by `event_type` and passes them the raw envelope;
/// decoding into a concrete event type is the implementor's job (see `TypedHandler`).
pub trait ErasedHandler: Send + Sync + 'static {
    /// Event-type key this handler is registered under; matched against
    /// `OutboxEnvelope::event_type`.
    fn event_type(&self) -> &'static str;

    /// Handles one envelope with the given context.
    ///
    /// Returns a boxed future so the trait remains object-safe.
    fn handle<'a>(
        &'a self,
        envelope: &'a OutboxEnvelope,
        ctx: &'a HandlerContext,
    ) -> BoxFuture<'a, Result<(), OutboxError>>;
}
147
/// Adapter that turns a strongly-typed `Handler<E>` into an `ErasedHandler`.
pub struct TypedHandler<E, H>
where
    E: Event,
    H: Handler<E>,
{
    /// The wrapped handler; shared via `Arc` so one instance can back several adapters.
    handler: Arc<H>,
    /// Ties the adapter to `E` without storing one. `fn() -> E` is always `Send + Sync`,
    /// so the adapter's auto traits do not depend on `E`.
    _phantom: PhantomData<fn() -> E>,
}
157
158impl<E, H> TypedHandler<E, H>
159where
160 E: Event,
161 H: Handler<E>,
162{
163 #[must_use]
165 pub fn new(handler: H) -> Self {
166 Self {
167 handler: Arc::new(handler),
168 _phantom: PhantomData,
169 }
170 }
171
172 #[must_use]
174 pub fn shared(handler: Arc<H>) -> Self {
175 Self {
176 handler,
177 _phantom: PhantomData,
178 }
179 }
180}
181
182impl<E, H> ErasedHandler for TypedHandler<E, H>
183where
184 E: Event,
185 H: Handler<E>,
186{
187 fn event_type(&self) -> &'static str {
188 E::EVENT_TYPE
189 }
190
191 fn handle<'a>(
192 &'a self,
193 envelope: &'a OutboxEnvelope,
194 ctx: &'a HandlerContext,
195 ) -> BoxFuture<'a, Result<(), OutboxError>> {
196 Box::pin(async move {
197 let event: E = envelope.decode()?;
198 self.handler.handle(event, ctx).await.map_err(Into::into)
199 })
200 }
201}
202
/// Tuning knobs for `OutboxWorker`: pacing, batching, retry backoff, and timeouts.
#[derive(Debug, Clone)]
pub struct OutboxWorkerConfig {
    /// Sleep after a poll that returned no envelopes, and after a failed poll cycle.
    pub poll_interval: Duration,
    /// Maximum number of envelopes requested per poll.
    pub batch_size: usize,
    /// Retry budget. It is forwarded to `OutboxStore::poll`, and a failing envelope is
    /// dead-lettered once `attempts + 1 >= max_attempts`.
    pub max_attempts: u32,
    /// Backoff base: the uncapped delay after `n` prior attempts is `retry_base_delay * 2^n`.
    pub retry_base_delay: Duration,
    /// Upper bound applied to the exponential backoff delay.
    pub retry_max_delay: Duration,
    /// When `true`, the capped delay is replaced by a uniformly random duration in
    /// `[0, cap]` (full jitter), which can be zero.
    pub jitter: bool,
    /// Pause after a poll that returned envelopes, before the next poll; zero disables
    /// the pause.
    pub min_cycle_delay: Duration,
    /// Per-envelope handler timeout. Also the per-envelope unit of the claim lease.
    pub dispatch_timeout: Duration,
}
265
266impl Default for OutboxWorkerConfig {
267 fn default() -> Self {
268 Self {
269 poll_interval: Duration::from_millis(100),
270 batch_size: 10,
271 max_attempts: 5,
272 retry_base_delay: Duration::from_secs(1),
273 retry_max_delay: Duration::from_secs(300),
274 jitter: true,
275 min_cycle_delay: Duration::from_millis(5),
276 dispatch_timeout: Duration::from_secs(30),
277 }
278 }
279}
280
281impl OutboxWorkerConfig {
282 #[must_use]
292 pub fn next_retry_delay(&self, attempts: u32) -> Duration {
293 let factor = 1u32.checked_shl(attempts).unwrap_or(u32::MAX);
294 let capped = self
295 .retry_base_delay
296 .saturating_mul(factor)
297 .min(self.retry_max_delay);
298 if self.jitter {
299 let nanos = capped.as_nanos();
300 let nanos_u64 = u64::try_from(nanos).unwrap_or(u64::MAX);
303 Duration::from_nanos(fastrand::u64(0..=nanos_u64))
304 } else {
305 capped
306 }
307 }
308}
309
/// Background worker that polls an `OutboxStore`, dispatches each envelope to the
/// handler registered for its event type, and records the outcome back in the store.
pub struct OutboxWorker<S>
where
    S: OutboxStore,
{
    /// Backend used for polling, claiming, and settling envelopes.
    store: S,
    /// Event-type key -> handler registry consulted by `dispatch`.
    handlers: Arc<HashMap<&'static str, Arc<dyn ErasedHandler>>>,
    /// Pacing, retry, and timeout settings.
    config: OutboxWorkerConfig,
}
325
326impl<S> OutboxWorker<S>
327where
328 S: OutboxStore,
329{
330 #[must_use]
332 pub fn new(
333 store: S,
334 handlers: HashMap<&'static str, Arc<dyn ErasedHandler>>,
335 config: OutboxWorkerConfig,
336 ) -> Self {
337 Self {
338 store,
339 handlers: Arc::new(handlers),
340 config,
341 }
342 }
343
344 pub fn run(
361 self,
362 cancel: CancellationToken,
363 ) -> Pin<Box<dyn Future<Output = Result<(), OutboxError>> + Send>>
364 where
365 for<'a> S::Tx<'a>: Send,
366 {
367 Box::pin(async move {
368 while !cancel.is_cancelled() {
369 let sleep_for = match self.poll_cycle(&cancel).await {
370 Ok(0) => Some(self.config.poll_interval),
371 Ok(_) => {
372 if self.config.min_cycle_delay.is_zero() {
373 None
374 } else {
375 Some(self.config.min_cycle_delay)
376 }
377 }
378 Err(err) => {
379 tracing::error!(
380 error = ?err,
381 "outbox worker poll cycle failed, sleeping before retry"
382 );
383 Some(self.config.poll_interval)
384 }
385 };
386 if let Some(delay) = sleep_for {
387 tokio::select! {
391 () = tokio::time::sleep(delay) => {}
392 () = cancel.cancelled() => break,
393 }
394 }
395 }
396 Ok(())
397 })
398 }
399
400 async fn poll_cycle(&self, cancel: &CancellationToken) -> Result<usize, OutboxError> {
401 let envelopes = {
406 let mut client = self.store.acquire().await?;
407 let mut tx = self.store.begin(&mut client).await?;
408 let batch = self
409 .store
410 .poll(&mut tx, self.config.batch_size, self.config.max_attempts)
411 .await?;
412 if !batch.is_empty() {
413 let ids: Vec<Uuid> = batch.iter().map(|e| e.event_id).collect();
414 self.store
415 .claim(&mut tx, &ids, self.lease_for(ids.len()))
416 .await?;
417 }
418 self.store.commit(tx).await?;
419 batch
420 };
421 let count = envelopes.len();
422
423 for envelope in &envelopes {
425 if let Err(err) = self.dispatch_and_settle(envelope, cancel).await {
430 tracing::error!(
431 event_id = %envelope.event_id,
432 event_type = %envelope.event_type,
433 error = ?err,
434 "failed to settle outbox envelope; continuing with the rest of the batch"
435 );
436 }
437 }
438
439 Ok(count)
440 }
441
442 fn lease_for(&self, batch_len: usize) -> Duration {
451 let factor = u32::try_from(batch_len.max(1)).unwrap_or(u32::MAX);
452 self.config.dispatch_timeout.saturating_mul(factor)
453 }
454
455 async fn dispatch_and_settle(
458 &self,
459 envelope: &OutboxEnvelope,
460 cancel: &CancellationToken,
461 ) -> Result<(), OutboxError> {
462 match self.dispatch(envelope, cancel).await {
463 Ok(()) => {
464 let mut client = self.store.acquire().await?;
465 let mut tx = self.store.begin(&mut client).await?;
466 self.store
467 .mark_delivered(&mut tx, envelope.event_id)
468 .await?;
469 self.store.commit(tx).await?;
470 }
471 Err(err) => {
472 let message = err.to_string();
473 tracing::warn!(
474 event_id = %envelope.event_id,
475 event_type = %envelope.event_type,
476 error = %message,
477 "outbox handler dispatch failed"
478 );
479 let retry_in = self.config.next_retry_delay(envelope.attempts);
480 let mut client = self.store.acquire().await?;
481 let mut tx = self.store.begin(&mut client).await?;
482 self.store
483 .mark_failed(&mut tx, envelope.event_id, &message, retry_in)
484 .await?;
485 if envelope.attempts + 1 >= self.config.max_attempts {
486 tracing::error!(
487 event_id = %envelope.event_id,
488 event_type = %envelope.event_type,
489 attempts = envelope.attempts + 1,
490 "outbox envelope exhausted retry budget, moving to dead letter"
491 );
492 self.store
493 .mark_dead_lettered(&mut tx, envelope.event_id, &message)
494 .await?;
495 }
496 self.store.commit(tx).await?;
497 }
498 }
499 Ok(())
500 }
501
502 async fn dispatch(
503 &self,
504 envelope: &OutboxEnvelope,
505 cancel: &CancellationToken,
506 ) -> Result<(), OutboxError> {
507 let Some(handler) = self.handlers.get(envelope.event_type.as_str()) else {
508 return Err(OutboxError::MissingHandler {
509 event_type: envelope.event_type.clone(),
510 });
511 };
512
513 let ctx = HandlerContext::new(
521 MessageId::from(envelope.event_id),
522 CorrelationId::from(envelope.event_id),
523 )
524 .with_cancellation(cancel.child_token());
525
526 tracing::debug!(
527 event_id = %envelope.event_id,
528 event_type = %envelope.event_type,
529 "dispatching outbox envelope"
530 );
531
532 match tokio::time::timeout(self.config.dispatch_timeout, handler.handle(envelope, &ctx))
537 .await
538 {
539 Ok(result) => result,
540 Err(_elapsed) => {
541 ctx.cancellation.cancel();
542 Err(OutboxError::DispatchTimeout {
543 event_id: envelope.event_id,
544 event_type: envelope.event_type.clone(),
545 timeout: self.config.dispatch_timeout,
546 })
547 }
548 }
549 }
550}
551
#[cfg(test)]
mod tests {
    use super::*;
    use serde::Deserialize;
    use serde::Serialize;
    use std::sync::Mutex;
    use std::sync::atomic::AtomicBool;
    use std::sync::atomic::Ordering;

    // Minimal event used throughout the tests; its payload is a single user id.
    #[derive(Debug, Serialize, Deserialize, PartialEq)]
    struct UserRegistered {
        user_id: Uuid,
    }

    impl Event for UserRegistered {
        const EVENT_TYPE: &'static str = "users.registered";
    }

    // Handler that records the `user_id` of every event it receives.
    struct RecordingHandler {
        seen: Arc<Mutex<Vec<Uuid>>>,
    }

    impl Handler<UserRegistered> for RecordingHandler {
        type Error = OutboxError;
        async fn handle(
            &self,
            event: UserRegistered,
            _ctx: &HandlerContext,
        ) -> Result<(), Self::Error> {
            self.seen.lock().unwrap().push(event.user_id);
            Ok(())
        }
    }

    // Handler that records the `message_id` of the context it is invoked with.
    struct ContextCapturingHandler {
        captured_ids: Arc<Mutex<Vec<MessageId>>>,
    }

    impl Handler<UserRegistered> for ContextCapturingHandler {
        type Error = OutboxError;
        async fn handle(
            &self,
            _event: UserRegistered,
            ctx: &HandlerContext,
        ) -> Result<(), Self::Error> {
            self.captured_ids.lock().unwrap().push(ctx.message_id);
            Ok(())
        }
    }

    // Handler that always fails with `OutboxError::Internal("forced")`.
    struct FailingHandler;
    impl Handler<UserRegistered> for FailingHandler {
        type Error = OutboxError;
        async fn handle(
            &self,
            _event: UserRegistered,
            _ctx: &HandlerContext,
        ) -> Result<(), Self::Error> {
            Err(OutboxError::Internal("forced".into()))
        }
    }

    // Builds an envelope with a random (v4) event id wrapping a `UserRegistered` payload.
    fn fresh_envelope(user_id: Uuid) -> OutboxEnvelope {
        let publisher_test_event = UserRegistered { user_id };
        OutboxEnvelope::new(Uuid::new_v4(), &publisher_test_event).unwrap()
    }

    // The erased adapter must decode the payload and hand the typed event to the handler.
    #[tokio::test]
    async fn typed_handler_decodes_envelope_and_calls_inner_handler() {
        let seen = Arc::new(Mutex::new(Vec::<Uuid>::new()));
        let handler = TypedHandler::new(RecordingHandler {
            seen: Arc::clone(&seen),
        });
        let erased: Arc<dyn ErasedHandler> = Arc::new(handler);

        let user_id = Uuid::from_u128(42);
        let envelope = fresh_envelope(user_id);
        let ctx = HandlerContext::new(MessageId::new(), CorrelationId::new());

        erased
            .handle(&envelope, &ctx)
            .await
            .expect("erased dispatch must succeed");

        assert_eq!(seen.lock().unwrap().as_slice(), &[user_id]);
    }

    // A handler error must surface unchanged through the erased adapter.
    #[tokio::test]
    async fn typed_handler_propagates_handler_error_as_outbox_error() {
        let handler = TypedHandler::new(FailingHandler);
        let erased: Arc<dyn ErasedHandler> = Arc::new(handler);

        let envelope = fresh_envelope(Uuid::nil());
        let ctx = HandlerContext::new(MessageId::new(), CorrelationId::new());

        let err = erased.handle(&envelope, &ctx).await.expect_err("must fail");
        assert!(matches!(err, OutboxError::Internal(_)));
    }

    // `event_type` must come from the event's `EVENT_TYPE` constant.
    #[test]
    fn typed_handler_reports_event_type_from_const() {
        let handler = TypedHandler::new(RecordingHandler {
            seen: Arc::new(Mutex::new(Vec::new())),
        });
        assert_eq!(handler.event_type(), "users.registered");
    }

    // Pins every default so accidental changes to `Default` are caught.
    #[test]
    fn default_config_has_expected_values() {
        let cfg = OutboxWorkerConfig::default();
        assert_eq!(cfg.poll_interval, Duration::from_millis(100));
        assert_eq!(cfg.batch_size, 10);
        assert_eq!(cfg.max_attempts, 5);
        assert_eq!(cfg.retry_base_delay, Duration::from_secs(1));
        assert_eq!(cfg.retry_max_delay, Duration::from_secs(300));
        assert!(cfg.jitter);
        assert_eq!(cfg.min_cycle_delay, Duration::from_millis(5));
        assert_eq!(cfg.dispatch_timeout, Duration::from_secs(30));
    }

    // Config with jitter disabled so backoff values can be asserted exactly.
    fn deterministic_config(base: Duration, max: Duration) -> OutboxWorkerConfig {
        OutboxWorkerConfig {
            retry_base_delay: base,
            retry_max_delay: max,
            jitter: false,
            ..OutboxWorkerConfig::default()
        }
    }

    // Without jitter the delay is base * 2^attempts.
    #[test]
    fn backoff_grows_exponentially_without_jitter() {
        let cfg = deterministic_config(Duration::from_secs(1), Duration::from_secs(300));
        assert_eq!(cfg.next_retry_delay(0), Duration::from_secs(1));
        assert_eq!(cfg.next_retry_delay(1), Duration::from_secs(2));
        assert_eq!(cfg.next_retry_delay(2), Duration::from_secs(4));
        assert_eq!(cfg.next_retry_delay(3), Duration::from_secs(8));
    }

    // Large attempt counts must clamp to `retry_max_delay`.
    #[test]
    fn backoff_caps_at_max_delay() {
        let cfg = deterministic_config(Duration::from_secs(1), Duration::from_secs(30));
        assert_eq!(cfg.next_retry_delay(10), Duration::from_secs(30));
        assert_eq!(cfg.next_retry_delay(100), Duration::from_secs(30));
    }

    // A shift of >= 32 bits must saturate instead of panicking.
    #[test]
    fn backoff_overflow_safe_for_large_attempts() {
        let cfg = deterministic_config(Duration::from_secs(1), Duration::from_secs(300));
        let delay = cfg.next_retry_delay(64);
        assert_eq!(
            delay,
            Duration::from_secs(300),
            "overflow must saturate at cap"
        );
    }

    // Jittered delays may be anywhere in [0, cap] but never above the deterministic cap.
    #[test]
    fn backoff_jitter_stays_within_bounds() {
        let base = Duration::from_secs(1);
        let max = Duration::from_secs(30);
        let cfg = OutboxWorkerConfig {
            retry_base_delay: base,
            retry_max_delay: max,
            jitter: true,
            ..OutboxWorkerConfig::default()
        };
        for attempts in 0u32..8 {
            let delay = cfg.next_retry_delay(attempts);
            let cap = base
                .saturating_mul(1u32.checked_shl(attempts).unwrap_or(u32::MAX))
                .min(max);
            assert!(
                delay <= cap,
                "attempt {attempts}: jittered delay {delay:?} must be <= cap {cap:?}"
            );
        }
    }

    // Store whose `poll` drains pending envelopes and records the (tokio clock) instant
    // of the first poll that comes back empty; used to measure loop pacing.
    #[derive(Clone)]
    struct PacingStore {
        pending: Arc<Mutex<Vec<OutboxEnvelope>>>,
        empty_poll_at: Arc<Mutex<Option<tokio::time::Instant>>>,
    }

    impl PacingStore {
        fn new(initial: Vec<OutboxEnvelope>) -> Self {
            Self {
                pending: Arc::new(Mutex::new(initial)),
                empty_poll_at: Arc::new(Mutex::new(None)),
            }
        }
    }

    #[async_trait::async_trait]
    impl OutboxStore for PacingStore {
        type Client = MockClient;
        type Tx<'tx> = MockTx;

        async fn acquire(&self) -> Result<Self::Client, OutboxError> {
            Ok(MockClient)
        }

        async fn begin<'a>(
            &self,
            _client: &'a mut Self::Client,
        ) -> Result<Self::Tx<'a>, OutboxError> {
            Ok(MockTx)
        }

        async fn poll<'a>(
            &self,
            _tx: &mut Self::Tx<'a>,
            batch_size: usize,
            _max_attempts: u32,
        ) -> Result<Vec<OutboxEnvelope>, OutboxError> {
            let mut pending = self.pending.lock().unwrap();
            let take = batch_size.min(pending.len());
            let batch: Vec<OutboxEnvelope> = pending.drain(..take).collect();
            if batch.is_empty() {
                // Only the first empty poll is recorded.
                let mut slot = self.empty_poll_at.lock().unwrap();
                if slot.is_none() {
                    *slot = Some(tokio::time::Instant::now());
                }
            }
            Ok(batch)
        }

        async fn mark_delivered<'a>(
            &self,
            _tx: &mut Self::Tx<'a>,
            _event_id: Uuid,
        ) -> Result<(), OutboxError> {
            Ok(())
        }

        async fn mark_failed<'a>(
            &self,
            _tx: &mut Self::Tx<'a>,
            _event_id: Uuid,
            _error: &str,
            _retry_in: Duration,
        ) -> Result<(), OutboxError> {
            Ok(())
        }

        async fn commit<'a>(&self, _tx: Self::Tx<'a>) -> Result<(), OutboxError> {
            Ok(())
        }
    }

    // With batch_size 1 and N envelopes, the loop runs N non-empty cycles, each followed
    // by `min_cycle_delay`, so the first empty poll happens exactly N * delay after start
    // on the paused clock.
    #[tokio::test(start_paused = true)]
    async fn run_paces_consecutive_non_empty_cycles() {
        let non_empty_cycles: u32 = 4;
        let envelopes: Vec<OutboxEnvelope> = (0..non_empty_cycles)
            .map(|i| fresh_envelope(Uuid::from_u128(u128::from(i) + 1)))
            .collect();
        let store = PacingStore::new(envelopes);
        let empty_poll_at = Arc::clone(&store.empty_poll_at);

        let delay = Duration::from_millis(10);
        let config = OutboxWorkerConfig {
            poll_interval: Duration::from_secs(3600),
            batch_size: 1,
            min_cycle_delay: delay,
            ..OutboxWorkerConfig::default()
        };

        let registry = registry_with(vec![Arc::new(TypedHandler::new(RecordingHandler {
            seen: Arc::new(Mutex::new(Vec::new())),
        }))]);
        let worker = OutboxWorker::new(store, registry, config);

        let cancel = CancellationToken::new();
        let start = tokio::time::Instant::now();
        let join = tokio::spawn(worker.run(cancel.clone()));

        tokio::time::sleep(delay * non_empty_cycles + Duration::from_millis(1)).await;
        cancel.cancel();
        join.await.unwrap().unwrap();

        let empty_at = empty_poll_at
            .lock()
            .unwrap()
            .expect("the loop should have reached the empty poll");
        assert_eq!(
            empty_at.duration_since(start),
            delay * non_empty_cycles,
            "each non-empty cycle must be paced by min_cycle_delay before the empty poll"
        );
    }

    // In-memory store that records every settle/claim call. Failures can be injected:
    // `fail_claim` makes `claim` error; `fail_mark_delivered_for` makes `mark_delivered`
    // fail once for the given id.
    #[derive(Clone)]
    struct MockStore {
        pending: Arc<Mutex<Vec<OutboxEnvelope>>>,
        delivered: Arc<Mutex<Vec<Uuid>>>,
        failed: Arc<Mutex<Vec<(Uuid, String)>>>,
        dead_lettered: Arc<Mutex<Vec<Uuid>>>,
        claimed: Arc<Mutex<Vec<Uuid>>>,
        fail_claim: Arc<AtomicBool>,
        fail_mark_delivered_for: Arc<Mutex<Option<Uuid>>>,
    }

    impl MockStore {
        fn new(initial: Vec<OutboxEnvelope>) -> Self {
            Self {
                pending: Arc::new(Mutex::new(initial)),
                delivered: Arc::new(Mutex::new(Vec::new())),
                failed: Arc::new(Mutex::new(Vec::new())),
                dead_lettered: Arc::new(Mutex::new(Vec::new())),
                claimed: Arc::new(Mutex::new(Vec::new())),
                fail_claim: Arc::new(AtomicBool::new(false)),
                fail_mark_delivered_for: Arc::new(Mutex::new(None)),
            }
        }
    }

    // Unit placeholders for the store's client and transaction types.
    struct MockClient;
    struct MockTx;

    #[async_trait::async_trait]
    impl OutboxStore for MockStore {
        type Client = MockClient;
        type Tx<'tx> = MockTx;

        async fn acquire(&self) -> Result<Self::Client, OutboxError> {
            Ok(MockClient)
        }

        async fn begin<'a>(
            &self,
            _client: &'a mut Self::Client,
        ) -> Result<Self::Tx<'a>, OutboxError> {
            Ok(MockTx)
        }

        // Drains envelopes, so each is polled at most once (no redelivery in these tests).
        async fn poll<'a>(
            &self,
            _tx: &mut Self::Tx<'a>,
            batch_size: usize,
            _max_attempts: u32,
        ) -> Result<Vec<OutboxEnvelope>, OutboxError> {
            let mut pending = self.pending.lock().unwrap();
            let take = batch_size.min(pending.len());
            Ok(pending.drain(..take).collect())
        }

        async fn mark_delivered<'a>(
            &self,
            _tx: &mut Self::Tx<'a>,
            event_id: Uuid,
        ) -> Result<(), OutboxError> {
            {
                // One-shot failure injection: clear the slot, then fail this call.
                let mut slot = self.fail_mark_delivered_for.lock().unwrap();
                if *slot == Some(event_id) {
                    *slot = None;
                    return Err(OutboxError::PoolTimeout);
                }
            }
            self.delivered.lock().unwrap().push(event_id);
            Ok(())
        }

        async fn mark_failed<'a>(
            &self,
            _tx: &mut Self::Tx<'a>,
            event_id: Uuid,
            error: &str,
            _retry_in: Duration,
        ) -> Result<(), OutboxError> {
            self.failed
                .lock()
                .unwrap()
                .push((event_id, error.to_owned()));
            Ok(())
        }

        async fn mark_dead_lettered<'a>(
            &self,
            _tx: &mut Self::Tx<'a>,
            event_id: Uuid,
            _error: &str,
        ) -> Result<(), OutboxError> {
            self.dead_lettered.lock().unwrap().push(event_id);
            Ok(())
        }

        async fn commit<'a>(&self, _tx: Self::Tx<'a>) -> Result<(), OutboxError> {
            Ok(())
        }

        async fn claim<'a>(
            &self,
            _tx: &mut Self::Tx<'a>,
            event_ids: &[Uuid],
            _lease_for: Duration,
        ) -> Result<(), OutboxError> {
            if self.fail_claim.load(Ordering::Relaxed) {
                return Err(OutboxError::Internal("claim failed".into()));
            }
            self.claimed.lock().unwrap().extend_from_slice(event_ids);
            Ok(())
        }
    }

    // Builds a registry keyed by each handler's own `event_type`.
    fn registry_with(
        handlers: Vec<Arc<dyn ErasedHandler>>,
    ) -> HashMap<&'static str, Arc<dyn ErasedHandler>> {
        let mut map = HashMap::new();
        for handler in handlers {
            map.insert(handler.event_type(), handler);
        }
        map
    }

    // Happy path: every polled envelope is handled and marked delivered, in poll order.
    #[tokio::test]
    async fn worker_dispatches_pending_envelopes_and_marks_delivered() {
        let envelopes = vec![
            fresh_envelope(Uuid::from_u128(1)),
            fresh_envelope(Uuid::from_u128(2)),
        ];
        let event_ids: Vec<Uuid> = envelopes.iter().map(|e| e.event_id).collect();
        let store = MockStore::new(envelopes);

        let seen = Arc::new(Mutex::new(Vec::new()));
        let handler: Arc<dyn ErasedHandler> = Arc::new(TypedHandler::new(RecordingHandler {
            seen: Arc::clone(&seen),
        }));
        let registry = registry_with(vec![handler]);

        let worker = OutboxWorker::new(store.clone(), registry, OutboxWorkerConfig::default());
        let cancel = CancellationToken::new();
        let join = tokio::spawn(worker.run(cancel.clone()));

        tokio::time::sleep(Duration::from_millis(200)).await;
        cancel.cancel();
        join.await.unwrap().unwrap();

        assert_eq!(seen.lock().unwrap().len(), 2);
        assert_eq!(
            store.delivered.lock().unwrap().as_slice(),
            event_ids.as_slice()
        );
        assert!(store.failed.lock().unwrap().is_empty());
    }

    // A handler error is recorded via `mark_failed` with the error text.
    #[tokio::test]
    async fn worker_marks_failed_when_handler_errors() {
        let envelope = fresh_envelope(Uuid::from_u128(1));
        let event_id = envelope.event_id;
        let store = MockStore::new(vec![envelope]);

        let handler: Arc<dyn ErasedHandler> = Arc::new(TypedHandler::new(FailingHandler));
        let registry = registry_with(vec![handler]);

        let worker = OutboxWorker::new(store.clone(), registry, OutboxWorkerConfig::default());
        let cancel = CancellationToken::new();
        let join = tokio::spawn(worker.run(cancel.clone()));

        tokio::time::sleep(Duration::from_millis(200)).await;
        cancel.cancel();
        join.await.unwrap().unwrap();

        assert!(store.delivered.lock().unwrap().is_empty());
        let failed = store.failed.lock().unwrap();
        assert_eq!(failed.len(), 1);
        assert_eq!(failed[0].0, event_id);
        assert!(failed[0].1.contains("forced"));
    }

    // An unregistered event type is a failure (retried), not a panic or silent drop.
    #[tokio::test]
    async fn worker_marks_failed_when_no_handler_registered() {
        let envelope = fresh_envelope(Uuid::from_u128(1));
        let event_id = envelope.event_id;
        let store = MockStore::new(vec![envelope]);

        let registry = HashMap::new();

        let worker = OutboxWorker::new(store.clone(), registry, OutboxWorkerConfig::default());
        let cancel = CancellationToken::new();
        let join = tokio::spawn(worker.run(cancel.clone()));

        tokio::time::sleep(Duration::from_millis(200)).await;
        cancel.cancel();
        join.await.unwrap().unwrap();

        let failed = store.failed.lock().unwrap();
        assert_eq!(failed.len(), 1);
        assert_eq!(failed[0].0, event_id);
        assert!(failed[0].1.contains("no handler"));
    }

    // With max_attempts 1 the first failure both records the failure and dead-letters.
    #[tokio::test]
    async fn worker_dead_letters_envelope_when_max_attempts_exhausted() {
        let envelope = fresh_envelope(Uuid::from_u128(1));
        let event_id = envelope.event_id;
        let store = MockStore::new(vec![envelope]);

        let handler: Arc<dyn ErasedHandler> = Arc::new(TypedHandler::new(FailingHandler));
        let registry = registry_with(vec![handler]);

        let config = OutboxWorkerConfig {
            max_attempts: 1,
            batch_size: 1,
            ..OutboxWorkerConfig::default()
        };
        let worker = OutboxWorker::new(store.clone(), registry, config);
        let cancel = CancellationToken::new();
        let join = tokio::spawn(worker.run(cancel.clone()));

        tokio::time::sleep(Duration::from_millis(200)).await;
        cancel.cancel();
        join.await.unwrap().unwrap();

        let failed = store.failed.lock().unwrap();
        assert_eq!(failed.len(), 1);
        assert_eq!(failed[0].0, event_id);
        let dead = store.dead_lettered.lock().unwrap();
        assert_eq!(dead.as_slice(), &[event_id]);
    }

    // A first failure under max_attempts 3 must not dead-letter.
    #[tokio::test]
    async fn worker_does_not_dead_letter_before_attempts_exhausted() {
        let envelope = fresh_envelope(Uuid::from_u128(1));
        let event_id = envelope.event_id;
        let store = MockStore::new(vec![envelope]);

        let handler: Arc<dyn ErasedHandler> = Arc::new(TypedHandler::new(FailingHandler));
        let registry = registry_with(vec![handler]);

        let config = OutboxWorkerConfig {
            max_attempts: 3,
            batch_size: 1,
            ..OutboxWorkerConfig::default()
        };
        let worker = OutboxWorker::new(store.clone(), registry, config);
        let cancel = CancellationToken::new();
        let join = tokio::spawn(worker.run(cancel.clone()));

        tokio::time::sleep(Duration::from_millis(200)).await;
        cancel.cancel();
        join.await.unwrap().unwrap();

        let failed = store.failed.lock().unwrap();
        assert_eq!(failed.len(), 1);
        assert_eq!(failed[0].0, event_id);
        assert!(
            store.dead_lettered.lock().unwrap().is_empty(),
            "should not dead-letter when attempts(1) < max_attempts(3)"
        );
    }

    // `claim` must receive every polled id.
    #[tokio::test]
    async fn worker_claims_envelopes_before_dispatch() {
        let envelopes = vec![
            fresh_envelope(Uuid::from_u128(1)),
            fresh_envelope(Uuid::from_u128(2)),
        ];
        let expected_ids: Vec<Uuid> = envelopes.iter().map(|e| e.event_id).collect();
        let store = MockStore::new(envelopes);

        let seen = Arc::new(Mutex::new(Vec::new()));
        let handler: Arc<dyn ErasedHandler> = Arc::new(TypedHandler::new(RecordingHandler {
            seen: Arc::clone(&seen),
        }));
        let registry = registry_with(vec![handler]);

        let worker = OutboxWorker::new(store.clone(), registry, OutboxWorkerConfig::default());
        let cancel = CancellationToken::new();
        let join = tokio::spawn(worker.run(cancel.clone()));

        tokio::time::sleep(Duration::from_millis(200)).await;
        cancel.cancel();
        join.await.unwrap().unwrap();

        let claimed = store.claimed.lock().unwrap();
        assert_eq!(
            claimed.as_slice(),
            expected_ids.as_slice(),
            "claim must be called with all polled ids"
        );
    }

    // Empty polls must not issue a claim.
    #[tokio::test]
    async fn worker_does_not_claim_when_batch_is_empty() {
        let store = MockStore::new(Vec::new());
        let registry = HashMap::new();

        let worker = OutboxWorker::new(store.clone(), registry, OutboxWorkerConfig::default());
        let cancel = CancellationToken::new();
        let join = tokio::spawn(worker.run(cancel.clone()));

        tokio::time::sleep(Duration::from_millis(150)).await;
        cancel.cancel();
        join.await.unwrap().unwrap();

        assert!(
            store.claimed.lock().unwrap().is_empty(),
            "claim must not be called when no envelopes are polled"
        );
    }

    // A claim error aborts the cycle before any handler runs.
    #[tokio::test]
    async fn worker_aborts_poll_cycle_when_claim_fails_without_dispatching() {
        let envelope = fresh_envelope(Uuid::from_u128(1));
        let store = MockStore::new(vec![envelope]);
        store.fail_claim.store(true, Ordering::Relaxed);

        let seen = Arc::new(Mutex::new(Vec::new()));
        let handler: Arc<dyn ErasedHandler> = Arc::new(TypedHandler::new(RecordingHandler {
            seen: Arc::clone(&seen),
        }));
        let registry = registry_with(vec![handler]);

        let worker = OutboxWorker::new(store.clone(), registry, OutboxWorkerConfig::default());
        let cancel = CancellationToken::new();
        let join = tokio::spawn(worker.run(cancel.clone()));

        tokio::time::sleep(Duration::from_millis(200)).await;
        cancel.cancel();
        join.await.unwrap().unwrap();

        assert!(
            seen.lock().unwrap().is_empty(),
            "handler must not be called when claim fails"
        );
        assert!(
            store.delivered.lock().unwrap().is_empty(),
            "envelope must not be marked delivered when claim fails"
        );
        assert!(
            store.claimed.lock().unwrap().is_empty(),
            "claim ids must not be recorded when claim returns an error"
        );
    }

    // Two envelopes sharing an event id must see the same message id, equal to
    // `MessageId::from(event_id)`.
    #[tokio::test]
    async fn worker_derives_context_ids_from_event_id_stable_across_retries() {
        let event_id = Uuid::from_u128(99);
        let e1 = OutboxEnvelope::new(
            event_id,
            &UserRegistered {
                user_id: Uuid::from_u128(1),
            },
        )
        .unwrap();
        let e2 = OutboxEnvelope::new(
            event_id,
            &UserRegistered {
                user_id: Uuid::from_u128(2),
            },
        )
        .unwrap();
        let store = MockStore::new(vec![e1, e2]);

        let captured_ids = Arc::new(Mutex::new(Vec::new()));
        let handler: Arc<dyn ErasedHandler> =
            Arc::new(TypedHandler::new(ContextCapturingHandler {
                captured_ids: Arc::clone(&captured_ids),
            }));
        let registry = registry_with(vec![handler]);

        let worker = OutboxWorker::new(store.clone(), registry, OutboxWorkerConfig::default());
        let cancel = CancellationToken::new();
        let join = tokio::spawn(worker.run(cancel.clone()));

        tokio::time::sleep(Duration::from_millis(200)).await;
        cancel.cancel();
        join.await.unwrap().unwrap();

        let ids = captured_ids.lock().unwrap();
        assert_eq!(ids.len(), 2, "both envelopes must be dispatched");
        assert_eq!(
            ids[0], ids[1],
            "message_id must be identical across dispatches of the same event_id"
        );
        assert_eq!(
            ids[0],
            MessageId::from(event_id),
            "message_id must equal MessageId::from(event_id)"
        );
    }

    // Cancelling before the worker starts must make it exit well within a second.
    #[tokio::test]
    async fn worker_stops_promptly_on_cancellation() {
        let store = MockStore::new(Vec::new());
        let registry = HashMap::new();
        let worker = OutboxWorker::new(store, registry, OutboxWorkerConfig::default());
        let cancel = CancellationToken::new();
        let join = tokio::spawn(worker.run(cancel.clone()));

        cancel.cancel();
        let started = std::time::Instant::now();
        join.await.unwrap().unwrap();
        assert!(
            started.elapsed() < Duration::from_secs(1),
            "worker took {:?} to stop",
            started.elapsed()
        );
    }

    // Handler whose future never resolves; used to exercise `dispatch_timeout`.
    struct HangingHandler;
    impl Handler<UserRegistered> for HangingHandler {
        type Error = OutboxError;
        async fn handle(
            &self,
            _event: UserRegistered,
            _ctx: &HandlerContext,
        ) -> Result<(), Self::Error> {
            std::future::pending::<()>().await;
            unreachable!("hanging handler never resolves")
        }
    }

    // A hung handler must be cut off by the timeout and recorded as failed, not delivered.
    #[tokio::test(start_paused = true)]
    async fn worker_enforces_dispatch_timeout_on_a_hung_handler() {
        let envelope = fresh_envelope(Uuid::from_u128(1));
        let event_id = envelope.event_id;
        let store = MockStore::new(vec![envelope]);

        let handler: Arc<dyn ErasedHandler> = Arc::new(TypedHandler::new(HangingHandler));
        let registry = registry_with(vec![handler]);

        let config = OutboxWorkerConfig {
            dispatch_timeout: Duration::from_millis(50),
            batch_size: 1,
            ..OutboxWorkerConfig::default()
        };
        let worker = OutboxWorker::new(store.clone(), registry, config);
        let cancel = CancellationToken::new();
        let join = tokio::spawn(worker.run(cancel.clone()));

        // Paused clock: this advances virtual time well past the 50 ms timeout.
        tokio::time::sleep(Duration::from_millis(500)).await;
        cancel.cancel();
        join.await.unwrap().unwrap();

        let failed = store.failed.lock().unwrap();
        assert_eq!(failed.len(), 1, "hung handler must be recorded as failed");
        assert_eq!(failed[0].0, event_id);
        assert!(
            failed[0].1.contains("timed out"),
            "failure must be the dispatch timeout, got {:?}",
            failed[0].1
        );
        assert!(
            store.delivered.lock().unwrap().is_empty(),
            "a timed-out envelope must not be marked delivered"
        );
    }

    // A failed ack for the first envelope must not prevent the second from being settled.
    #[tokio::test]
    async fn worker_settles_remaining_batch_when_one_ack_fails() {
        let e1 = fresh_envelope(Uuid::from_u128(1));
        let e2 = fresh_envelope(Uuid::from_u128(2));
        let id1 = e1.event_id;
        let id2 = e2.event_id;
        let store = MockStore::new(vec![e1, e2]);
        *store.fail_mark_delivered_for.lock().unwrap() = Some(id1);

        let seen = Arc::new(Mutex::new(Vec::new()));
        let handler: Arc<dyn ErasedHandler> = Arc::new(TypedHandler::new(RecordingHandler {
            seen: Arc::clone(&seen),
        }));
        let registry = registry_with(vec![handler]);

        let config = OutboxWorkerConfig {
            batch_size: 2,
            ..OutboxWorkerConfig::default()
        };
        let worker = OutboxWorker::new(store.clone(), registry, config);
        let cancel = CancellationToken::new();
        let join = tokio::spawn(worker.run(cancel.clone()));

        tokio::time::sleep(Duration::from_millis(200)).await;
        cancel.cancel();
        join.await.unwrap().unwrap();

        let delivered = store.delivered.lock().unwrap();
        assert!(
            delivered.contains(&id2),
            "second envelope must be delivered despite the first ack failing, got {delivered:?}"
        );
        assert!(
            seen.lock().unwrap().len() >= 2,
            "both envelopes must have been dispatched to the handler"
        );
    }

    // The idle sleep races cancellation, so a one-hour poll interval must not delay shutdown.
    #[tokio::test(start_paused = true)]
    async fn worker_observes_cancellation_during_a_long_poll_interval() {
        let store = MockStore::new(Vec::new());
        let registry = HashMap::new();
        let config = OutboxWorkerConfig {
            poll_interval: Duration::from_secs(3600),
            ..OutboxWorkerConfig::default()
        };
        let worker = OutboxWorker::new(store, registry, config);
        let cancel = CancellationToken::new();
        let join = tokio::spawn(worker.run(cancel.clone()));

        tokio::time::sleep(Duration::from_millis(1)).await;
        cancel.cancel();

        tokio::time::timeout(Duration::from_secs(1), join)
            .await
            .expect("worker must stop without waiting out the poll interval")
            .unwrap()
            .unwrap();
    }

    // Lease = dispatch_timeout * batch_len, with an empty batch treated as one.
    #[test]
    fn lease_for_scales_with_batch_size() {
        let config = OutboxWorkerConfig {
            dispatch_timeout: Duration::from_secs(30),
            ..OutboxWorkerConfig::default()
        };
        let store = MockStore::new(Vec::new());
        let worker = OutboxWorker::new(store, HashMap::new(), config);

        assert_eq!(worker.lease_for(1), Duration::from_secs(30));
        assert_eq!(worker.lease_for(10), Duration::from_secs(300));
        assert_eq!(worker.lease_for(0), Duration::from_secs(30));
    }
}