Skip to main content

hexeract_outbox/
worker.rs

1use std::collections::HashMap;
2use std::future::Future;
3use std::marker::PhantomData;
4use std::pin::Pin;
5use std::sync::Arc;
6use std::time::Duration;
7
8use hexeract_core::CorrelationId;
9use hexeract_core::HandlerContext;
10use hexeract_core::MessageId;
11use tokio_util::sync::CancellationToken;
12use uuid::Uuid;
13
14use crate::Event;
15use crate::Handler;
16use crate::OutboxEnvelope;
17use crate::OutboxError;
18
/// Heap-allocated, pinned, `Send` future that lives for `'a` and resolves to `T`.
///
/// Serves as the return type of object-safe trait methods such as
/// [`ErasedHandler::handle`], which cannot be written as `async fn` on a
/// `dyn`-compatible trait.
pub type BoxFuture<'a, T> = Pin<Box<dyn Future<Output = T> + 'a + Send>>;
21
/// Backend-agnostic contract for the outbox storage operations driven by
/// [`OutboxWorker`].
///
/// The split between [`Self::Client`] and [`Self::Tx`] mirrors the
/// `Pool` + `Transaction` pattern of backends such as deadpool_postgres and
/// sqlx, so they map onto this trait directly. The worker acquires a
/// connection guard for the duration of one transaction (the claim phase of
/// a polling cycle, or the settlement of a single envelope), and the
/// transaction borrows from that guard.
///
/// Transactions that are dropped without reaching [`Self::commit`] (the
/// worker's `?` early returns do this on error) are expected to be discarded
/// by the backend, so partially applied changes never become visible.
///
/// Implemented via `async_trait` (boxed futures) to work around the
/// current Rust limitation around HRTB inference on GATs (see
/// rust-lang/rust#100013). The runtime cost is one heap allocation per
/// trait method call, negligible at the outbox dispatch cadence.
#[async_trait::async_trait]
pub trait OutboxStore: Send + Sync + 'static {
    /// Pooled connection guard, held by the worker for one transaction.
    type Client: Send;
    /// Transaction borrowed from a [`Self::Client`] for the lifetime `'tx`.
    type Tx<'tx>: Send
    where
        Self: 'tx;

    /// Acquire a connection from the underlying pool.
    async fn acquire(&self) -> Result<Self::Client, OutboxError>;

    /// Open a transaction borrowing from the given connection.
    ///
    /// The transaction is finished with [`Self::commit`].
    async fn begin<'a>(&self, client: &'a mut Self::Client) -> Result<Self::Tx<'a>, OutboxError>;

    /// Poll a batch of at most `batch_size` pending envelopes, locking them
    /// via `FOR UPDATE SKIP LOCKED` (or the backend's equivalent).
    ///
    /// Envelopes that have reached or exceeded `max_attempts`, or whose
    /// `next_retry_at` is in the future, are excluded. The `attempts` value on
    /// each returned envelope is the count stored *before* [`Self::claim`]
    /// runs for this batch.
    async fn poll<'a>(
        &self,
        tx: &mut Self::Tx<'a>,
        batch_size: usize,
        max_attempts: u32,
    ) -> Result<Vec<OutboxEnvelope>, OutboxError>;

    /// Mark an envelope as successfully delivered.
    ///
    /// The worker calls this in a dedicated transaction after the envelope's
    /// handler returned `Ok`.
    async fn mark_delivered<'a>(
        &self,
        tx: &mut Self::Tx<'a>,
        event_id: Uuid,
    ) -> Result<(), OutboxError>;

    /// Mark an envelope as failed, recording the error and the backoff delay
    /// before the next retry.
    ///
    /// `retry_in` is the delay from *now*; the backend adds it to its own
    /// database clock when persisting `next_retry_at`, so retry scheduling is
    /// not affected by skew between the worker host and the database host.
    /// The attempt counter is advanced by [`Self::claim`], not here.
    async fn mark_failed<'a>(
        &self,
        tx: &mut Self::Tx<'a>,
        event_id: Uuid,
        error: &str,
        retry_in: Duration,
    ) -> Result<(), OutboxError>;

    /// Commit the transaction, consuming it.
    async fn commit<'a>(&self, tx: Self::Tx<'a>) -> Result<(), OutboxError>;

    /// Move an envelope that has exhausted its retry budget to the dead-letter store.
    ///
    /// Called by the worker after [`Self::mark_failed`] when
    /// `attempts + 1 >= max_attempts`, where `attempts` is the value the
    /// envelope carried when it was returned by [`Self::poll`] (i.e. before
    /// [`Self::claim`] counted the attempt that just failed). The default
    /// implementation is a no-op so backends that do not persist
    /// dead-lettered envelopes need not override it.
    /// Called within the same transaction as `mark_failed` — both succeed or both
    /// roll back.
    async fn mark_dead_lettered<'a>(
        &self,
        _tx: &mut Self::Tx<'a>,
        _event_id: Uuid,
        _error: &str,
    ) -> Result<(), OutboxError> {
        Ok(())
    }

    /// Claim a batch of envelopes: advance the soft lease and consume one
    /// retry slot.
    ///
    /// Called within the same transaction as [`Self::poll`], immediately
    /// after the batch is fetched. The backend sets `next_retry_at` to its own
    /// database clock plus `lease_for` (so the lease window is immune to
    /// app/DB clock skew) and increments `attempts` by one. The worker commits
    /// the transaction right after this call so `FOR UPDATE SKIP LOCKED`
    /// row-level locks are released promptly. Competing workers skip envelopes
    /// whose `next_retry_at` is in the future, so the claiming worker can
    /// dispatch outside the lock without risking concurrent re-delivery.
    ///
    /// The worker sizes `lease_for` as the batch length multiplied by its
    /// per-handler `dispatch_timeout`, covering sequential dispatch of the
    /// whole batch.
    ///
    /// Incrementing `attempts` here (rather than only on failure) is what
    /// makes a worker crash between claim and acknowledgement safe: the
    /// attempt is already counted, so a poison envelope eventually reaches the
    /// dead-letter threshold instead of being redelivered forever.
    ///
    /// The default implementation is a no-op. Backends that neither lease nor
    /// track attempts in storage need not override it; note that without a
    /// lease, nothing keeps a competing worker from picking up the same
    /// envelope once the poll transaction commits.
    async fn claim<'a>(
        &self,
        _tx: &mut Self::Tx<'a>,
        _event_ids: &[Uuid],
        _lease_for: Duration,
    ) -> Result<(), OutboxError> {
        Ok(())
    }
}
130
/// Type-erased handler that the worker dispatches to.
///
/// Most users do not implement this trait directly; they use
/// [`TypedHandler`] to adapt a typed [`Handler<E>`] into an erased one
/// the worker can store in a registry keyed by `event_type`.
///
/// The trait is `dyn`-compatible: the worker stores handlers as
/// `Arc<dyn ErasedHandler>`, which is why [`Self::handle`] returns a
/// [`BoxFuture`] instead of being an `async fn`.
pub trait ErasedHandler: Send + Sync + 'static {
    /// Event type this handler reacts to, matching [`Event::EVENT_TYPE`].
    fn event_type(&self) -> &'static str;

    /// Decode the envelope and dispatch to the underlying typed handler.
    ///
    /// The returned future borrows `self`, `envelope` and `ctx` for `'a`.
    /// An `Err` (decode failure or handler failure) is recorded by the worker
    /// as a failed delivery attempt.
    fn handle<'a>(
        &'a self,
        envelope: &'a OutboxEnvelope,
        ctx: &'a HandlerContext,
    ) -> BoxFuture<'a, Result<(), OutboxError>>;
}
147
148/// Adapter that lifts a typed [`Handler<E>`] into an [`ErasedHandler`].
149pub struct TypedHandler<E, H>
150where
151    E: Event,
152    H: Handler<E>,
153{
154    handler: Arc<H>,
155    _phantom: PhantomData<fn() -> E>,
156}
157
158impl<E, H> TypedHandler<E, H>
159where
160    E: Event,
161    H: Handler<E>,
162{
163    /// Wrap a freshly owned handler.
164    #[must_use]
165    pub fn new(handler: H) -> Self {
166        Self {
167            handler: Arc::new(handler),
168            _phantom: PhantomData,
169        }
170    }
171
172    /// Wrap a handler already shared behind an `Arc`.
173    #[must_use]
174    pub fn shared(handler: Arc<H>) -> Self {
175        Self {
176            handler,
177            _phantom: PhantomData,
178        }
179    }
180}
181
182impl<E, H> ErasedHandler for TypedHandler<E, H>
183where
184    E: Event,
185    H: Handler<E>,
186{
187    fn event_type(&self) -> &'static str {
188        E::EVENT_TYPE
189    }
190
191    fn handle<'a>(
192        &'a self,
193        envelope: &'a OutboxEnvelope,
194        ctx: &'a HandlerContext,
195    ) -> BoxFuture<'a, Result<(), OutboxError>> {
196        Box::pin(async move {
197            let event: E = envelope.decode()?;
198            self.handler.handle(event, ctx).await.map_err(Into::into)
199        })
200    }
201}
202
/// Tuning parameters for an [`OutboxWorker`].
#[derive(Debug, Clone)]
pub struct OutboxWorkerConfig {
    /// Sleep duration between empty polls (and after a failed poll cycle).
    pub poll_interval: Duration,
    /// Maximum number of envelopes returned by a single poll.
    pub batch_size: usize,
    /// Number of attempts allowed before an envelope stops being polled.
    pub max_attempts: u32,
    /// Base delay for the exponential backoff applied after a failed dispatch.
    ///
    /// When a dispatch fails for an envelope that had `n` attempts recorded
    /// at the time it was polled (`n = 0` on its first delivery), the
    /// envelope becomes eligible again after:
    ///
    /// `min(retry_max_delay, retry_base_delay × 2^n)`
    ///
    /// So the first retry waits `retry_base_delay`, the second
    /// `2 × retry_base_delay`, and so on.
    ///
    /// When [`Self::jitter`] is `true` (the default), a uniform random value
    /// in `[0, computed_delay]` is drawn instead (full-jitter strategy), which
    /// spreads retries across the window and avoids thundering-herd spikes.
    pub retry_base_delay: Duration,
    /// Upper bound on the computed backoff delay.
    ///
    /// Caps `retry_base_delay × 2^n` so retries never wait longer than this
    /// value regardless of the attempt count.
    pub retry_max_delay: Duration,
    /// Apply full jitter to the backoff delay.
    ///
    /// When `true` (the default), the worker draws a uniform random value in
    /// `[0, capped_delay]` instead of using the deterministic exponential.
    /// Set to `false` to get a predictable delay (useful in tests or
    /// environments where all workers share the same retry schedule).
    pub jitter: bool,
    /// Minimum delay applied between consecutive non-empty poll cycles.
    ///
    /// Without it, a non-empty batch is followed immediately by the next
    /// poll, busy-spinning the store under a sustained backlog. This floor
    /// paces back-to-back non-empty cycles without affecting the empty-poll
    /// path (which still waits for [`Self::poll_interval`]). Set it to
    /// [`Duration::ZERO`] to disable pacing and restore the previous
    /// tight-loop behavior.
    pub min_cycle_delay: Duration,
    /// Per-envelope handler deadline and soft-lease unit.
    ///
    /// Two related roles:
    ///
    /// - **Hard timeout.** Each handler invocation is wrapped in a
    ///   `tokio::time::timeout` of this duration. A handler that exceeds it has
    ///   its cancellation token signalled and the dispatch is recorded as a
    ///   failed attempt ([`OutboxError::DispatchTimeout`]), so a hung handler
    ///   cannot stall the worker.
    /// - **Lease unit.** After polling a batch, the worker leases the whole
    ///   batch by setting `next_retry_at` to the database clock plus
    ///   `batch_size x dispatch_timeout`, then commits immediately. This
    ///   releases the `FOR UPDATE SKIP LOCKED` row-level locks before dispatch
    ///   begins while keeping the lease alive across the worst-case sequential
    ///   dispatch of every envelope in the batch, so competing workers do not
    ///   re-pick a tail envelope mid-batch.
    ///
    /// Set it to the worst-case duration of a *single* handler; the
    /// `batch_size` multiplier for the lease is applied internally, so do not
    /// pre-multiply it yourself.
    ///
    /// [`OutboxError::DispatchTimeout`]: crate::OutboxError::DispatchTimeout
    pub dispatch_timeout: Duration,
}
265
266impl Default for OutboxWorkerConfig {
267    fn default() -> Self {
268        Self {
269            poll_interval: Duration::from_millis(100),
270            batch_size: 10,
271            max_attempts: 5,
272            retry_base_delay: Duration::from_secs(1),
273            retry_max_delay: Duration::from_secs(300),
274            jitter: true,
275            min_cycle_delay: Duration::from_millis(5),
276            dispatch_timeout: Duration::from_secs(30),
277        }
278    }
279}
280
281impl OutboxWorkerConfig {
282    /// Compute the next retry delay for an envelope that has failed `attempts` times.
283    ///
284    /// Uses bounded exponential backoff: `min(retry_max_delay, retry_base_delay × 2^attempts)`.
285    /// When [`Self::jitter`] is `true`, returns a uniform random value in
286    /// `[0, computed_delay]` (full-jitter strategy).
287    ///
288    /// The computation is overflow-safe: `checked_shl` returns `None` when the
289    /// shift overflows, in which case the factor falls back to `u32::MAX`, and
290    /// `Duration::saturating_mul` clamps the result instead of panicking.
291    #[must_use]
292    pub fn next_retry_delay(&self, attempts: u32) -> Duration {
293        let factor = 1u32.checked_shl(attempts).unwrap_or(u32::MAX);
294        let capped = self
295            .retry_base_delay
296            .saturating_mul(factor)
297            .min(self.retry_max_delay);
298        if self.jitter {
299            let nanos = capped.as_nanos();
300            // fastrand::u64 covers the full jitter range; clamp to u64::MAX nanos
301            // (~584 years) to handle theoretical caps beyond that bound.
302            let nanos_u64 = u64::try_from(nanos).unwrap_or(u64::MAX);
303            Duration::from_nanos(fastrand::u64(0..=nanos_u64))
304        } else {
305            capped
306        }
307    }
308}
309
310/// Worker that polls the outbox in a loop and dispatches envelopes to
311/// their registered handlers.
312///
313/// Generic over any [`OutboxStore`] backend. The worker takes ownership
314/// of the store and a registry mapping `event_type` to its
315/// [`ErasedHandler`], then [`Self::start`] spawns the polling task and
316/// returns a [`JoinHandle`].
317pub struct OutboxWorker<S>
318where
319    S: OutboxStore,
320{
321    store: S,
322    handlers: Arc<HashMap<&'static str, Arc<dyn ErasedHandler>>>,
323    config: OutboxWorkerConfig,
324}
325
326impl<S> OutboxWorker<S>
327where
328    S: OutboxStore,
329{
330    /// Build a new worker.
331    #[must_use]
332    pub fn new(
333        store: S,
334        handlers: HashMap<&'static str, Arc<dyn ErasedHandler>>,
335        config: OutboxWorkerConfig,
336    ) -> Self {
337        Self {
338            store,
339            handlers: Arc::new(handlers),
340            config,
341        }
342    }
343
344    /// Returns the polling loop as a boxed `Send` future that the caller spawns.
345    ///
346    /// The future resolves to `Ok(())` once the supplied
347    /// [`CancellationToken`] is cancelled. Transient store errors are
348    /// logged via `tracing` and the loop continues. Typical usage:
349    ///
350    /// ```ignore
351    /// let cancel = CancellationToken::new();
352    /// let join = tokio::spawn(worker.run(cancel.clone()));
353    /// // ...
354    /// cancel.cancel();
355    /// join.await??;
356    /// ```
357    ///
358    /// The return type is boxed to work around a current Rust compiler
359    /// limitation around HRTB inference on GATs (see rust-lang/rust#100013).
360    pub fn run(
361        self,
362        cancel: CancellationToken,
363    ) -> Pin<Box<dyn Future<Output = Result<(), OutboxError>> + Send>>
364    where
365        for<'a> S::Tx<'a>: Send,
366    {
367        Box::pin(async move {
368            while !cancel.is_cancelled() {
369                let sleep_for = match self.poll_cycle(&cancel).await {
370                    Ok(0) => Some(self.config.poll_interval),
371                    Ok(_) => {
372                        if self.config.min_cycle_delay.is_zero() {
373                            None
374                        } else {
375                            Some(self.config.min_cycle_delay)
376                        }
377                    }
378                    Err(err) => {
379                        tracing::error!(
380                            error = ?err,
381                            "outbox worker poll cycle failed, sleeping before retry"
382                        );
383                        Some(self.config.poll_interval)
384                    }
385                };
386                if let Some(delay) = sleep_for {
387                    // Race the sleep against cancellation so a shutdown during
388                    // a long poll_interval is observed promptly instead of
389                    // after the full delay elapses (#231).
390                    tokio::select! {
391                        () = tokio::time::sleep(delay) => {}
392                        () = cancel.cancelled() => break,
393                    }
394                }
395            }
396            Ok(())
397        })
398    }
399
400    async fn poll_cycle(&self, cancel: &CancellationToken) -> Result<usize, OutboxError> {
401        // Phase 1 — claim (short transaction: SQL only, no handler I/O)
402        // The FOR UPDATE SKIP LOCKED locks are held only for the SELECT +
403        // the claim UPDATE, then released at commit. Competing workers can
404        // immediately pick up other rows.
405        let envelopes = {
406            let mut client = self.store.acquire().await?;
407            let mut tx = self.store.begin(&mut client).await?;
408            let batch = self
409                .store
410                .poll(&mut tx, self.config.batch_size, self.config.max_attempts)
411                .await?;
412            if !batch.is_empty() {
413                let ids: Vec<Uuid> = batch.iter().map(|e| e.event_id).collect();
414                self.store
415                    .claim(&mut tx, &ids, self.lease_for(ids.len()))
416                    .await?;
417            }
418            self.store.commit(tx).await?;
419            batch
420        };
421        let count = envelopes.len();
422
423        // Phase 2 — dispatch + ack (one transaction per envelope, outside the poll lock)
424        for envelope in &envelopes {
425            // Isolate per-envelope settle failures: a transient DB error while
426            // acking one envelope must not abandon the rest of the claimed
427            // batch (those rows would otherwise wait out the whole lease before
428            // being retried). Log and continue (#231).
429            if let Err(err) = self.dispatch_and_settle(envelope, cancel).await {
430                tracing::error!(
431                    event_id = %envelope.event_id,
432                    event_type = %envelope.event_type,
433                    error = ?err,
434                    "failed to settle outbox envelope; continuing with the rest of the batch"
435                );
436            }
437        }
438
439        Ok(count)
440    }
441
442    /// Soft-lease duration covering the worst-case sequential dispatch of a
443    /// whole claimed batch.
444    ///
445    /// A single batch is claimed once, then its envelopes are dispatched
446    /// sequentially. The lease must therefore outlast `batch_len` back-to-back
447    /// dispatches, each bounded by `dispatch_timeout`; sizing it as merely
448    /// `dispatch_timeout` lets the tail envelopes' lease expire mid-batch and a
449    /// competing worker double-dispatch them (#215).
450    fn lease_for(&self, batch_len: usize) -> Duration {
451        let factor = u32::try_from(batch_len.max(1)).unwrap_or(u32::MAX);
452        self.config.dispatch_timeout.saturating_mul(factor)
453    }
454
455    /// Dispatch a single envelope and settle its outcome in its own
456    /// transaction.
457    async fn dispatch_and_settle(
458        &self,
459        envelope: &OutboxEnvelope,
460        cancel: &CancellationToken,
461    ) -> Result<(), OutboxError> {
462        match self.dispatch(envelope, cancel).await {
463            Ok(()) => {
464                let mut client = self.store.acquire().await?;
465                let mut tx = self.store.begin(&mut client).await?;
466                self.store
467                    .mark_delivered(&mut tx, envelope.event_id)
468                    .await?;
469                self.store.commit(tx).await?;
470            }
471            Err(err) => {
472                let message = err.to_string();
473                tracing::warn!(
474                    event_id = %envelope.event_id,
475                    event_type = %envelope.event_type,
476                    error = %message,
477                    "outbox handler dispatch failed"
478                );
479                let retry_in = self.config.next_retry_delay(envelope.attempts);
480                let mut client = self.store.acquire().await?;
481                let mut tx = self.store.begin(&mut client).await?;
482                self.store
483                    .mark_failed(&mut tx, envelope.event_id, &message, retry_in)
484                    .await?;
485                if envelope.attempts + 1 >= self.config.max_attempts {
486                    tracing::error!(
487                        event_id = %envelope.event_id,
488                        event_type = %envelope.event_type,
489                        attempts = envelope.attempts + 1,
490                        "outbox envelope exhausted retry budget, moving to dead letter"
491                    );
492                    self.store
493                        .mark_dead_lettered(&mut tx, envelope.event_id, &message)
494                        .await?;
495                }
496                self.store.commit(tx).await?;
497            }
498        }
499        Ok(())
500    }
501
502    async fn dispatch(
503        &self,
504        envelope: &OutboxEnvelope,
505        cancel: &CancellationToken,
506    ) -> Result<(), OutboxError> {
507        let Some(handler) = self.handlers.get(envelope.event_type.as_str()) else {
508            return Err(OutboxError::MissingHandler {
509                event_type: envelope.event_type.clone(),
510            });
511        };
512
513        // Stable across retries: both IDs are pinned to the envelope's
514        // immutable event_id so every dispatch attempt of the same row
515        // presents an identical context to the handler.
516        //
517        // A child token lets a dispatch timeout cancel only this handler
518        // without tearing down the worker's shared token; the child is still
519        // cancelled if the parent (worker) token is cancelled.
520        let ctx = HandlerContext::new(
521            MessageId::from(envelope.event_id),
522            CorrelationId::from(envelope.event_id),
523        )
524        .with_cancellation(cancel.child_token());
525
526        tracing::debug!(
527            event_id = %envelope.event_id,
528            event_type = %envelope.event_type,
529            "dispatching outbox envelope"
530        );
531
532        // Enforce dispatch_timeout as a hard deadline so a hung handler cannot
533        // stall the worker forever (#229). The cancellation token is signalled
534        // first so a cooperative handler can unwind before the future is
535        // dropped.
536        match tokio::time::timeout(self.config.dispatch_timeout, handler.handle(envelope, &ctx))
537            .await
538        {
539            Ok(result) => result,
540            Err(_elapsed) => {
541                ctx.cancellation.cancel();
542                Err(OutboxError::DispatchTimeout {
543                    event_id: envelope.event_id,
544                    event_type: envelope.event_type.clone(),
545                    timeout: self.config.dispatch_timeout,
546                })
547            }
548        }
549    }
550}
551
552#[cfg(test)]
553mod tests {
554    use super::*;
555    use serde::Deserialize;
556    use serde::Serialize;
557    use std::sync::Mutex;
558    use std::sync::atomic::AtomicBool;
559    use std::sync::atomic::Ordering;
560
561    #[derive(Debug, Serialize, Deserialize, PartialEq)]
562    struct UserRegistered {
563        user_id: Uuid,
564    }
565
566    impl Event for UserRegistered {
567        const EVENT_TYPE: &'static str = "users.registered";
568    }
569
570    struct RecordingHandler {
571        seen: Arc<Mutex<Vec<Uuid>>>,
572    }
573
574    impl Handler<UserRegistered> for RecordingHandler {
575        type Error = OutboxError;
576        async fn handle(
577            &self,
578            event: UserRegistered,
579            _ctx: &HandlerContext,
580        ) -> Result<(), Self::Error> {
581            self.seen.lock().unwrap().push(event.user_id);
582            Ok(())
583        }
584    }
585
586    struct ContextCapturingHandler {
587        captured_ids: Arc<Mutex<Vec<MessageId>>>,
588    }
589
590    impl Handler<UserRegistered> for ContextCapturingHandler {
591        type Error = OutboxError;
592        async fn handle(
593            &self,
594            _event: UserRegistered,
595            ctx: &HandlerContext,
596        ) -> Result<(), Self::Error> {
597            self.captured_ids.lock().unwrap().push(ctx.message_id);
598            Ok(())
599        }
600    }
601
602    struct FailingHandler;
603    impl Handler<UserRegistered> for FailingHandler {
604        type Error = OutboxError;
605        async fn handle(
606            &self,
607            _event: UserRegistered,
608            _ctx: &HandlerContext,
609        ) -> Result<(), Self::Error> {
610            Err(OutboxError::Internal("forced".into()))
611        }
612    }
613
614    fn fresh_envelope(user_id: Uuid) -> OutboxEnvelope {
615        let publisher_test_event = UserRegistered { user_id };
616        OutboxEnvelope::new(Uuid::new_v4(), &publisher_test_event).unwrap()
617    }
618
619    #[tokio::test]
620    async fn typed_handler_decodes_envelope_and_calls_inner_handler() {
621        let seen = Arc::new(Mutex::new(Vec::<Uuid>::new()));
622        let handler = TypedHandler::new(RecordingHandler {
623            seen: Arc::clone(&seen),
624        });
625        let erased: Arc<dyn ErasedHandler> = Arc::new(handler);
626
627        let user_id = Uuid::from_u128(42);
628        let envelope = fresh_envelope(user_id);
629        let ctx = HandlerContext::new(MessageId::new(), CorrelationId::new());
630
631        erased
632            .handle(&envelope, &ctx)
633            .await
634            .expect("erased dispatch must succeed");
635
636        assert_eq!(seen.lock().unwrap().as_slice(), &[user_id]);
637    }
638
639    #[tokio::test]
640    async fn typed_handler_propagates_handler_error_as_outbox_error() {
641        let handler = TypedHandler::new(FailingHandler);
642        let erased: Arc<dyn ErasedHandler> = Arc::new(handler);
643
644        let envelope = fresh_envelope(Uuid::nil());
645        let ctx = HandlerContext::new(MessageId::new(), CorrelationId::new());
646
647        let err = erased.handle(&envelope, &ctx).await.expect_err("must fail");
648        assert!(matches!(err, OutboxError::Internal(_)));
649    }
650
651    #[test]
652    fn typed_handler_reports_event_type_from_const() {
653        let handler = TypedHandler::new(RecordingHandler {
654            seen: Arc::new(Mutex::new(Vec::new())),
655        });
656        assert_eq!(handler.event_type(), "users.registered");
657    }
658
659    #[test]
660    fn default_config_has_expected_values() {
661        let cfg = OutboxWorkerConfig::default();
662        assert_eq!(cfg.poll_interval, Duration::from_millis(100));
663        assert_eq!(cfg.batch_size, 10);
664        assert_eq!(cfg.max_attempts, 5);
665        assert_eq!(cfg.retry_base_delay, Duration::from_secs(1));
666        assert_eq!(cfg.retry_max_delay, Duration::from_secs(300));
667        assert!(cfg.jitter);
668        assert_eq!(cfg.min_cycle_delay, Duration::from_millis(5));
669        assert_eq!(cfg.dispatch_timeout, Duration::from_secs(30));
670    }
671
672    fn deterministic_config(base: Duration, max: Duration) -> OutboxWorkerConfig {
673        OutboxWorkerConfig {
674            retry_base_delay: base,
675            retry_max_delay: max,
676            jitter: false,
677            ..OutboxWorkerConfig::default()
678        }
679    }
680
681    #[test]
682    fn backoff_grows_exponentially_without_jitter() {
683        let cfg = deterministic_config(Duration::from_secs(1), Duration::from_secs(300));
684        assert_eq!(cfg.next_retry_delay(0), Duration::from_secs(1));
685        assert_eq!(cfg.next_retry_delay(1), Duration::from_secs(2));
686        assert_eq!(cfg.next_retry_delay(2), Duration::from_secs(4));
687        assert_eq!(cfg.next_retry_delay(3), Duration::from_secs(8));
688    }
689
690    #[test]
691    fn backoff_caps_at_max_delay() {
692        let cfg = deterministic_config(Duration::from_secs(1), Duration::from_secs(30));
693        assert_eq!(cfg.next_retry_delay(10), Duration::from_secs(30));
694        assert_eq!(cfg.next_retry_delay(100), Duration::from_secs(30));
695    }
696
697    #[test]
698    fn backoff_overflow_safe_for_large_attempts() {
699        let cfg = deterministic_config(Duration::from_secs(1), Duration::from_secs(300));
700        let delay = cfg.next_retry_delay(64);
701        assert_eq!(
702            delay,
703            Duration::from_secs(300),
704            "overflow must saturate at cap"
705        );
706    }
707
708    #[test]
709    fn backoff_jitter_stays_within_bounds() {
710        let base = Duration::from_secs(1);
711        let max = Duration::from_secs(30);
712        let cfg = OutboxWorkerConfig {
713            retry_base_delay: base,
714            retry_max_delay: max,
715            jitter: true,
716            ..OutboxWorkerConfig::default()
717        };
718        for attempts in 0u32..8 {
719            let delay = cfg.next_retry_delay(attempts);
720            let cap = base
721                .saturating_mul(1u32.checked_shl(attempts).unwrap_or(u32::MAX))
722                .min(max);
723            assert!(
724                delay <= cap,
725                "attempt {attempts}: jittered delay {delay:?} must be <= cap {cap:?}"
726            );
727        }
728    }
729
730    /// Store that records the virtual instant of the first empty poll so a
731    /// test can assert non-empty cycles were paced.
732    #[derive(Clone)]
733    struct PacingStore {
734        pending: Arc<Mutex<Vec<OutboxEnvelope>>>,
735        empty_poll_at: Arc<Mutex<Option<tokio::time::Instant>>>,
736    }
737
738    impl PacingStore {
739        fn new(initial: Vec<OutboxEnvelope>) -> Self {
740            Self {
741                pending: Arc::new(Mutex::new(initial)),
742                empty_poll_at: Arc::new(Mutex::new(None)),
743            }
744        }
745    }
746
747    #[async_trait::async_trait]
748    impl OutboxStore for PacingStore {
749        type Client = MockClient;
750        type Tx<'tx> = MockTx;
751
752        async fn acquire(&self) -> Result<Self::Client, OutboxError> {
753            Ok(MockClient)
754        }
755
756        async fn begin<'a>(
757            &self,
758            _client: &'a mut Self::Client,
759        ) -> Result<Self::Tx<'a>, OutboxError> {
760            Ok(MockTx)
761        }
762
763        async fn poll<'a>(
764            &self,
765            _tx: &mut Self::Tx<'a>,
766            batch_size: usize,
767            _max_attempts: u32,
768        ) -> Result<Vec<OutboxEnvelope>, OutboxError> {
769            let mut pending = self.pending.lock().unwrap();
770            let take = batch_size.min(pending.len());
771            let batch: Vec<OutboxEnvelope> = pending.drain(..take).collect();
772            if batch.is_empty() {
773                let mut slot = self.empty_poll_at.lock().unwrap();
774                if slot.is_none() {
775                    *slot = Some(tokio::time::Instant::now());
776                }
777            }
778            Ok(batch)
779        }
780
781        async fn mark_delivered<'a>(
782            &self,
783            _tx: &mut Self::Tx<'a>,
784            _event_id: Uuid,
785        ) -> Result<(), OutboxError> {
786            Ok(())
787        }
788
789        async fn mark_failed<'a>(
790            &self,
791            _tx: &mut Self::Tx<'a>,
792            _event_id: Uuid,
793            _error: &str,
794            _retry_in: Duration,
795        ) -> Result<(), OutboxError> {
796            Ok(())
797        }
798
799        async fn commit<'a>(&self, _tx: Self::Tx<'a>) -> Result<(), OutboxError> {
800            Ok(())
801        }
802    }
803
    /// Pacing regression test.
    ///
    /// Setup: `batch_size = 1`, an hour-long `poll_interval`, and
    /// `min_cycle_delay = delay`. Each of the four non-empty cycles is expected
    /// to be followed by `min_cycle_delay`. `PacingStore` should therefore
    /// observe its first empty poll exactly `4 * delay` after the worker
    /// starts, measured on the paused tokio clock.
    #[tokio::test(start_paused = true)]
    async fn run_paces_consecutive_non_empty_cycles() {
        // One envelope per non-empty cycle; event ids start at 1.
        let non_empty_cycles: u32 = 4;
        let envelopes: Vec<OutboxEnvelope> = (0..non_empty_cycles)
            .map(|i| fresh_envelope(Uuid::from_u128(u128::from(i) + 1)))
            .collect();
        let store = PacingStore::new(envelopes);
        // Keep a handle on the first-empty-poll timestamp; `store` itself is
        // moved into the worker below.
        let empty_poll_at = Arc::clone(&store.empty_poll_at);

        let delay = Duration::from_millis(10);
        let config = OutboxWorkerConfig {
            // NOTE(review): presumably chosen so the idle poll sleep never
            // falls inside the measured window; only min_cycle_delay should.
            poll_interval: Duration::from_secs(3600),
            // One envelope per poll, so every envelope is its own cycle.
            batch_size: 1,
            min_cycle_delay: delay,
            ..OutboxWorkerConfig::default()
        };

        let registry = registry_with(vec![Arc::new(TypedHandler::new(RecordingHandler {
            seen: Arc::new(Mutex::new(Vec::new())),
        }))]);
        let worker = OutboxWorker::new(store, registry, config);

        let cancel = CancellationToken::new();
        // Reference point for the timing assertion, taken before the spawn.
        let start = tokio::time::Instant::now();
        let join = tokio::spawn(worker.run(cancel.clone()));

        // Sleep just past the expected empty-poll instant, then shut down.
        tokio::time::sleep(delay * non_empty_cycles + Duration::from_millis(1)).await;
        cancel.cancel();
        join.await.unwrap().unwrap();

        let empty_at = empty_poll_at
            .lock()
            .unwrap()
            .expect("the loop should have reached the empty poll");
        // Exact equality is meaningful because the clock is paused and only
        // moves by the sleeps the runtime auto-advances through.
        assert_eq!(
            empty_at.duration_since(start),
            delay * non_empty_cycles,
            "each non-empty cycle must be paced by min_cycle_delay before the empty poll"
        );
    }
844
    /// `MockStore` lets us drive the worker without a real database in
    /// unit tests. Integration testing of the SQL semantics happens in
    /// `hexeract-outbox-sql` via testcontainers.
    ///
    /// Every field is an `Arc`, so the derived `Clone` yields another handle
    /// onto the same shared state. Tests pass `store.clone()` to the worker
    /// and keep the original for assertions.
    #[derive(Clone)]
    struct MockStore {
        /// Envelopes not yet polled; `poll` drains them from the front.
        pending: Arc<Mutex<Vec<OutboxEnvelope>>>,
        /// Event ids recorded by successful `mark_delivered` calls, in call order.
        delivered: Arc<Mutex<Vec<Uuid>>>,
        /// `(event_id, error)` pairs recorded by `mark_failed`, in call order.
        failed: Arc<Mutex<Vec<(Uuid, String)>>>,
        /// Event ids recorded by `mark_dead_lettered`, in call order.
        dead_lettered: Arc<Mutex<Vec<Uuid>>>,
        /// Event ids recorded by successful `claim` calls, in call order.
        claimed: Arc<Mutex<Vec<Uuid>>>,
        /// When `true`, `claim` returns an error without recording any ids.
        fail_claim: Arc<AtomicBool>,
        /// When set, `mark_delivered` returns an error for this event id once,
        /// simulating a transient ack failure (used to test batch isolation).
        fail_mark_delivered_for: Arc<Mutex<Option<Uuid>>>,
    }
860
861    impl MockStore {
862        fn new(initial: Vec<OutboxEnvelope>) -> Self {
863            Self {
864                pending: Arc::new(Mutex::new(initial)),
865                delivered: Arc::new(Mutex::new(Vec::new())),
866                failed: Arc::new(Mutex::new(Vec::new())),
867                dead_lettered: Arc::new(Mutex::new(Vec::new())),
868                claimed: Arc::new(Mutex::new(Vec::new())),
869                fail_claim: Arc::new(AtomicBool::new(false)),
870                fail_mark_delivered_for: Arc::new(Mutex::new(None)),
871            }
872        }
873    }
874
    /// Zero-sized stand-in for a pooled connection; carries no state.
    struct MockClient;
    /// Zero-sized stand-in for a transaction; the mock `commit`s accept and
    /// discard it.
    struct MockTx;
877
878    #[async_trait::async_trait]
879    impl OutboxStore for MockStore {
880        type Client = MockClient;
881        type Tx<'tx> = MockTx;
882
883        async fn acquire(&self) -> Result<Self::Client, OutboxError> {
884            Ok(MockClient)
885        }
886
887        async fn begin<'a>(
888            &self,
889            _client: &'a mut Self::Client,
890        ) -> Result<Self::Tx<'a>, OutboxError> {
891            Ok(MockTx)
892        }
893
894        async fn poll<'a>(
895            &self,
896            _tx: &mut Self::Tx<'a>,
897            batch_size: usize,
898            _max_attempts: u32,
899        ) -> Result<Vec<OutboxEnvelope>, OutboxError> {
900            let mut pending = self.pending.lock().unwrap();
901            let take = batch_size.min(pending.len());
902            Ok(pending.drain(..take).collect())
903        }
904
905        async fn mark_delivered<'a>(
906            &self,
907            _tx: &mut Self::Tx<'a>,
908            event_id: Uuid,
909        ) -> Result<(), OutboxError> {
910            {
911                let mut slot = self.fail_mark_delivered_for.lock().unwrap();
912                if *slot == Some(event_id) {
913                    *slot = None;
914                    return Err(OutboxError::PoolTimeout);
915                }
916            }
917            self.delivered.lock().unwrap().push(event_id);
918            Ok(())
919        }
920
921        async fn mark_failed<'a>(
922            &self,
923            _tx: &mut Self::Tx<'a>,
924            event_id: Uuid,
925            error: &str,
926            _retry_in: Duration,
927        ) -> Result<(), OutboxError> {
928            self.failed
929                .lock()
930                .unwrap()
931                .push((event_id, error.to_owned()));
932            Ok(())
933        }
934
935        async fn mark_dead_lettered<'a>(
936            &self,
937            _tx: &mut Self::Tx<'a>,
938            event_id: Uuid,
939            _error: &str,
940        ) -> Result<(), OutboxError> {
941            self.dead_lettered.lock().unwrap().push(event_id);
942            Ok(())
943        }
944
945        async fn commit<'a>(&self, _tx: Self::Tx<'a>) -> Result<(), OutboxError> {
946            Ok(())
947        }
948
949        async fn claim<'a>(
950            &self,
951            _tx: &mut Self::Tx<'a>,
952            event_ids: &[Uuid],
953            _lease_for: Duration,
954        ) -> Result<(), OutboxError> {
955            if self.fail_claim.load(Ordering::Relaxed) {
956                return Err(OutboxError::Internal("claim failed".into()));
957            }
958            self.claimed.lock().unwrap().extend_from_slice(event_ids);
959            Ok(())
960        }
961    }
962
963    fn registry_with(
964        handlers: Vec<Arc<dyn ErasedHandler>>,
965    ) -> HashMap<&'static str, Arc<dyn ErasedHandler>> {
966        let mut map = HashMap::new();
967        for handler in handlers {
968            map.insert(handler.event_type(), handler);
969        }
970        map
971    }
972
973    #[tokio::test]
974    async fn worker_dispatches_pending_envelopes_and_marks_delivered() {
975        let envelopes = vec![
976            fresh_envelope(Uuid::from_u128(1)),
977            fresh_envelope(Uuid::from_u128(2)),
978        ];
979        let event_ids: Vec<Uuid> = envelopes.iter().map(|e| e.event_id).collect();
980        let store = MockStore::new(envelopes);
981
982        let seen = Arc::new(Mutex::new(Vec::new()));
983        let handler: Arc<dyn ErasedHandler> = Arc::new(TypedHandler::new(RecordingHandler {
984            seen: Arc::clone(&seen),
985        }));
986        let registry = registry_with(vec![handler]);
987
988        let worker = OutboxWorker::new(store.clone(), registry, OutboxWorkerConfig::default());
989        let cancel = CancellationToken::new();
990        let join = tokio::spawn(worker.run(cancel.clone()));
991
992        tokio::time::sleep(Duration::from_millis(200)).await;
993        cancel.cancel();
994        join.await.unwrap().unwrap();
995
996        assert_eq!(seen.lock().unwrap().len(), 2);
997        assert_eq!(
998            store.delivered.lock().unwrap().as_slice(),
999            event_ids.as_slice()
1000        );
1001        assert!(store.failed.lock().unwrap().is_empty());
1002    }
1003
1004    #[tokio::test]
1005    async fn worker_marks_failed_when_handler_errors() {
1006        let envelope = fresh_envelope(Uuid::from_u128(1));
1007        let event_id = envelope.event_id;
1008        let store = MockStore::new(vec![envelope]);
1009
1010        let handler: Arc<dyn ErasedHandler> = Arc::new(TypedHandler::new(FailingHandler));
1011        let registry = registry_with(vec![handler]);
1012
1013        let worker = OutboxWorker::new(store.clone(), registry, OutboxWorkerConfig::default());
1014        let cancel = CancellationToken::new();
1015        let join = tokio::spawn(worker.run(cancel.clone()));
1016
1017        tokio::time::sleep(Duration::from_millis(200)).await;
1018        cancel.cancel();
1019        join.await.unwrap().unwrap();
1020
1021        assert!(store.delivered.lock().unwrap().is_empty());
1022        let failed = store.failed.lock().unwrap();
1023        assert_eq!(failed.len(), 1);
1024        assert_eq!(failed[0].0, event_id);
1025        assert!(failed[0].1.contains("forced"));
1026    }
1027
1028    #[tokio::test]
1029    async fn worker_marks_failed_when_no_handler_registered() {
1030        let envelope = fresh_envelope(Uuid::from_u128(1));
1031        let event_id = envelope.event_id;
1032        let store = MockStore::new(vec![envelope]);
1033
1034        let registry = HashMap::new();
1035
1036        let worker = OutboxWorker::new(store.clone(), registry, OutboxWorkerConfig::default());
1037        let cancel = CancellationToken::new();
1038        let join = tokio::spawn(worker.run(cancel.clone()));
1039
1040        tokio::time::sleep(Duration::from_millis(200)).await;
1041        cancel.cancel();
1042        join.await.unwrap().unwrap();
1043
1044        let failed = store.failed.lock().unwrap();
1045        assert_eq!(failed.len(), 1);
1046        assert_eq!(failed[0].0, event_id);
1047        assert!(failed[0].1.contains("no handler"));
1048    }
1049
1050    #[tokio::test]
1051    async fn worker_dead_letters_envelope_when_max_attempts_exhausted() {
1052        let envelope = fresh_envelope(Uuid::from_u128(1));
1053        let event_id = envelope.event_id;
1054        let store = MockStore::new(vec![envelope]);
1055
1056        let handler: Arc<dyn ErasedHandler> = Arc::new(TypedHandler::new(FailingHandler));
1057        let registry = registry_with(vec![handler]);
1058
1059        let config = OutboxWorkerConfig {
1060            max_attempts: 1,
1061            batch_size: 1,
1062            ..OutboxWorkerConfig::default()
1063        };
1064        let worker = OutboxWorker::new(store.clone(), registry, config);
1065        let cancel = CancellationToken::new();
1066        let join = tokio::spawn(worker.run(cancel.clone()));
1067
1068        tokio::time::sleep(Duration::from_millis(200)).await;
1069        cancel.cancel();
1070        join.await.unwrap().unwrap();
1071
1072        let failed = store.failed.lock().unwrap();
1073        assert_eq!(failed.len(), 1);
1074        assert_eq!(failed[0].0, event_id);
1075        let dead = store.dead_lettered.lock().unwrap();
1076        assert_eq!(dead.as_slice(), &[event_id]);
1077    }
1078
1079    #[tokio::test]
1080    async fn worker_does_not_dead_letter_before_attempts_exhausted() {
1081        let envelope = fresh_envelope(Uuid::from_u128(1));
1082        let event_id = envelope.event_id;
1083        let store = MockStore::new(vec![envelope]);
1084
1085        let handler: Arc<dyn ErasedHandler> = Arc::new(TypedHandler::new(FailingHandler));
1086        let registry = registry_with(vec![handler]);
1087
1088        let config = OutboxWorkerConfig {
1089            max_attempts: 3,
1090            batch_size: 1,
1091            ..OutboxWorkerConfig::default()
1092        };
1093        let worker = OutboxWorker::new(store.clone(), registry, config);
1094        let cancel = CancellationToken::new();
1095        let join = tokio::spawn(worker.run(cancel.clone()));
1096
1097        tokio::time::sleep(Duration::from_millis(200)).await;
1098        cancel.cancel();
1099        join.await.unwrap().unwrap();
1100
1101        let failed = store.failed.lock().unwrap();
1102        assert_eq!(failed.len(), 1);
1103        assert_eq!(failed[0].0, event_id);
1104        assert!(
1105            store.dead_lettered.lock().unwrap().is_empty(),
1106            "should not dead-letter when attempts(1) < max_attempts(3)"
1107        );
1108    }
1109
1110    #[tokio::test]
1111    async fn worker_claims_envelopes_before_dispatch() {
1112        let envelopes = vec![
1113            fresh_envelope(Uuid::from_u128(1)),
1114            fresh_envelope(Uuid::from_u128(2)),
1115        ];
1116        let expected_ids: Vec<Uuid> = envelopes.iter().map(|e| e.event_id).collect();
1117        let store = MockStore::new(envelopes);
1118
1119        let seen = Arc::new(Mutex::new(Vec::new()));
1120        let handler: Arc<dyn ErasedHandler> = Arc::new(TypedHandler::new(RecordingHandler {
1121            seen: Arc::clone(&seen),
1122        }));
1123        let registry = registry_with(vec![handler]);
1124
1125        let worker = OutboxWorker::new(store.clone(), registry, OutboxWorkerConfig::default());
1126        let cancel = CancellationToken::new();
1127        let join = tokio::spawn(worker.run(cancel.clone()));
1128
1129        tokio::time::sleep(Duration::from_millis(200)).await;
1130        cancel.cancel();
1131        join.await.unwrap().unwrap();
1132
1133        let claimed = store.claimed.lock().unwrap();
1134        assert_eq!(
1135            claimed.as_slice(),
1136            expected_ids.as_slice(),
1137            "claim must be called with all polled ids"
1138        );
1139    }
1140
1141    #[tokio::test]
1142    async fn worker_does_not_claim_when_batch_is_empty() {
1143        let store = MockStore::new(Vec::new());
1144        let registry = HashMap::new();
1145
1146        let worker = OutboxWorker::new(store.clone(), registry, OutboxWorkerConfig::default());
1147        let cancel = CancellationToken::new();
1148        let join = tokio::spawn(worker.run(cancel.clone()));
1149
1150        tokio::time::sleep(Duration::from_millis(150)).await;
1151        cancel.cancel();
1152        join.await.unwrap().unwrap();
1153
1154        assert!(
1155            store.claimed.lock().unwrap().is_empty(),
1156            "claim must not be called when no envelopes are polled"
1157        );
1158    }
1159
1160    #[tokio::test]
1161    async fn worker_aborts_poll_cycle_when_claim_fails_without_dispatching() {
1162        let envelope = fresh_envelope(Uuid::from_u128(1));
1163        let store = MockStore::new(vec![envelope]);
1164        store.fail_claim.store(true, Ordering::Relaxed);
1165
1166        let seen = Arc::new(Mutex::new(Vec::new()));
1167        let handler: Arc<dyn ErasedHandler> = Arc::new(TypedHandler::new(RecordingHandler {
1168            seen: Arc::clone(&seen),
1169        }));
1170        let registry = registry_with(vec![handler]);
1171
1172        let worker = OutboxWorker::new(store.clone(), registry, OutboxWorkerConfig::default());
1173        let cancel = CancellationToken::new();
1174        let join = tokio::spawn(worker.run(cancel.clone()));
1175
1176        tokio::time::sleep(Duration::from_millis(200)).await;
1177        cancel.cancel();
1178        join.await.unwrap().unwrap();
1179
1180        assert!(
1181            seen.lock().unwrap().is_empty(),
1182            "handler must not be called when claim fails"
1183        );
1184        assert!(
1185            store.delivered.lock().unwrap().is_empty(),
1186            "envelope must not be marked delivered when claim fails"
1187        );
1188        assert!(
1189            store.claimed.lock().unwrap().is_empty(),
1190            "claim ids must not be recorded when claim returns an error"
1191        );
1192    }
1193
1194    #[tokio::test]
1195    async fn worker_derives_context_ids_from_event_id_stable_across_retries() {
1196        let event_id = Uuid::from_u128(99);
1197        let e1 = OutboxEnvelope::new(
1198            event_id,
1199            &UserRegistered {
1200                user_id: Uuid::from_u128(1),
1201            },
1202        )
1203        .unwrap();
1204        let e2 = OutboxEnvelope::new(
1205            event_id,
1206            &UserRegistered {
1207                user_id: Uuid::from_u128(2),
1208            },
1209        )
1210        .unwrap();
1211        let store = MockStore::new(vec![e1, e2]);
1212
1213        let captured_ids = Arc::new(Mutex::new(Vec::new()));
1214        let handler: Arc<dyn ErasedHandler> =
1215            Arc::new(TypedHandler::new(ContextCapturingHandler {
1216                captured_ids: Arc::clone(&captured_ids),
1217            }));
1218        let registry = registry_with(vec![handler]);
1219
1220        let worker = OutboxWorker::new(store.clone(), registry, OutboxWorkerConfig::default());
1221        let cancel = CancellationToken::new();
1222        let join = tokio::spawn(worker.run(cancel.clone()));
1223
1224        tokio::time::sleep(Duration::from_millis(200)).await;
1225        cancel.cancel();
1226        join.await.unwrap().unwrap();
1227
1228        let ids = captured_ids.lock().unwrap();
1229        assert_eq!(ids.len(), 2, "both envelopes must be dispatched");
1230        assert_eq!(
1231            ids[0], ids[1],
1232            "message_id must be identical across dispatches of the same event_id"
1233        );
1234        assert_eq!(
1235            ids[0],
1236            MessageId::from(event_id),
1237            "message_id must equal MessageId::from(event_id)"
1238        );
1239    }
1240
1241    #[tokio::test]
1242    async fn worker_stops_promptly_on_cancellation() {
1243        let store = MockStore::new(Vec::new());
1244        let registry = HashMap::new();
1245        let worker = OutboxWorker::new(store, registry, OutboxWorkerConfig::default());
1246        let cancel = CancellationToken::new();
1247        let join = tokio::spawn(worker.run(cancel.clone()));
1248
1249        cancel.cancel();
1250        let started = std::time::Instant::now();
1251        join.await.unwrap().unwrap();
1252        assert!(
1253            started.elapsed() < Duration::from_secs(1),
1254            "worker took {:?} to stop",
1255            started.elapsed()
1256        );
1257    }
1258
1259    /// Handler that never resolves, used to prove `dispatch_timeout` is
1260    /// enforced as a hard deadline (#229).
1261    struct HangingHandler;
1262    impl Handler<UserRegistered> for HangingHandler {
1263        type Error = OutboxError;
1264        async fn handle(
1265            &self,
1266            _event: UserRegistered,
1267            _ctx: &HandlerContext,
1268        ) -> Result<(), Self::Error> {
1269            std::future::pending::<()>().await;
1270            unreachable!("hanging handler never resolves")
1271        }
1272    }
1273
1274    #[tokio::test(start_paused = true)]
1275    async fn worker_enforces_dispatch_timeout_on_a_hung_handler() {
1276        // #229: a handler that never returns must not stall the worker. With
1277        // dispatch_timeout enforced, the envelope is marked failed instead.
1278        let envelope = fresh_envelope(Uuid::from_u128(1));
1279        let event_id = envelope.event_id;
1280        let store = MockStore::new(vec![envelope]);
1281
1282        let handler: Arc<dyn ErasedHandler> = Arc::new(TypedHandler::new(HangingHandler));
1283        let registry = registry_with(vec![handler]);
1284
1285        let config = OutboxWorkerConfig {
1286            dispatch_timeout: Duration::from_millis(50),
1287            batch_size: 1,
1288            ..OutboxWorkerConfig::default()
1289        };
1290        let worker = OutboxWorker::new(store.clone(), registry, config);
1291        let cancel = CancellationToken::new();
1292        let join = tokio::spawn(worker.run(cancel.clone()));
1293
1294        // Advance virtual time well past the dispatch timeout.
1295        tokio::time::sleep(Duration::from_millis(500)).await;
1296        cancel.cancel();
1297        join.await.unwrap().unwrap();
1298
1299        let failed = store.failed.lock().unwrap();
1300        assert_eq!(failed.len(), 1, "hung handler must be recorded as failed");
1301        assert_eq!(failed[0].0, event_id);
1302        assert!(
1303            failed[0].1.contains("timed out"),
1304            "failure must be the dispatch timeout, got {:?}",
1305            failed[0].1
1306        );
1307        assert!(
1308            store.delivered.lock().unwrap().is_empty(),
1309            "a timed-out envelope must not be marked delivered"
1310        );
1311    }
1312
1313    #[tokio::test]
1314    async fn worker_settles_remaining_batch_when_one_ack_fails() {
1315        // #231: a transient ack failure on one envelope must not abandon the
1316        // rest of the claimed batch.
1317        let e1 = fresh_envelope(Uuid::from_u128(1));
1318        let e2 = fresh_envelope(Uuid::from_u128(2));
1319        let id1 = e1.event_id;
1320        let id2 = e2.event_id;
1321        let store = MockStore::new(vec![e1, e2]);
1322        *store.fail_mark_delivered_for.lock().unwrap() = Some(id1);
1323
1324        let seen = Arc::new(Mutex::new(Vec::new()));
1325        let handler: Arc<dyn ErasedHandler> = Arc::new(TypedHandler::new(RecordingHandler {
1326            seen: Arc::clone(&seen),
1327        }));
1328        let registry = registry_with(vec![handler]);
1329
1330        let config = OutboxWorkerConfig {
1331            batch_size: 2,
1332            ..OutboxWorkerConfig::default()
1333        };
1334        let worker = OutboxWorker::new(store.clone(), registry, config);
1335        let cancel = CancellationToken::new();
1336        let join = tokio::spawn(worker.run(cancel.clone()));
1337
1338        tokio::time::sleep(Duration::from_millis(200)).await;
1339        cancel.cancel();
1340        join.await.unwrap().unwrap();
1341
1342        // The first envelope's ack failed (transient), but the second must
1343        // still have been dispatched and marked delivered.
1344        let delivered = store.delivered.lock().unwrap();
1345        assert!(
1346            delivered.contains(&id2),
1347            "second envelope must be delivered despite the first ack failing, got {delivered:?}"
1348        );
1349        assert!(
1350            seen.lock().unwrap().len() >= 2,
1351            "both envelopes must have been dispatched to the handler"
1352        );
1353    }
1354
1355    #[tokio::test(start_paused = true)]
1356    async fn worker_observes_cancellation_during_a_long_poll_interval() {
1357        // #231 defect 1: cancellation arriving during the poll sleep must be
1358        // observed promptly, not after the full poll_interval elapses.
1359        let store = MockStore::new(Vec::new());
1360        let registry = HashMap::new();
1361        let config = OutboxWorkerConfig {
1362            poll_interval: Duration::from_secs(3600),
1363            ..OutboxWorkerConfig::default()
1364        };
1365        let worker = OutboxWorker::new(store, registry, config);
1366        let cancel = CancellationToken::new();
1367        let join = tokio::spawn(worker.run(cancel.clone()));
1368
1369        // Let the worker reach the sleep, then cancel mid-sleep.
1370        tokio::time::sleep(Duration::from_millis(1)).await;
1371        cancel.cancel();
1372
1373        // Without racing the sleep against cancellation this would hang for the
1374        // full hour of virtual time; with the fix it returns immediately.
1375        tokio::time::timeout(Duration::from_secs(1), join)
1376            .await
1377            .expect("worker must stop without waiting out the poll interval")
1378            .unwrap()
1379            .unwrap();
1380    }
1381
1382    #[test]
1383    fn lease_for_scales_with_batch_size() {
1384        // #215: the batch lease must cover the worst-case sequential dispatch
1385        // of the whole batch, i.e. batch_len * dispatch_timeout.
1386        let config = OutboxWorkerConfig {
1387            dispatch_timeout: Duration::from_secs(30),
1388            ..OutboxWorkerConfig::default()
1389        };
1390        let store = MockStore::new(Vec::new());
1391        let worker = OutboxWorker::new(store, HashMap::new(), config);
1392
1393        assert_eq!(worker.lease_for(1), Duration::from_secs(30));
1394        assert_eq!(worker.lease_for(10), Duration::from_secs(300));
1395        // An empty batch is never claimed, but the helper must stay safe.
1396        assert_eq!(worker.lease_for(0), Duration::from_secs(30));
1397    }
1398}